|
@@ -18,9 +18,16 @@
|
|
# ============================================================================
|
|
# ============================================================================
|
|
|
|
|
|
import unittest
|
|
import unittest
|
|
-from .test_crypto_engine import give_key, DummyCallback
|
|
|
|
|
|
+import sys
|
|
|
|
+if sys.version_info.major > 2: # pragma: no cover
|
|
|
|
+ from urllib.parse import urlparse
|
|
|
|
+else: # pragma: no cover
|
|
|
|
+ from urlparse import urlparse
|
|
import pymongo
|
|
import pymongo
|
|
|
|
+from .test_crypto_engine import give_key, DummyCallback
|
|
|
|
+from pwman.util.crypto_engine import CryptoEngine
|
|
from pwman.data.drivers.mongodb import MongoDB
|
|
from pwman.data.drivers.mongodb import MongoDB
|
|
|
|
+from pwman.data.nodes import Node
|
|
# use pwmantest
|
|
# use pwmantest
|
|
|
|
|
|
# db.createUser(
|
|
# db.createUser(
|
|
@@ -30,7 +37,6 @@ from pwman.data.drivers.mongodb import MongoDB
|
|
# roles: [{ role: "dbAdmin", db: "pwmantest" },
|
|
# roles: [{ role: "dbAdmin", db: "pwmantest" },
|
|
# { role: "readWrite", db: "pwmantest" },]
|
|
# { role: "readWrite", db: "pwmantest" },]
|
|
# })
|
|
# })
|
|
-from pwman.util.crypto_engine import CryptoEngine
|
|
|
|
|
|
|
|
|
|
|
|
class TestMongoDB(unittest.TestCase):
|
|
class TestMongoDB(unittest.TestCase):
|
|
@@ -38,7 +44,7 @@ class TestMongoDB(unittest.TestCase):
|
|
@classmethod
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
def setUpClass(cls):
|
|
u = u"mongodb://tester:12345678@localhost:27017/pwmantest"
|
|
u = u"mongodb://tester:12345678@localhost:27017/pwmantest"
|
|
- cls.db = MongoDB(u)
|
|
|
|
|
|
+ cls.db = MongoDB(urlparse(u))
|
|
cls.db._open()
|
|
cls.db._open()
|
|
|
|
|
|
@classmethod
|
|
@classmethod
|
|
@@ -56,7 +62,11 @@ class TestMongoDB(unittest.TestCase):
|
|
def test_2_create_collections(self):
|
|
def test_2_create_collections(self):
|
|
pass
|
|
pass
|
|
|
|
|
|
- def test_3_load_key(self):
|
|
|
|
|
|
+ def test_3a_load_key(self):
|
|
|
|
+ secretkey = self.db.loadkey()
|
|
|
|
+ self.assertIsNone(secretkey)
|
|
|
|
+
|
|
|
|
+ def test_3b_load_key(self):
|
|
self.db.savekey('SECRET$6$KEY')
|
|
self.db.savekey('SECRET$6$KEY')
|
|
secretkey = self.db.loadkey()
|
|
secretkey = self.db.loadkey()
|
|
self.assertEqual(secretkey, u'SECRET$6$KEY')
|
|
self.assertEqual(secretkey, u'SECRET$6$KEY')
|
|
@@ -72,19 +82,34 @@ class TestMongoDB(unittest.TestCase):
|
|
def test_5_add_node(self):
|
|
def test_5_add_node(self):
|
|
innode = [u"TBONE", u"S3K43T", u"example.org", u"some note",
|
|
innode = [u"TBONE", u"S3K43T", u"example.org", u"some note",
|
|
[u"bartag", u"footag"]]
|
|
[u"bartag", u"footag"]]
|
|
- self.db.add_node(innode)
|
|
|
|
|
|
+
|
|
|
|
+ kwargs = {
|
|
|
|
+ "username":innode[0], "password": innode[1],
|
|
|
|
+ "url": innode[2], "notes": innode[3], "tags": innode[4]
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ node = Node(clear_text=True, **kwargs)
|
|
|
|
+ self.db.add_node(node)
|
|
outnode = self.db.getnodes([1])[0]
|
|
outnode = self.db.getnodes([1])[0]
|
|
- self.assertEqual(innode[:-1] + [t for t in innode[-1]], outnode[1:])
|
|
|
|
|
|
+ no = outnode[1:5]
|
|
|
|
+ no.append(outnode[5:])
|
|
|
|
+ o = Node.from_encrypted_entries(*no)
|
|
|
|
+ self.assertEqual(list(node), list(o))
|
|
|
|
|
|
def test_6_list_nodes(self):
|
|
def test_6_list_nodes(self):
|
|
ret = self.db.listnodes()
|
|
ret = self.db.listnodes()
|
|
self.assertEqual(ret, [1])
|
|
self.assertEqual(ret, [1])
|
|
- ret = self.db.listnodes("footag")
|
|
|
|
|
|
+ ce = CryptoEngine.get()
|
|
|
|
+ fltr = ce.encrypt("footag")
|
|
|
|
+ ret = self.db.listnodes(fltr)
|
|
self.assertEqual(ret, [1])
|
|
self.assertEqual(ret, [1])
|
|
|
|
|
|
def test_6a_list_tags(self):
|
|
def test_6a_list_tags(self):
|
|
ret = self.db.listtags()
|
|
ret = self.db.listtags()
|
|
- self.assertListEqual(ret, ['bartag', 'footag'])
|
|
|
|
|
|
+ ce = CryptoEngine.get()
|
|
|
|
+ ec_tags = map(ce.encrypt,[u'bartag', u'footag'])
|
|
|
|
+ for t in ec_tags:
|
|
|
|
+ self.assertIn(t, ret)
|
|
|
|
|
|
def test_6b_get_nodes(self):
|
|
def test_6b_get_nodes(self):
|
|
ret = self.db.getnodes([1])
|
|
ret = self.db.getnodes([1])
|