test_sqlite.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. # ============================================================================
  2. # This file is part of Pwman3.
  3. #
  4. # Pwman3 is free software; you can redistribute iut and/or modify
  5. # it under the terms of the GNU General Public License, version 2
  6. # as published by the Free Software Foundation;
  7. #
  8. # Pwman3 is distributed in the hope that it will be useful,
  9. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. # GNU General Public License for more details.
  12. #
  13. # You should have received a copy of the GNU General Public License
  14. # along with Pwman3; if not, write to the Free Software
  15. # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
  16. # ============================================================================
  17. # Copyright (C) 2012, 2013, 2014 Oz Nahum Tiram <nahumoz@gmail.com>
  18. # ============================================================================
  19. import os
  20. import unittest
  21. from pwman.data.drivers.sqlite import SQLite
  22. from pwman.data.nodes import Node
  23. from pwman.util.crypto_engine import CryptoEngine
  24. from .test_crypto_engine import give_key, DummyCallback
  25. class TestSQLite(unittest.TestCase):
  26. @classmethod
  27. def tearDownClass(cls):
  28. cls.db.close()
  29. for item in ('test.db',):
  30. try:
  31. os.remove(item)
  32. except OSError:
  33. continue
  34. @classmethod
  35. def setUp(cls):
  36. cls.db = SQLite('test.db')
  37. cls.db._open()
  38. def test_1_create_tables(self):
  39. self.db._create_tables()
  40. self.db._con.commit()
  41. # the method _open calls _create_tables
  42. self.db.save_crypto_info("foo", "bar")
  43. self.db._create_tables()
  44. def test_1a_create_tables(self):
  45. self.db._create_tables()
  46. def test_2_crypto_info(self):
  47. self.db._create_tables()
  48. self.db.save_crypto_info("foo", "bar")
  49. f = self.db.fetch_crypto_info()
  50. self.assertListEqual([u'foo', u'bar'], list(f))
  51. def test_3_add_node(self):
  52. node = Node(clear_text=True,
  53. **{'username': "alice", 'password': "secret",
  54. 'url': "wonderland.com",
  55. 'notes': "a really great place",
  56. 'tags': ['foo', 'bar']})
  57. self.db.add_node(node)
  58. rv = self.db._cur.execute("select * from node")
  59. ce = CryptoEngine.get()
  60. res = rv.fetchone()
  61. self.assertEqual(ce.decrypt(res[1]), b"alice")
  62. def test_4_test_tags(self):
  63. node = Node(clear_text=True,
  64. **{'username': u"alice", 'password': u"secret",
  65. 'url': u"wonderland.com",
  66. 'notes': u"a really great place",
  67. 'tags': ['foo', 'bar']})
  68. ce = CryptoEngine.get()
  69. self.db._get_or_create_tag(node._tags[0])
  70. self.assertEqual(1, self.db._get_or_create_tag(node._tags[0]))
  71. rv = self.db._get_or_create_tag(ce.encrypt(b'baz'))
  72. self.assertEqual(3, rv)
  73. self.db._con.commit()
  74. def test_5_test_lookup(self):
  75. self.db._cur.execute('SELECT nodeid, tagid FROM LOOKUP')
  76. rows = self.db._cur.fetchall()
  77. self.assertEqual(2, len(rows))
  78. def test_6_listnodes(self):
  79. node = Node(clear_text=True,
  80. **{'username': u"hatman", 'password': u"secret",
  81. 'url': u"wonderland.com",
  82. 'notes': u"a really great place",
  83. 'tags': [u'baz', u'bar']})
  84. self.db.add_node(node)
  85. ids = self.db.listnodes()
  86. self.assertEqual(2, len(ids))
  87. def test_7_listnodes_w_filter(self):
  88. ce = CryptoEngine.get()
  89. # the tag 'bar' is found in a node created in:
  90. # test_3_add_node
  91. # test_6_listnodes
  92. tag = ce.encrypt(b'bar')
  93. rv = self.db.listnodes(tag)
  94. self.assertEqual(len(rv), 2)
  95. tag = ce.encrypt(b'baz')
  96. # the tag 'baz' is found in a node created in
  97. # test_6_listnodes
  98. rv = self.db.listnodes(tag)
  99. self.assertEqual(len(rv), 1)
  100. def test_8_getnodes(self):
  101. nodes = self.db.getnodes([1, 2])
  102. self.assertEqual(len(nodes), 2)
  103. def test_9_editnode(self):
  104. # delibertly insert clear text into the database
  105. ce = CryptoEngine.get()
  106. tags = [ce.encrypt("foo"), ce.encrypt("auto")]
  107. node = {'user': 'transparent', 'password': 'notsecret',
  108. 'tags': tags}
  109. self.db.editnode('2', **node)
  110. self.db._cur.execute('SELECT USER, PASSWORD FROM NODE WHERE ID=2')
  111. rv = self.db._cur.fetchone()
  112. self.assertEqual(rv, ('transparent', 'notsecret'))
  113. node = {'user': 'modify', 'password': 'notsecret',
  114. 'tags': tags}
  115. # now the tags bank and baz are orphan ...
  116. # what happens? it should be completely removed.
  117. # To spare IO we only delete orphand tags when
  118. # db.close is called.
  119. self.db.editnode('2', **node)
  120. def test_9_test_no_orphans(self):
  121. self.db._clean_orphans()
  122. self.db._con.commit()
  123. ce = CryptoEngine.get()
  124. tags = None
  125. while not tags:
  126. tags = self.db._cur.execute('SELECT * FROM tag').fetchall()
  127. tags_clear = [ce.decrypt(tag[1]) for tag in tags]
  128. self.assertNotIn(b"baz", tags_clear)
  129. def test_a10_test_listtags(self):
  130. """there should be only 3 tags left"""
  131. tags = self.db.listtags()
  132. self.assertEqual(3, len(list(tags)))
  133. def test_a11_test_rmnodes(self):
  134. for n in [1, 2]:
  135. self.db.removenodes([n])
  136. rv = self.db._cur.execute("select * from node").fetchall()
  137. self.assertListEqual(rv, [])
  138. def test_a12_test_savekey(self):
  139. ce = CryptoEngine.get()
  140. self.db.savekey(ce.get_cryptedkey())
  141. self.assertEqual(ce.get_cryptedkey(), self.db.loadkey())
  142. if __name__ == '__main__':
  143. ce = CryptoEngine.get()
  144. ce.callback = DummyCallback()
  145. ce.changepassword(reader=give_key)
  146. unittest.main(verbosity=2, failfast=True)