|
@@ -0,0 +1,106 @@
|
|
|
+
|
|
|
+import unittest
|
|
|
+import sys
|
|
|
+from .test_crypto_engine import give_key, DummyCallback
|
|
|
+if sys.version_info.major > 2: # pragma: no cover
|
|
|
+ from urllib.parse import urlparse
|
|
|
+else: # pragma: no cover
|
|
|
+ from urlparse import urlparse
|
|
|
+
|
|
|
+import pymongo
|
|
|
+from pwman.data.drivers.mongodb import MongoDB
|
|
|
+from pwman.util.crypto_engine import CryptoEngine
|
|
|
+
|
|
|
+
|
|
|
+class TestMongoDB(unittest.TestCase):
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def setUpClass(cls):
|
|
|
+ u = "mongodb://pwman:123456@localhost:27017/pwmantest"
|
|
|
+ u = urlparse(u)
|
|
|
+ cls.db = MongoDB(u)
|
|
|
+ cls.db._open()
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def tearDownClass(cls):
|
|
|
+ # Drop collections here
|
|
|
+ pass
|
|
|
+
|
|
|
+ def test_1_con(self):
|
|
|
+ self.assertFalse(True)
|
|
|
+
|
|
|
+ def test_2_create_collections(self):
|
|
|
+ pass
|
|
|
+
|
|
|
+ def test_3_load_key(self):
|
|
|
+ self.db.savekey('SECRET$6$KEY')
|
|
|
+ secretkey = self.db.loadkey()
|
|
|
+ self.assertEqual(secretkey, 'SECRET$6$KEY')
|
|
|
+
|
|
|
+ def test_4_save_crypto(self):
|
|
|
+ self.db.save_crypto_info("TOP", "SECRET")
|
|
|
+ secretkey = self.db.loadkey()
|
|
|
+ self.assertEqual(secretkey, 'TOP$6$SECRET')
|
|
|
+ row = self.db.fetch_crypto_info()
|
|
|
+ self.assertEqual(row, ('TOP', 'SECRET'))
|
|
|
+
|
|
|
+ def test_5_add_node(self):
|
|
|
+ innode = ["TBONE", "S3K43T", "example.org", "some note",
|
|
|
+ ["bartag", "footag"]]
|
|
|
+ self.db.add_node(innode)
|
|
|
+
|
|
|
+ outnode = self.db.getnodes([1])[0]
|
|
|
+ self.assertEqual(innode[:-1] + [t for t in innode[-1]], outnode[1:])
|
|
|
+
|
|
|
+ def test_6_list_nodes(self):
|
|
|
+ ret = self.db.listnodes()
|
|
|
+ self.assertEqual(ret, [1])
|
|
|
+ ret = self.db.listnodes("footag")
|
|
|
+ self.assertEqual(ret, [1])
|
|
|
+
|
|
|
+ def test_6a_list_tags(self):
|
|
|
+ ret = self.db.listtags()
|
|
|
+ self.assertListEqual(ret, ['bartag', 'footag'])
|
|
|
+
|
|
|
+ def test_6b_get_nodes(self):
|
|
|
+ ret = self.db.getnodes([1])
|
|
|
+ retb = self.db.getnodes([])
|
|
|
+ self.assertListEqual(ret, retb)
|
|
|
+
|
|
|
+ def test_7_get_or_create_tag(self):
|
|
|
+ s = self.db._get_or_create_tag("SECRET")
|
|
|
+ s1 = self.db._get_or_create_tag("SECRET")
|
|
|
+
|
|
|
+ self.assertEqual(s, s1)
|
|
|
+
|
|
|
+ def test_7a_clean_orphans(self):
|
|
|
+
|
|
|
+ self.db._clean_orphans()
|
|
|
+ rv = self.db._get_tag("SECRET")
|
|
|
+ self.assertIsNone(rv)
|
|
|
+
|
|
|
+ def test_8_remove_node(self):
|
|
|
+ self.db.removenodes([1])
|
|
|
+ n = self.db.listnodes()
|
|
|
+ self.assertEqual(len(n), 0)
|
|
|
+
|
|
|
+ def test_9_check_db_version(self):
|
|
|
+
|
|
|
+ dburi = "mysql://pwman:123456@localhost:3306/pwmantest"
|
|
|
+ v = self.db.check_db_version(urlparse(dburi))
|
|
|
+ self.assertEqual(v, '0.6')
|
|
|
+ self.db._cur.execute("DROP TABLE DBVERSION")
|
|
|
+ self.db._con.commit()
|
|
|
+ v = self.db.check_db_version(urlparse(dburi))
|
|
|
+ self.assertEqual(v, None)
|
|
|
+ self.db._cur.execute("CREATE TABLE DBVERSION("
|
|
|
+ "VERSION TEXT NOT NULL) ")
|
|
|
+ self.db._con.commit()
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == '__main__':
|
|
|
+
|
|
|
+ ce = CryptoEngine.get()
|
|
|
+ ce.callback = DummyCallback()
|
|
|
+ ce.changepassword(reader=give_key)
|
|
|
+ unittest.main(verbosity=2, failfast=True)
|