소스 검색

Fix more issues with python3 and cryptography

Oz N Tiram 8 년 전
부모
커밋
99391aa7a0
2개의 변경된 파일28개의 추가작업 그리고 17개의 파일을 삭제
  1. 16 7
      pwman/data/database.py
  2. 12 10
      tests/test_sqlite.py

+ 16 - 7
pwman/data/database.py

@@ -120,15 +120,23 @@ class Database(object):
             self._update_tag_lookup(nodeid, tid)
 
     def _get_tag(self, tagcipher):
-        sql_search = "SELECT ID FROM TAG WHERE DATA = {}".format(self._sub)
-        self._cur.execute(sql_search, ([tagcipher]))
-        rv = self._cur.fetchone()
-        return rv
+        sql_search = "SELECT * FROM TAG"
+        self._cur.execute(sql_search)
+        
+        ce = CryptoEngine.get()
+        tag = ce.decrypt(tagcipher)
+
+        rv = self._cur.fetchall()
+        for idx, cipher in rv:
+            if tag == ce.decrypt(cipher):
+                return idx
+
+        return
 
     def _get_or_create_tag(self, tagcipher):
         rv = self._get_tag(tagcipher)
         if rv:
-            return rv[0]
+            return rv
         else:
             self._cur.execute(self._insert_tag_sql, ([tagcipher]))
             try:
@@ -168,8 +176,9 @@ class Database(object):
             tagid = self._get_tag(filter)
             if not tagid:
                 return []  # pragma: no cover
-
-            self._cur.execute(self._list_nodes_sql, (tagid))
+            
+            # will this work for many nodes??? with the same tag?
+            self._cur.execute(self._list_nodes_sql, (tagid,))
             self._con.commit()
             ids = self._cur.fetchall()
             return [id[0] for id in ids]

+ 12 - 10
tests/test_sqlite.py

@@ -73,13 +73,13 @@ class TestSQLite(unittest.TestCase):
                     **{'username': u"alice", 'password': u"secret",
                        'url': u"wonderland.com",
                        'notes': u"a really great place",
-                       'tags': [u'foo', u'bar']})
+                       'tags': ['foo', 'bar']})
         ce = CryptoEngine.get()
         self.db._get_or_create_tag(node._tags[0])
         self.assertEqual(1, self.db._get_or_create_tag(node._tags[0]))
-        rv = self.db._get_or_create_tag(ce.encrypt('baz'))
-        self.db._con.commit()
+        rv = self.db._get_or_create_tag(ce.encrypt(b'baz'))
         self.assertEqual(3, rv)
+        self.db._con.commit()
 
     def test_5_test_lookup(self):
         self.db._cur.execute('SELECT nodeid, tagid FROM LOOKUP')
@@ -102,10 +102,10 @@ class TestSQLite(unittest.TestCase):
         # test_3_add_node
         # test_6_listnodes
 
-        tag = ce.encrypt(u'bar')
+        tag = ce.encrypt(b'bar')
         rv = self.db.listnodes(tag)
         self.assertEqual(len(rv), 2)
-        tag = ce.encrypt(u'baz')
+        tag = ce.encrypt(b'baz')
         # the tag 'baz' is found in a node created in
         # test_6_listnodes
         rv = self.db.listnodes(tag)
@@ -117,14 +117,16 @@ class TestSQLite(unittest.TestCase):
 
     def test_9_editnode(self):
         # delibertly insert clear text into the database
+        ce = CryptoEngine.get()
+        tags = [ce.encrypt("foo"), ce.encrypt("auto")]
         node = {'user': 'transparent', 'password': 'notsecret',
-                'tags': ['foo', 'bank']}
+                'tags': tags}
         self.db.editnode('2', **node)
         self.db._cur.execute('SELECT USER, PASSWORD FROM NODE WHERE ID=2')
         rv = self.db._cur.fetchone()
-        self.assertEqual(rv, (u'transparent', u'notsecret'))
+        self.assertEqual(rv, ('transparent', 'notsecret'))
         node = {'user': 'modify', 'password': 'notsecret',
-                'tags': ['foo', 'auto']}
+                'tags': tags}
         # now the tags bank and baz are orphan ...
         # what happens? it should be completely removed.
         # To spare IO we only delete orphand tags when
@@ -134,13 +136,13 @@ class TestSQLite(unittest.TestCase):
     def test_9_test_orphans(self):
         self.db._clean_orphans()
         ce = CryptoEngine.get()
-        baz_encrypted = ce.encrypt(u'baz').decode()
+        baz_encrypted = ce.encrypt(b'baz')
 
         self.db._cur.execute('SELECT DATA FROM TAG')
         rv = self.db._cur.fetchall()
         for data in rv:
             if isinstance(data[0], str):
-                self.assertNotIn(u'bank', data[0])
+                self.assertNotIn(b'bank', data[0])
             else:
                 self.assertNotIn(baz_encrypted, data[0].decode())