test_sqlite.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. # ============================================================================
  2. # This file is part of Pwman3.
  3. #
  4. # Pwman3 is free software; you can redistribute it and/or modify
  5. # it under the terms of the GNU General Public License, version 2
  6. # as published by the Free Software Foundation;
  7. #
  8. # Pwman3 is distributed in the hope that it will be useful,
  9. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. # GNU General Public License for more details.
  12. #
  13. # You should have received a copy of the GNU General Public License
  14. # along with Pwman3; if not, write to the Free Software
  15. # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
  16. # ============================================================================
  17. # Copyright (C) 2012, 2013, 2014 Oz Nahum Tiram <nahumoz@gmail.com>
  18. # ============================================================================
  19. import os
  20. import unittest
  21. from pwman.data.drivers.sqlite import SQLite
  22. from pwman.data.nodes import Node
  23. from pwman.util.crypto_engine import CryptoEngine
  24. from .test_crypto_engine import give_key, DummyCallback
  25. class TestSQLite(unittest.TestCase):
  26. @classmethod
  27. def tearDownClass(cls):
  28. cls.db.close()
  29. for item in ('test.db',):
  30. try:
  31. os.remove(item)
  32. except OSError:
  33. continue
  34. @classmethod
  35. def setUp(cls):
  36. cls.db = SQLite('test.db')
  37. cls.db._open()
  38. def test_1_create_tables(self):
  39. self.db._create_tables()
  40. self.db._con.commit()
  41. # the method _open calls _create_tables
  42. self.db.save_crypto_info("foo", "bar")
  43. self.db._create_tables()
  44. def test_1a_create_tables(self):
  45. self.db._create_tables()
  46. def test_2_crypto_info(self):
  47. self.db._create_tables()
  48. self.db.save_crypto_info("foo", "bar")
  49. f = self.db.fetch_crypto_info()
  50. self.assertListEqual([u'foo', u'bar'], list(f))
  51. def test_3_add_node(self):
  52. node = Node(clear_text=True,
  53. **{'username': "alice", 'password': "secret",
  54. 'url': "wonderland.com",
  55. 'notes': "a really great place",
  56. 'tags': ['foo', 'bar']})
  57. self.db.add_node(node)
  58. rv = self.db._cur.execute("select * from node")
  59. ce = CryptoEngine.get()
  60. res = rv.fetchone()
  61. self.assertEqual(ce.decrypt(res[1]), b"alice")
  62. def test_4_test_tags(self):
  63. node = Node(clear_text=True,
  64. **{'username': u"alice", 'password': u"secret",
  65. 'url': u"wonderland.com",
  66. 'notes': u"a really great place",
  67. 'tags': [u'foo', u'bar']})
  68. ce = CryptoEngine.get()
  69. self.db._get_or_create_tag(node._tags[0])
  70. self.assertEqual(1, self.db._get_or_create_tag(node._tags[0]))
  71. rv = self.db._get_or_create_tag(ce.encrypt('baz'))
  72. self.db._con.commit()
  73. self.assertEqual(3, rv)
  74. def test_5_test_lookup(self):
  75. self.db._cur.execute('SELECT nodeid, tagid FROM LOOKUP')
  76. rows = self.db._cur.fetchall()
  77. self.assertEqual(2, len(rows))
  78. def test_6_listnodes(self):
  79. node = Node(clear_text=True,
  80. **{'username': u"hatman", 'password': u"secret",
  81. 'url': u"wonderland.com",
  82. 'notes': u"a really great place",
  83. 'tags': [u'baz', u'bar']})
  84. self.db.add_node(node)
  85. ids = self.db.listnodes()
  86. self.assertEqual(2, len(ids))
  87. def test_7_listnodes_w_filter(self):
  88. ce = CryptoEngine.get()
  89. # the tag 'bar' is found in a node created in:
  90. # test_3_add_node
  91. # test_6_listnodes
  92. tag = ce.encrypt(u'bar')
  93. rv = self.db.listnodes(tag)
  94. self.assertEqual(len(rv), 2)
  95. tag = ce.encrypt(u'baz')
  96. # the tag 'baz' is found in a node created in
  97. # test_6_listnodes
  98. rv = self.db.listnodes(tag)
  99. self.assertEqual(len(rv), 1)
  100. def test_8_getnodes(self):
  101. nodes = self.db.getnodes([1, 2])
  102. self.assertEqual(len(nodes), 2)
  103. def test_9_editnode(self):
  104. # delibertly insert clear text into the database
  105. node = {'user': 'transparent', 'password': 'notsecret',
  106. 'tags': ['foo', 'bank']}
  107. self.db.editnode('2', **node)
  108. self.db._cur.execute('SELECT USER, PASSWORD FROM NODE WHERE ID=2')
  109. rv = self.db._cur.fetchone()
  110. self.assertEqual(rv, (u'transparent', u'notsecret'))
  111. node = {'user': 'modify', 'password': 'notsecret',
  112. 'tags': ['foo', 'auto']}
  113. # now the tags bank and baz are orphan ...
  114. # what happens? it should be completely removed.
  115. # To spare IO we only delete orphand tags when
  116. # db.close is called.
  117. self.db.editnode('2', **node)
  118. def test_9_test_orphans(self):
  119. self.db._clean_orphans()
  120. ce = CryptoEngine.get()
  121. baz_encrypted = ce.encrypt(u'baz').decode()
  122. self.db._cur.execute('SELECT DATA FROM TAG')
  123. rv = self.db._cur.fetchall()
  124. for data in rv:
  125. if isinstance(data[0], str):
  126. self.assertNotIn(u'bank', data[0])
  127. else:
  128. self.assertNotIn(baz_encrypted, data[0].decode())
  129. def test_a10_test_listtags(self):
  130. tags = self.db.listtags()
  131. self.assertEqual(4, len(list(tags)))
  132. def test_a11_test_rmnodes(self):
  133. for n in [1, 2]:
  134. self.db.removenodes([n])
  135. rv = self.db._cur.execute("select * from node").fetchall()
  136. self.assertListEqual(rv, [])
  137. def test_a12_test_savekey(self):
  138. ce = CryptoEngine.get()
  139. self.db.savekey(ce.get_cryptedkey())
  140. self.assertEqual(ce.get_cryptedkey(), self.db.loadkey())
  141. if __name__ == '__main__':
  142. ce = CryptoEngine.get()
  143. ce.callback = DummyCallback()
  144. ce.changepassword(reader=give_key)
  145. unittest.main(verbosity=2, failfast=True)