Переглянути джерело

Improve the sqlite driver

remove orphan tags when closing the database ...
oz123 10 роки тому
батько
коміт
bad05e1ebd
2 змінених файлів з 34 додано та 15 видалено
  1. 13 4
      pwman/data/drivers/sqlite.py
  2. 21 11
      pwman/tests/test_sqlite.py

+ 13 - 4
pwman/data/drivers/sqlite.py

@@ -438,7 +438,7 @@ class SQLite(SQLiteDatabaseNewForm):
         self._con.commit()
 
     def _get_tag(self, tagcipher):
-        sql_search = "SELECT ID FROM TAG WHERE DATA LIKE (?)"
+        sql_search = "SELECT ID FROM TAG WHERE DATA = ?"
         self._cur.execute(sql_search, ([tagcipher]))
         rv = self._cur.fetchone()
         return rv
@@ -453,11 +453,9 @@ class SQLite(SQLiteDatabaseNewForm):
             return self._cur.lastrowid
 
     def _update_tag_lookup(self, nodeid, tid):
-        # clean all old tags
-        #sql_clean = "DELETE FROM LOOKUP WHERE NODEID=?"
-        #self._cur.execute(sql_clean, str(nodeid))
         sql_lookup = "INSERT INTO LOOKUP(nodeid, tagid) VALUES(?,?)"
         self._cur.execute(sql_lookup, (nodeid, tid))
+        self._con.commit()
 
     def _setnodetags(self, nodeid, tags):
         for tag in tags:
@@ -478,11 +476,22 @@ class SQLite(SQLiteDatabaseNewForm):
         if tags:
             #  update all old node entries in lookup
             #  create new entries
+            # clean all old tags
+            sql_clean = "DELETE FROM LOOKUP WHERE NODEID=?"
+            self._cur.execute(sql_clean, str(nid))
             # TODO, update tags lookup
             self._setnodetags(nid, tags)
 
         self._con.commit()
 
+    def _clean_orphands(self):
+        clean = ("delete from tag where not exists "
+                 "(select 'x' from lookup l where l.tagid = tag.id)")
+
+        self._cur.execute(clean)
+        self._con.commit()
+
     def close(self):
+        self._clean_orphands()
         self._cur.close()
         self._con.close()

+ 21 - 11
pwman/tests/test_sqlite.py

@@ -74,8 +74,8 @@ class TestSQLite(unittest.TestCase):
         self.assertEqual(3, rv)
 
     def test_5_test_lookup(self):
-        self.db._cur.execute('SELECT * FROM LOOKUP')
-        rows = self.db._cur.fetchall()[0]
+        self.db._cur.execute('SELECT nodeid, tagid FROM LOOKUP')
+        rows = self.db._cur.fetchall()
         self.assertEqual(2, len(rows))
 
     def test_6_listnodes(self):
@@ -98,16 +98,8 @@ class TestSQLite(unittest.TestCase):
         rv = self.db.listnodes(tag)
         self.assertEqual(len(rv), 2)
         tag = ce.encrypt(u'baz')
-
-        node_from_db = self.db.getnodes([2])
         # the tag 'baz' is found in a node created in
         # test_6_listnodes
-        node = Node(clear_text=True,
-                    **{'username': u"hatman", 'password': u"secret",
-                       'url': u"wonderland.com",
-                       'notes': u"a really great place",
-                       'tags': [u'baz', u'bar']})
-
         rv = self.db.listnodes(tag)
         self.assertEqual(len(rv), 1)
 
@@ -123,6 +115,25 @@ class TestSQLite(unittest.TestCase):
         self.db._cur.execute('SELECT USER, PASSWORD FROM NODE WHERE ID=2')
         rv = self.db._cur.fetchone()
         self.assertEqual(rv, (u'transparent', u'notsecret'))
+        node = {'user': 'modify', 'password': 'notsecret',
+                'tags': ['foo', 'auto']}
+        # now the tags bank and baz are orphand ...
+        # what happens? it should be completely removed.
+        # To spare IO we only delete orphand tags when
+        # db.close is called.
+        self.db.editnode('2', **node)
+
+    def test_9_test_orphands(self):
+        ce = CryptoEngine.get()
+        baz_encrypted = ce.encrypt(u'baz').decode()
+
+        self.db._cur.execute('SELECT DATA FROM TAG')
+        rv = self.db._cur.fetchall()
+        for data in rv:
+            if isinstance(data[0], str):
+                self.assertNotIn(u'bank', data[0])
+            else:
+                self.assertNotIn(baz_encrypted, data[0].decode())
 
     def tearDown(self):
         self.db.close()
@@ -136,5 +147,4 @@ if __name__ == '__main__':
     try:
         unittest.main(verbosity=2, failfast=True)
     except SystemExit:
-        raw_input()
         os.remove('test.db')