Explorar el Código

Merge branch 'new_sqlite' of https://github.com/pwman3/pwman3 into new_sqlite

Tiram hace 10 años
padre
commit
29246e94ff

+ 8 - 5
Makefile

@@ -1,4 +1,4 @@
-.PHONY: clean-pyc clean-build docs clean test
+.PHONY: clean-pyc clean-build docs clean test coverage coverage-run
 
 help:
 	@echo "clean-build - remove build artifacts"
@@ -13,6 +13,7 @@ help:
 
 clean: clean-build clean-pyc
 	rm -fr htmlcov/
+	rm -f test.db
 
 clean-build:
 	rm -fr build/
@@ -27,18 +28,20 @@ clean-pyc:
 lint:
 	flake8 pwman scripts
 
-test: install
+test: install clean 
 	git checkout pwman/tests/pwman.v0.0.8.db
 	python setup.py test
 
 test-all:
 	tox
 
-coverage:
+coverage-run:
 	coverage run --source pwman setup.py test
 	coverage report -m
-	coverage html
-	#xdg-open htmlcov/index.html
+	@coverage html
+
+coverage: coverage-run
+	@rm test.db
 
 docs:
 	#rm -f docs/manutils.rst

+ 129 - 6
pwman/data/drivers/sqlite.py

@@ -1,4 +1,4 @@
-#============================================================================
+# ============================================================================
 # This file is part of Pwman3.
 #
 # Pwman3 is free software; you can redistribute iut and/or modify
@@ -13,12 +13,11 @@
 # You should have received a copy of the GNU General Public License
 # along with Pwman3; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
-#============================================================================
-# Copyright (C) 2012 Oz Nahum <nahumoz@gmail.com>
-#============================================================================
-#============================================================================
+# ============================================================================
+# Copyright (C) 2012, 2013, 2014 Oz Nahum Tiram <nahumoz@gmail.com>
+# ============================================================================
 # Copyright (C) 2006 Ivan Kelly <ivan@ivankelly.net>
-#============================================================================
+# ============================================================================
 
 """SQLite Database implementation."""
 from pwman.data.database import Database, DatabaseException
@@ -354,3 +353,127 @@ class SQLiteDatabaseNewForm(Database):
             return None
         else:
             return keyrow[0]
