test_sqlite.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. # ============================================================================
  2. # This file is part of Pwman3.
  3. #
  4. # Pwman3 is free software; you can redistribute it and/or modify
  5. # it under the terms of the GNU General Public License, version 2
  6. # as published by the Free Software Foundation;
  7. #
  8. # Pwman3 is distributed in the hope that it will be useful,
  9. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. # GNU General Public License for more details.
  12. #
  13. # You should have received a copy of the GNU General Public License
  14. # along with Pwman3; if not, write to the Free Software
  15. # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
  16. # ============================================================================
  17. # Copyright (C) 2012, 2013, 2014 Oz Nahum Tiram <nahumoz@gmail.com>
  18. # ============================================================================
  19. import os
  20. import unittest
  21. from pwman.data.drivers.sqlite import SQLite
  22. from pwman.data.nodes import Node
  23. from pwman.util.crypto_engine import CryptoEngine
  24. from .test_crypto_engine import give_key, DummyCallback
  25. class TestSQLite(unittest.TestCase):
  26. @classmethod
  27. def tearDownClass(cls):
  28. cls.db.close()
  29. for item in ('test.db',):
  30. try:
  31. os.remove(item)
  32. except OSError:
  33. continue
  34. @classmethod
  35. def setUp(cls):
  36. cls.db = SQLite('test.db')
  37. cls.db._open()
  38. def test_1_create_tables(self):
  39. self.db._create_tables()
  40. self.db._con.commit()
  41. # the method _open calls _create_tables
  42. self.db.save_crypto_info("foo", "bar")
  43. self.db._create_tables()
  44. def test_1a_create_tables(self):
  45. self.db._create_tables()
  46. def test_2_crypto_info(self):
  47. self.db._create_tables()
  48. self.db.save_crypto_info("foo", "bar")
  49. f = self.db.fetch_crypto_info()
  50. self.assertListEqual([u'foo', u'bar'], list(f))
  51. def test_3_add_node(self):
  52. node = Node(clear_text=True,
  53. **{'username': u"alice", 'password': u"secret",
  54. 'url': u"wonderland.com",
  55. 'notes': u"a really great place",
  56. 'tags': [u'foo', u'bar']})
  57. self.db.add_node(node)
  58. rv = self.db._cur.execute("select * from node")
  59. # clearly this fails, while alice is not found in clear text in the
  60. # database!
  61. ce = CryptoEngine.get()
  62. res = rv.fetchone()
  63. self.assertIn(ce.encrypt(u'alice'), res[1])
  64. def test_4_test_tags(self):
  65. node = Node(clear_text=True,
  66. **{'username': u"alice", 'password': u"secret",
  67. 'url': u"wonderland.com",
  68. 'notes': u"a really great place",
  69. 'tags': [u'foo', u'bar']})
  70. ce = CryptoEngine.get()
  71. self.db._get_or_create_tag(node._tags[0])
  72. self.assertEqual(1, self.db._get_or_create_tag(node._tags[0]))
  73. rv = self.db._get_or_create_tag(ce.encrypt('baz'))
  74. self.db._con.commit()
  75. self.assertEqual(3, rv)
  76. def test_5_test_lookup(self):
  77. self.db._cur.execute('SELECT nodeid, tagid FROM LOOKUP')
  78. rows = self.db._cur.fetchall()
  79. self.assertEqual(2, len(rows))
  80. def test_6_listnodes(self):
  81. node = Node(clear_text=True,
  82. **{'username': u"hatman", 'password': u"secret",
  83. 'url': u"wonderland.com",
  84. 'notes': u"a really great place",
  85. 'tags': [u'baz', u'bar']})
  86. self.db.add_node(node)
  87. ids = self.db.listnodes()
  88. self.assertEqual(2, len(ids))
  89. def test_7_listnodes_w_filter(self):
  90. ce = CryptoEngine.get()
  91. # the tag 'bar' is found in a node created in:
  92. # test_3_add_node
  93. # test_6_listnodes
  94. tag = ce.encrypt(u'bar')
  95. rv = self.db.listnodes(tag)
  96. self.assertEqual(len(rv), 2)
  97. tag = ce.encrypt(u'baz')
  98. # the tag 'baz' is found in a node created in
  99. # test_6_listnodes
  100. rv = self.db.listnodes(tag)
  101. self.assertEqual(len(rv), 1)
  102. def test_8_getnodes(self):
  103. nodes = self.db.getnodes([1, 2])
  104. self.assertEqual(len(nodes), 2)
  105. def test_9_editnode(self):
  106. # delibertly insert clear text into the database
  107. node = {'user': 'transparent', 'password': 'notsecret',
  108. 'tags': ['foo', 'bank']}
  109. self.db.editnode('2', **node)
  110. self.db._cur.execute('SELECT USER, PASSWORD FROM NODE WHERE ID=2')
  111. rv = self.db._cur.fetchone()
  112. self.assertEqual(rv, (u'transparent', u'notsecret'))
  113. node = {'user': 'modify', 'password': 'notsecret',
  114. 'tags': ['foo', 'auto']}
  115. # now the tags bank and baz are orphand ...
  116. # what happens? it should be completely removed.
  117. # To spare IO we only delete orphand tags when
  118. # db.close is called.
  119. self.db.editnode('2', **node)
  120. def test_9_test_orphands(self):
  121. self.db._clean_orphands()
  122. ce = CryptoEngine.get()
  123. baz_encrypted = ce.encrypt(u'baz').decode()
  124. self.db._cur.execute('SELECT DATA FROM TAG')
  125. rv = self.db._cur.fetchall()
  126. for data in rv:
  127. if isinstance(data[0], str):
  128. self.assertNotIn(u'bank', data[0])
  129. else:
  130. self.assertNotIn(baz_encrypted, data[0].decode())
  131. def test_a10_test_listtags(self):
  132. tags = self.db.listtags()
  133. self.assertEqual(4, len(list(tags)))
  134. def test_a11_test_rmnodes(self):
  135. self.db.removenodes([1, 2])
  136. rv = self.db._cur.execute("select * from node").fetchall()
  137. self.assertListEqual(rv, [])
  138. def test_a12_test_savekey(self):
  139. ce = CryptoEngine.get()
  140. self.db.savekey(ce.get_cryptedkey())
  141. self.assertEqual(ce.get_cryptedkey(), self.db.loadkey())
  142. if __name__ == '__main__':
  143. ce = CryptoEngine.get()
  144. ce.callback = DummyCallback()
  145. ce.changepassword(reader=give_key)
  146. unittest.main(verbosity=2, failfast=True)