+
+
+class SQLite(SQLiteDatabaseNewForm):
+
+    def __init__(self, filename, dbformat=0.6):
+        """Initialise SQLitePwmanDatabase instance."""
+        self._filename = filename
+        self.dbformat = dbformat
+
+    def _open(self):
+        self._con = sqlite.connect(self._filename)
+        self._cur = self._con.cursor()
+
+    def listnodes(self, filter=None):
+        if not filter:
+            sql_all = "SELECT ID FROM NODE"
+            self._cur.execute(sql_all)
+            ids = self._cur.fetchall()
+            return ids
+        else:
+            tagid = self._get_tag(filter)
+            sql_filter = "SELECT NODEID FROM LOOKUP WHERE TAGID = ? "
+            self._cur.execute(sql_filter, tagid)
+            ids = self._cur.fetchall()
+            return ids
+
+    def _create_tables(self):
+        self._cur.execute("PRAGMA TABLE_INFO(NODE)")
+        if self._cur.fetchone() is not None:
+            return
+
+        self._cur.execute("CREATE TABLE NODE (ID INTEGER PRIMARY KEY "
+                          "AUTOINCREMENT, "
+                          "USER TEXT NOT NULL, "
+                          "PASSWORD TEXT NOT NULL, "
+                          "URL TEXT NOT NULL,"
+                          "NOTES TEXT NOT NULL)")
+
+        self._cur.execute("CREATE TABLE TAG"
+                          "(ID INTEGER PRIMARY KEY AUTOINCREMENT,"
+                          "DATA BLOB NOT NULL UNIQUE)")
+
+        self._cur.execute("CREATE TABLE LOOKUP ("
+                          "nodeid INTEGER NOT NULL, "
+                          "tagid INTEGER NOT NULL, "
+                          "FOREIGN KEY(nodeid) REFERENCES NODE(ID),"
+                          "FOREIGN KEY(tagid) REFERENCES TAG(ID))")
+
+        self._cur.execute("CREATE TABLE CRYPTO"
+                          "(SEED TEXT,"
+                          " DIGEST TEXT)")
+
+        # create a table to hold DB version info
+        self._cur.execute("CREATE TABLE DBVERSION"
+                          "(DB VERSION TEXT NOT NULL DEFAULT '%s')" %
+                          self.dbformat)
+        self._cur.execute("INSERT INTO DBVERSION VALUES('%s')" %
+                          self.dbformat)
+        try:
+            self._con.commit()
+        except DatabaseException as e:  # pragma: no cover
+            self._con.rollback()
+            raise e
+
+    def fetch_crypto_info(self):
+        self._cur.execute("SELECT * FROM CRYPTO")
+        keyrow = self._cur.fetchone()
+        return keyrow
+
+    def save_crypto_info(self, seed, digest):
+        """save the random seed and the digested key"""
+        self._cur.execute("DELETE  FROM CRYPTO")
+        self._cur.execute("INSERT INTO CRYPTO VALUES(?, ?)", [seed, digest])
+        self._con.commit()
+
+    def add_node(self, node):
+        sql = ("INSERT INTO NODE(USER, PASSWORD, URL, NOTES)"
+               "VALUES(?, ?, ?, ?)")
+        node_tags = list(node)
+        node, tags = node_tags[:4], node_tags[-1]
+        self._cur.execute(sql, (node))
+        self._setnodetags(self._cur.lastrowid, tags)
+        self._con.commit()
+
+    def _get_tag(self, tagcipher):
+        sql_search = "SELECT ID FROM TAG WHERE DATA LIKE (?)"
+        self._cur.execute(sql_search, ([tagcipher]))
+        rv = self._cur.fetchone()
+        return rv
+
+    def _get_or_create_tag(self, tagcipher):
+        rv = self._get_tag(tagcipher)
+        if rv:
+            return rv[0]
+        else:
+            sql_insert = "INSERT INTO TAG(DATA) VALUES(?)"
+            self._cur.execute(sql_insert, ([tagcipher]))
+            return self._cur.lastrowid
+
+    def _update_tag_lookup(self, nodeid, tid):
+        sql_lookup = "INSERT INTO LOOKUP(nodeid, tagid) VALUES(?,?)"
+        self._cur.execute(sql_lookup, (nodeid, tid))
+
+    def _setnodetags(self, nodeid, tags):
+        for tag in tags:
+            tid = self._get_or_create_tag(tag)
+            self._update_tag_lookup(nodeid, tid)
+
+    def getnodes(self, ids):
+        sql = "SELECT * FROM NODE WHERE ID IN (%s)" % ','.join('?'*len(ids))
+        self._cur.execute(sql, (ids))
+        nodes = self._cur.fetchall()
+        return nodes
+
+    def editnode(self, id, **kwargs):
+        sql = ("UPDATE NODE SET %s WHERE ID = ? "
+               "" % ','.join('%s=?' % k for k in list(kwargs)))
+        self._cur.execute(sql, (list(kwargs.values()) + [id]))
+        self._con.commit()
+        # TODO, update tags lookup
+
+    def close(self):
+        self._cur.close()
+        self._con.close()

+ 99 - 0
pwman/data/nodes.py

@@ -127,3 +127,102 @@ class NewNode(object):
         """Set the Notes."""
         enc = CryptoEngine.get()
         self._notes = enc.encrypt(value).strip()
+
+
+class Node(object):
+
+    def __init__(self, clear_text=True, **kwargs):
+        if clear_text:
+            enc = CryptoEngine.get()
+            self._username = enc.encrypt(kwargs.get('username')).strip()
+            self._password = enc.encrypt(kwargs.get('password')).strip()
+            self._url = enc.encrypt(kwargs.get('url')).strip()
+            self._notes = enc.encrypt(kwargs.get('notes')).strip()
+            self._tags = [enc.encrypt(t).strip() for t in kwargs.get('tags')]
+
+    @classmethod
+    def from_encrypted_entries(cls, username, password, url, notes, tags):
+        """
+        We use this alternatively, to create a node instance when reading
+        the encrypted entities from the database
+        """
+        node = Node(clear_text=False)
+        node._username = username.strip()
+        node._password = password.strip()
+        node._url = url.strip()
+        node._notes = notes.strip()
+        node._tags = [t.strip() for t in tags]
+        return node
+
+    def __repr__(self):
+        """we use this method to write node to the database"""
+        res = u''
+        for item in self.__dir__:
+            res += self.__dir__[item]
+        return res
+
+    def __iter__(self):
+        for item in ['_username', '_password',
+                     '_url', '_notes']:
+            yield getattr(self, item)
+        yield self._tags
+
+    @property
+    def password(self):
+        """Get the current password."""
+        enc = CryptoEngine.get()
+        return enc.decrypt(self._password).strip()
+
+    @property
+    def username(self):
+        """Get the current username."""
+        enc = CryptoEngine.get()
+        return enc.decrypt(self._username).strip()
+
+    @username.setter
+    def username(self, value):
+        """Set the username."""
+        enc = CryptoEngine.get()
+        self._username = enc.encrypt(value).strip()
+
+    @password.setter
+    def password(self, value):
+        """Set the Notes."""
+        enc = CryptoEngine.get()
+        self._password = enc.encrypt(value).strip()
+
+    @property
+    def tags(self):
+        enc = CryptoEngine.get()
+        try:
+            return [enc.decrypt(tag) for tag in filter(None, self._tags)]
+        except Exception:
+            return [tag for tag in filter(None, self._tags)]
+
+    @tags.setter
+    def tags(self, value):
+        self._tags = [tag for tag in value]
+
+    @property
+    def url(self):
+        """Get the current url."""
+        enc = CryptoEngine.get()
+        return enc.decrypt(self._url).strip()
+
+    @url.setter
+    def url(self, value):
+        """Set the Notes."""
+        enc = CryptoEngine.get()
+        self._url = enc.encrypt(value).strip()
+
+    @property
+    def notes(self):
+        """Get the current notes."""
+        enc = CryptoEngine.get()
+        return enc.decrypt(self._notes).strip()
+
+    @notes.setter
+    def notes(self, value):
+        """Set the Notes."""
+        enc = CryptoEngine.get()
+        self._notes = enc.encrypt(value).strip()

+ 0 - 1
pwman/tests/test_crypto_engine.py

@@ -1,5 +1,4 @@
 import unittest
-import pwman.util.config as config
 from pwman.util.callback import Callback
 import os
 from pwman.util.crypto_engine import (CryptoEngine, CryptoException)

+ 2 - 0
pwman/tests/test_pwman.py

@@ -29,6 +29,7 @@ from .db_tests import (DBTests, SetupTester, CLITests,
 #from .crypto_tests import CryptoTest
 from .test_crypto_engine import CryptoEngineTest
 from .test_config import TestConfig
+from .test_sqlite import TestSQLite
 
 if 'win' not in sys.platform:
     from .test_complete_ui import (Ferrum, NEW_DB_PATH)
@@ -55,6 +56,7 @@ def suite():
     #suite.addTest(loader.loadTestsFromTestCase(TestDBFalseConfig))
     suite.addTest(loader.loadTestsFromTestCase(CryptoEngineTest))
     suite.addTest(loader.loadTestsFromTestCase(TestConfig))
+    suite.addTest(loader.loadTestsFromTestCase(TestSQLite))
     #if 'win' not in sys.platform:
     #    suite.addTest(loader.loadTestsFromTestCase(Ferrum))
     return suite

+ 115 - 0
pwman/tests/test_sqlite.py

@@ -0,0 +1,115 @@
+# ============================================================================
+# This file is part of Pwman3.
+#
+# Pwman3 is free software; you can redistribute iut and/or modify
+# it under the terms of the GNU General Public License, version 2
+# as published by the Free Software Foundation;
+#
+# Pwman3 is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Pwman3; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
+# ============================================================================
+# Copyright (C) 2012, 2013, 2014 Oz Nahum Tiram <nahumoz@gmail.com>
+# ============================================================================
+import os
+import unittest
+from pwman.data.drivers.sqlite import SQLite
+from pwman.data.nodes import Node
+from pwman.util.crypto_engine import CryptoEngine
+
+
+class TestSQLite(unittest.TestCase):
+    def setUp(self):
+        self.db = SQLite('test.db')
+        self.db._open()
+
+    def test_1_create_tables(self):
+        self.db._create_tables()
+        self.db._con.commit()
+        # the method _open calls _create_tables
+        self.db.save_crypto_info("foo", "bar")
+        self.db._create_tables()
+
+    def test_1a_create_tables(self):
+        self.db._create_tables()
+
+    def test_2_crypto_info(self):
+        self.db._create_tables()
+        self.db.save_crypto_info("foo", "bar")
+        f = self.db.fetch_crypto_info()
+        self.assertListEqual([u'foo', u'bar'], list(f))
+
+    def test_3_add_node(self):
+        node = Node(clear_text=True,
+                    **{'username': u"alice", 'password': u"secret",
+                       'url': u"wonderland.com",
+                       'notes': u"a really great place",
+                       'tags': [u'foo', u'bar']})
+        self.db.add_node(node)
+        rv = self.db._cur.execute("select * from node")
+        # clearly this fails, while alice is not found in clear text in the
+        # database!
+        ce = CryptoEngine.get()
+        res = rv.fetchone()
+        self.assertIn(ce.encrypt(u'alice'), res[1])
+
+    def test_4_test_tags(self):
+        node = Node(clear_text=True,
+                    **{'username': u"alice", 'password': u"secret",
+                       'url': u"wonderland.com",
+                       'notes': u"a really great place",
+                       'tags': [u'foo', u'bar']})
+        ce = CryptoEngine.get()
+        self.db._get_or_create_tag(node._tags[0])
+        self.assertEqual(1, self.db._get_or_create_tag(node._tags[0]))
+        self.assertEqual(3, self.db._get_or_create_tag(ce.encrypt('baz')))
+
+    def test_5_test_lookup(self):
+        self.db._cur.execute('SELECT * FROM LOOKUP')
+        rows = self.db._cur.fetchall()
+        self.assertEqual(2, len(rows))
+
+    def test_6_listnodes(self):
+        node = Node(clear_text=True,
+                    **{'username': u"hatman", 'password': u"secret",
+                       'url': u"wonderland.com",
+                       'notes': u"a really great place",
+                       'tags': [u'baz', u'bar']})
+        self.db.add_node(node)
+        ids = self.db.listnodes()
+        self.assertEqual(2, len(ids))
+
+    def test_7_listnodes_w_filter(self):
+        ce = CryptoEngine.get()
+        tag = ce.encrypt(u'bar')
+        rv = self.db.listnodes(tag)
+        self.assertEqual(len(rv), 2)
+        tag = ce.encrypt(u'baz')
+        rv = self.db.listnodes(tag)
+        self.assertEqual(len(rv), 1)
+
+    def test_8_getnodes(self):
+        nodes = self.db.getnodes([1, 2])
+        self.assertEqual(len(nodes), 2)
+
+    def test_9_editnode(self):
+        # delibertly insert clear text into the database
+        node = {'user': 'transparent', 'password': 'notsecret'}
+        self.db.editnode(2, **node)
+        self.db._cur.execute('SELECT USER, PASSWORD FROM NODE WHERE ID=2')
+        rv = self.db._cur.fetchone()
+        self.assertEqual(rv, (u'transparent', u'notsecret'))
+
+    def tearDown(self):
+        self.db.close()
+
+if __name__ == '__main__':
+    try:
+        unittest.main(verbosity=2)
+    except SystemExit:
+        os.remove('test.db')

+ 4 - 5
pwman/util/crypto_engine.py

@@ -82,16 +82,15 @@ def prepare_data(text, block_size):
 class CryptoEngine(object):  # pagma: no cover
     _timeoutcount = 0
     _instance = None
-    _instance_new = None
     _callback = None
 
     @classmethod
     def get(cls, dbver=None, timeout=-1):
-        if CryptoEngine._instance_new:
-            return CryptoEngine._instance_new
+        if CryptoEngine._instance:
+            return CryptoEngine._instance
 
-        CryptoEngine._instance_new = CryptoEngine(timeout)
-        return CryptoEngine._instance_new
+        CryptoEngine._instance = CryptoEngine(timeout)
+        return CryptoEngine._instance
 
     def __init__(self, salt=None, digest=None, algorithm='AES',
                  timeout=-1, reader=None):