Quellcode durchsuchen

Merge branch 'new_sqlite'

oz123 vor 10 Jahren
Ursprung
Commit
9db9bca4be

+ 8 - 5
Makefile

@@ -1,4 +1,4 @@
-.PHONY: clean-pyc clean-build docs clean test
+.PHONY: clean-pyc clean-build docs clean test coverage coverage-run
 
 help:
 	@echo "clean-build - remove build artifacts"
@@ -13,6 +13,7 @@ help:
 
 clean: clean-build clean-pyc
 	rm -fr htmlcov/
+	rm -f test.db
 
 clean-build:
 	rm -fr build/
@@ -27,18 +28,20 @@ clean-pyc:
 lint:
 	flake8 pwman scripts
 
-test: install
+test: install clean 
 	git checkout pwman/tests/pwman.v0.0.8.db
 	python setup.py test
 
 test-all:
 	tox
 
-coverage:
+coverage-run:
 	coverage run --source pwman setup.py test
 	coverage report -m
-	coverage html
-	#xdg-open htmlcov/index.html
+	@coverage html
+
+coverage: coverage-run
+	@rm test.db
 
 docs:
 	#rm -f docs/manutils.rst

+ 33 - 0
documentation/general_notes.txt

@@ -91,5 +91,38 @@ if [ ! -z "$VAR" ]; then
   echo "You've left a BREAK POINT in one of your files! Aborting commit..."
   exit 1
 fi 
+
+make test || exit 1
 exit 0
 
+### testing :
+
+Each class should have at least 95% coverage. 
+Each module should be tested as a stand alone with:
+
+  foo.py 
+  test_foo.py 
+
+The test module test_foo.py should contain at the bottom:
+
+    if __name__ == '__main__':
+        
+        # prepare everything for testing 
+        try:
+            unittest.main(verbosity=2)
+        except SystemExit:
+            # clean everything after test was run 
+
+You should be able to run the module testing with:
+
+    $ python -m pwman.tests.test_foo
+
+But you should also run the tests as part of a larger scheme with:
+
+
+    from test_foo import TestFoo 
+    ...
+    ...
+    loader = unittest.TestLoader()
+    suite = unittest.TestSuite()
+    suite.addTest(loader.loadTestsFromTestCase(DBTests))

+ 1 - 0
pwman/__init__.py

@@ -88,6 +88,7 @@ def parser_options(formatter_class=argparse.HelpFormatter):
                         default=os.path.expanduser("~/.pwman/config"),
                         help='cofiguration file to read')
     parser.add_argument('-d', '--database', dest='dbase')
+    parser.add_argument('-i', '--import', dest='import_file', type=file)
     return parser
 
 

+ 176 - 6
pwman/data/drivers/sqlite.py

@@ -1,4 +1,4 @@
-#============================================================================
+# ============================================================================
 # This file is part of Pwman3.
 #
 # Pwman3 is free software; you can redistribute iut and/or modify
@@ -13,12 +13,11 @@
 # You should have received a copy of the GNU General Public License
 # along with Pwman3; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
-#============================================================================
-# Copyright (C) 2012 Oz Nahum <nahumoz@gmail.com>
-#============================================================================
-#============================================================================
+# ============================================================================
+# Copyright (C) 2012, 2013, 2014 Oz Nahum Tiram <nahumoz@gmail.com>
+# ============================================================================
 # Copyright (C) 2006 Ivan Kelly <ivan@ivankelly.net>
-#============================================================================
+# ============================================================================
 
 """SQLite Database implementation."""
 from pwman.data.database import Database, DatabaseException
@@ -354,3 +353,174 @@ class SQLiteDatabaseNewForm(Database):
             return None
         else:
             return keyrow[0]
+
+
+class SQLite(SQLiteDatabaseNewForm):
+
+    def __init__(self, filename, dbformat=0.6):
+        """Initialise SQLitePwmanDatabase instance."""
+        self._filename = filename
+        self.dbformat = dbformat
+
+    def _open(self):
+        self._con = sqlite.connect(self._filename)
+        self._cur = self._con.cursor()
+
+    def listnodes(self, filter=None):
+        if not filter:
+            sql_all = "SELECT ID FROM NODE"
+            self._cur.execute(sql_all)
+            ids = self._cur.fetchall()
+            return ids
+        else:
+            tagid = self._get_tag(filter)
+            sql_filter = "SELECT NODEID FROM LOOKUP WHERE TAGID = ? "
+            self._cur.execute(sql_filter, (tagid))
+            ids = self._cur.fetchall()
+            return ids
+
+    def listtags(self):
+        self._clean_orphands()
+        get_tags = "select data from tag"
+        self._cur.execute(get_tags)
+        tags = self._cur.fetchall()
+        if tags:
+            return tags
+        return []
+
+    def _create_tables(self):
+        self._cur.execute("PRAGMA TABLE_INFO(NODE)")
+        if self._cur.fetchone() is not None:
+            return
+
+        self._cur.execute("CREATE TABLE NODE (ID INTEGER PRIMARY KEY "
+                          "AUTOINCREMENT, "
+                          "USER TEXT NOT NULL, "
+                          "PASSWORD TEXT NOT NULL, "
+                          "URL TEXT NOT NULL,"
+                          "NOTES TEXT NOT NULL)")
+
+        self._cur.execute("CREATE TABLE TAG"
+                          "(ID INTEGER PRIMARY KEY AUTOINCREMENT,"
+                          "DATA BLOB NOT NULL UNIQUE)")
+
+        self._cur.execute("CREATE TABLE LOOKUP ("
+                          "nodeid INTEGER NOT NULL, "
+                          "tagid INTEGER NOT NULL, "
+                          "FOREIGN KEY(nodeid) REFERENCES NODE(ID),"
+                          "FOREIGN KEY(tagid) REFERENCES TAG(ID))")
+
+        self._cur.execute("CREATE TABLE CRYPTO"
+                          "(SEED TEXT,"
+                          " DIGEST TEXT)")
+
+        # create a table to hold DB version info
+        self._cur.execute("CREATE TABLE DBVERSION"
+                          "(DB VERSION TEXT NOT NULL DEFAULT '%s')" %
+                          self.dbformat)
+        self._cur.execute("INSERT INTO DBVERSION VALUES('%s')" %
+                          self.dbformat)
+        try:
+            self._con.commit()
+        except DatabaseException as e:  # pragma: no cover
+            self._con.rollback()
+            raise e
+
+    def fetch_crypto_info(self):
+        self._cur.execute("SELECT * FROM CRYPTO")
+        keyrow = self._cur.fetchone()
+        return keyrow
+
+    def save_crypto_info(self, seed, digest):
+        """save the random seed and the digested key"""
+        self._cur.execute("DELETE  FROM CRYPTO")
+        self._cur.execute("INSERT INTO CRYPTO VALUES(?, ?)", [seed, digest])
+        self._con.commit()
+
+    def add_node(self, node):
+        sql = ("INSERT INTO NODE(USER, PASSWORD, URL, NOTES)"
+               "VALUES(?, ?, ?, ?)")
+        node_tags = list(node)
+        node, tags = node_tags[:4], node_tags[-1]
+        self._cur.execute(sql, (node))
+        self._setnodetags(self._cur.lastrowid, tags)
+        self._con.commit()
+
+    def _get_tag(self, tagcipher):
+        sql_search = "SELECT ID FROM TAG WHERE DATA = ?"
+        self._cur.execute(sql_search, ([tagcipher]))
+        rv = self._cur.fetchone()
+        return rv
+
+    def _get_or_create_tag(self, tagcipher):
+        rv = self._get_tag(tagcipher)
+        if rv:
+            return rv[0]
+        else:
+            sql_insert = "INSERT INTO TAG(DATA) VALUES(?)"
+            self._cur.execute(sql_insert, ([tagcipher]))
+            return self._cur.lastrowid
+
+    def _update_tag_lookup(self, nodeid, tid):
+        sql_lookup = "INSERT INTO LOOKUP(nodeid, tagid) VALUES(?,?)"
+        self._cur.execute(sql_lookup, (nodeid, tid))
+        self._con.commit()
+
+    def _setnodetags(self, nodeid, tags):
+        for tag in tags:
+            tid = self._get_or_create_tag(tag)
+            self._update_tag_lookup(nodeid, tid)
+
+    def getnodes(self, ids):
+        sql = "SELECT * FROM NODE WHERE ID IN (%s)" % ','.join('?'*len(ids))
+        self._cur.execute(sql, (ids))
+        nodes = self._cur.fetchall()
+        return nodes
+
+    def editnode(self, nid, **kwargs):
+        tags = kwargs.pop('tags', None)
+        sql = ("UPDATE NODE SET %s WHERE ID = ? "
+               "" % ','.join('%s=?' % k for k in list(kwargs)))
+        self._cur.execute(sql, (list(kwargs.values()) + [nid]))
+        if tags:
+            # update all old node entries in lookup
+            # create new entries
+            # clean all old tags
+            sql_clean = "DELETE FROM LOOKUP WHERE NODEID=?"
+            self._cur.execute(sql_clean, str(nid))
+            self._setnodetags(nid, tags)
+
+        self._con.commit()
+
+    def removenodes(self, nids):
+        sql_rm = "delete from node where id in (%s)" % ','.join('?'*len(nids))
+        self._cur.execute(sql_rm, (nids))
+
+    def _clean_orphands(self):
+        clean = ("delete from tag where not exists "
+                 "(select 'x' from lookup l where l.tagid = tag.id)")
+
+        self._cur.execute(clean)
+        self._con.commit()
+
+    def savekey(self, key):
+        salt, digest = key.split('$6$')
+        sql = "INSERT INTO CRYPTO(SEED, DIGEST) VALUES(?,?)"
+        self._cur.execute("DELETE FROM CRYPTO")
+        self._cur.execute(sql, (salt, digest))
+        self._digest = digest.encode('utf-8')
+        self._salt = salt.encode('utf-8')
+
+    def loadkey(self):
+        # TODO: rename this method!
+        """
+        return _keycrypted
+        """
+        sql = "SELECT * FROM CRYPTO"
+        seed, digest = self._cur.execute(sql).fetchone()
+        return seed + u'$6$' + digest
+
+    def close(self):
+        self._clean_orphands()
+        self._cur.close()
+        self._con.close()

+ 106 - 7
pwman/data/nodes.py

@@ -1,5 +1,5 @@
-#============================================================================
-# This file is part of Pwman3.
+# ============================================================================
+# :This file is part of Pwman3.
 #
 # Pwman3 is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License, version 2
@@ -13,12 +13,11 @@
 # You should have received a copy of the GNU General Public License
 # along with Pwman3; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
-#============================================================================
-# Copyright (C) 2012 Oz Nahum <nahumoz@gmail.com>
-#============================================================================
-#============================================================================
+# ============================================================================
+# Copyright (C) 2012-2014 Oz Nahum <nahumoz@gmail.com>
+# ============================================================================
 # Copyright (C) 2006 Ivan Kelly <ivan@ivankelly.net>
-#============================================================================
+# ============================================================================
 
 from pwman.util.crypto_engine import CryptoEngine
 
@@ -127,3 +126,103 @@ class NewNode(object):
         """Set the Notes."""
         enc = CryptoEngine.get()
         self._notes = enc.encrypt(value).strip()
+
+
+class Node(object):
+
+    def __init__(self, clear_text=True, **kwargs):
+        if clear_text:
+            enc = CryptoEngine.get()
+            self._username = enc.encrypt(kwargs.get('username')).strip()
+            self._password = enc.encrypt(kwargs.get('password')).strip()
+            self._url = enc.encrypt(kwargs.get('url')).strip()
+            self._notes = enc.encrypt(kwargs.get('notes')).strip()
+            self._tags = [enc.encrypt(t).strip() for t in kwargs.get('tags')]
+
+    @classmethod
+    def from_encrypted_entries(cls, username, password, url, notes, tags):
+        """
+        We use this alternatively, to create a node instance when reading
+        the encrypted entities from the database
+        """
+        node = Node(clear_text=False)
+        node._username = username.strip()
+        node._password = password.strip()
+        node._url = url.strip()
+        node._notes = notes.strip()
+        node._tags = [t.strip() for t in tags]
+        return node
+
+    def __repr__(self):
+        """we use this method to write node to the database"""
+        res = u''
+        tags = self.__dict__.pop('_tags', None)
+        for item in self.__dict__:
+            res += self.__dict__[item]
+        return res
+
+    def __iter__(self):
+        for item in ['_username', '_password',
+                     '_url', '_notes']:
+            yield getattr(self, item)
+        yield self._tags
+
+    @property
+    def password(self):
+        """Get the current password."""
+        enc = CryptoEngine.get()
+        return enc.decrypt(self._password).strip()
+
+    @property
+    def username(self):
+        """Get the current username."""
+        enc = CryptoEngine.get()
+        return enc.decrypt(self._username).strip()
+
+    @username.setter
+    def username(self, value):
+        """Set the username."""
+        enc = CryptoEngine.get()
+        self._username = enc.encrypt(value).strip()
+
+    @password.setter
+    def password(self, value):
+        """Set the Notes."""
+        enc = CryptoEngine.get()
+        self._password = enc.encrypt(value).strip()
+
+    @property
+    def tags(self):
+        enc = CryptoEngine.get()
+        try:
+            return [enc.decrypt(tag) for tag in filter(None, self._tags)]
+        except Exception:
+            return [tag for tag in filter(None, self._tags)]
+
+    @tags.setter
+    def tags(self, value):
+        self._tags = [tag for tag in value]
+
+    @property
+    def url(self):
+        """Get the current url."""
+        enc = CryptoEngine.get()
+        return enc.decrypt(self._url).strip()
+
+    @url.setter
+    def url(self, value):
+        """Set the Notes."""
+        enc = CryptoEngine.get()
+        self._url = enc.encrypt(value).strip()
+
+    @property
+    def notes(self):
+        """Get the current notes."""
+        enc = CryptoEngine.get()
+        return enc.decrypt(self._notes).strip()
+
+    @notes.setter
+    def notes(self, value):
+        """Set the Notes."""
+        enc = CryptoEngine.get()
+        self._notes = enc.encrypt(value).strip()

+ 39 - 9
pwman/exchange/importer.py

@@ -1,4 +1,4 @@
-#============================================================================
+# ============================================================================
 # This file is part of Pwman3.
 #
 # Pwman3 is free software; you can redistribute it and/or modify
@@ -13,13 +13,43 @@
 # You should have received a copy of the GNU General Public License
 # along with Pwman3; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
-#============================================================================
-# Copyright (C) 2012 Oz Nahum <nahumoz@gmail.com>
-#============================================================================
-#============================================================================
-# Copyright (C) 2006 Ivan Kelly <ivan@ivankelly.net>
-#============================================================================
-
+# ============================================================================
+# Copyright (C) 2014 Oz Nahum Tiram <nahumoz@gmail.com>
+# ============================================================================
 '''
-Not implemented yet
+A module to hold the importer class
 '''
+
+
+class BaseImporter(object):
+
+    """
+    The base class which defines the action needed to import data
+    to pwman database
+    """
+
+    def __init__(self):
+        pass
+
+
+class CSVImporter(BaseImporter):
+
+    """
+    A reference implementation which imports a CSV to the pwman database
+    """
+    def __init__(self):
+        pass
+
+
+class Importer(object):
+
+    """
+    The actual job runner which by default runs a csv importer instance.
+    This could be changes by calling other instance which for example import
+    from KeePass XML or other formats.
+    """
+    def __init__(self, invoke=CSVImporter):
+        self.runner = invoke
+
+    def run(self):
+        self.runner()

+ 10 - 152
pwman/tests/db_tests.py

@@ -21,14 +21,12 @@ from pwman.data.nodes import NewNode
 from pwman.data.tags import TagNew
 from pwman.data import factory
 from pwman.data.drivers.sqlite import DatabaseException, SQLiteDatabaseNewForm
-from pwman.util import config
 from pwman.util.config import get_pass_conf
 from pwman.util.generator import leetlist
 from pwman.util.crypto_engine import CryptoEngine
-from pwman import default_config, set_xsel
 from pwman.ui import get_ui_platform
 from pwman.ui.tools import CMDLoop, CliMenuItem
-from pwman import (parser_options, get_conf_options, get_conf, set_umask)
+# from pwman import (parser_options, get_conf_options, get_conf, set_umask)
 from pwman.data.database import __DB_FORMAT__
 import sys
 import unittest
@@ -173,34 +171,6 @@ class DBTests(unittest.TestCase):
         self.assertEqual("test", db._filename)
 
 
-#class TestDBFalseConfig(unittest.TestCase):
-#
-#    def setUp(self):
-#        # filename = default_config['Database'].pop('filename')
-#        self.fname1 = default_config['Database'].pop('filename')
-#        self.fname = config._conf['Database'].pop('filename')
-#
-#    @unittest.skip("Legacy test. Database and file should always be given")
-#    def test_db_missing_conf_parameter(self):
-#        self.assertRaises(DatabaseException, factory.create,
-#                          'SQLite', __DB_FORMAT__)
-#
-#    def test_get_ui_platform(self):
-#        uiclass, osx = get_ui_platform('windows')
-#        self.assertFalse(osx)
-#        self.assertEqual(uiclass.__name__, PwmanCliWinNew.__name__)
-#        uiclass, osx = get_ui_platform('darwin')
-#        self.assertTrue(osx)
-#        self.assertEqual(uiclass.__name__, PwmanCliMacNew.__name__)
-#        del(uiclass)
-#        del(osx)
-#
-#    def tearDown(self):
-#        config.set_value('Database', 'filename', self.fname)
-#        default_config['Database']['filename'] = self.fname1
-#        config._conf['Database']['filename'] = self.fname
-
-
 class CLITests(unittest.TestCase):
 
     """
@@ -240,8 +210,13 @@ class CLITests(unittest.TestCase):
     def test_leet_password(self):
         password = self.tester.cli.get_password(None, leetify=True,
                                                 reader=lambda x: u'HAtman')
-        self.assertRegexpMatches(password, ("(H|h)?(A|a|4)?(T|t|\+)?(m|M|\|"
-                                            "\/\|)?(A|a|4)?(N|n|\|\\|)?"))
+        # python3 compatability
+        if sys.version_info.major < 3:
+            self.assertRegexpMatches(password, ("(H|h)?(A|a|4)?(T|t|\+)?(m|M|\|"
+                                                "\/\|)?(A|a|4)?(N|n|\|\\|)?"))
+        else:
+            self.assertRegex(password, ("(H|h)?(A|a|4)?(T|t|\+)?(m|M|\|"
+                                        "\/\|)?(A|a|4)?(N|n|\|\\|)?"))
 
     def test_get_url(self):
         url = self.tester.cli.get_url(reader=lambda: u'example.com')
@@ -373,11 +348,11 @@ class CLITests(unittest.TestCase):
 class FactoryTest(unittest.TestCase):
 
     def test_factory_check_db_ver(self):
-        self.assertEquals(factory.check_db_version('SQLite', testdb), 0.5)
+        self.assertEqual(factory.check_db_version('SQLite', testdb), 0.5)
 
     def test_factory_check_db_file(self):
         factory.create('SQLite', version='0.3', filename='baz.db')
-        self.assertEquals(factory.check_db_version('SQLite', 'baz.db'), 0.3)
+        self.assertEqual(factory.check_db_version('SQLite', 'baz.db'), 0.3)
         os.unlink('baz.db')
 
     def test_factory_create(self):
@@ -390,123 +365,6 @@ class FactoryTest(unittest.TestCase):
         self.assertRaises(DatabaseException, factory.create, 'UNKNOWN')
 
 
-#class ConfigTest(unittest.TestCase):
-#
-#    def setUp(self):
-#        "test that the right db instance was created"
-#        dbver = 0.4
-#        self.dbtype = config.get_value("Database", "type")
-#        self.db = factory.create(self.dbtype, dbver, testdb)
-#        self.tester = SetupTester(dbver, testdb)
-#        self.tester.create()
-#        self.orig_config = config._conf.copy()
-#        self.orig_config['Encryption'] = {'algorithm': 'AES'}
-#
-#    def test_config_write(self):
-#        _filename = os.path.join(os.path.dirname(__file__),
-#                                 'testing_config')
-#        config._file = _filename
-#        config.save(_filename)
-#        self.assertTrue(_filename)
-#        os.remove(_filename)
-#
-#    def test_config_write_with_none(self):
-#        _filename = os.path.join(os.path.dirname(__file__),
-#                                 'testing_config')
-#        config._file = _filename
-#        config.save()
-#        self.assertTrue(os.path.exists(_filename))
-#        os.remove(_filename)
-
-#    def test_write_no_permission(self):
-#        # this test will pass if you run as root ...
-#        # assuming you are not doing something like that
-#        self.assertRaises(config.ConfigException, config.save,
-#                          '/root/test_config')
-
-#    def test_add_default(self):
-#        config.add_defaults({'Section1': {'name': 'value'}})
-#        self.assertIn('Section1', config._defaults)
-#        config._defaults.pop('Section1')
-
-#    def test_get_conf(self):
-#        cnf = config.get_conf()
-#        cnf_keys = cnf.keys()
-#        self.assertTrue('Encryption' in cnf_keys)
-#        self.assertTrue('Readline' in cnf_keys)
-#        self.assertTrue('Global' in cnf_keys)
-#        self.assertTrue('Database' in cnf_keys)
-
-#    def test_load_conf(self):
-#        self.assertRaises(config.ConfigException, config.load, 'NoSuchFile')
-#        # Everything should be ok
-#        config.save('TestConfig.ini')
-#        config.load('TestConfig.ini')
-        # let's corrupt the file
-#        cfg = open('TestConfig.ini', 'w')
-#        cfg.write('Corruption')
-#        cfg.close()
-#        self.assertRaises(config.ConfigException, config.load,
-#                          'TestConfig.ini')
-#        os.remove('TestConfig.ini')
-
-#    def test_all_config(self):
-#        sys.argv = ['pwman3']
-#        default_config['Database'] = {'type': '',
-#                                      'filename': ''}
-#        _save_conf = config._conf.copy()
-#        config._conf = {}
-#        with open('dummy.conf', 'w') as dummy:
-#            dummy.write(dummyfile)
-#        sys.argv = ['pwman3', '-d', '', '-c', 'dummy.conf']
-#        p2 = parser_options()
-#        args = p2.parse_args()
-#        self.assertRaises(Exception, get_conf_options, args, False)
-#        config._conf = _save_conf.copy()
-#        os.unlink('dummy.conf')
-
-#    def test_set_xsel(self):
-#        set_xsel(config, False)
-
-#        set_xsel(config, True)
-#        if sys.platform == 'linux2':
-#            self.assertEqual(None, config._conf['Global']['xsel'])
-
-#    def test_get_conf_file(self):
-#        Args = namedtuple('args', 'cfile')
-#        args = Args(cfile='nosuchfile')
-        # setting the default
-        # in case the user specifies cfile as command line option
-        # and that file does not exist!
-#        foo = config._conf.copy()
-#        get_conf_file(args)
-        # args.cfile does not exist, hence the config values
-        # should be the same as in the defaults
-#        config.set_config(foo)
-
-#   def test_get_conf_options(self):
-#       Args = namedtuple('args', 'cfile, dbase, algo')
-#        args = Args(cfile='nosuchfile', dbase='dummy.db', algo='AES')
-#        self.assertRaises(Exception, get_conf_options, (args, 'False'))
-#        config._defaults['Database']['type'] = 'SQLite'
-#        # config._conf['Database']['type'] = 'SQLite'
-#        xsel, dbtype = get_conf_options(args, 'True')
-#        self.assertEqual(dbtype, 'SQLite')
-
-#    def test_set_conf(self):
-#        set_conf_f = getattr(config, 'set_conf')
-#        private_conf = getattr(config, '_conf')
-#        set_conf_f({'Config': 'OK'})
-#        self.assertDictEqual({'Config': 'OK'}, config._conf)
-#        config._conf = private_conf
-
-#    def test_umask(self):
-#        config._defaults = {'Global': {}}
-#        self.assertRaises(config.ConfigException, set_umask, config)
-
-#    def tearDown(self):
-#        config._conf = self.orig_config.copy()
-
 if __name__ == '__main__':
     # make sure we use local pwman
     sys.path.insert(0, os.getcwd())

+ 0 - 1
pwman/tests/test_crypto_engine.py

@@ -1,5 +1,4 @@
 import unittest
-import pwman.util.config as config
 from pwman.util.callback import Callback
 import os
 from pwman.util.crypto_engine import (CryptoEngine, CryptoException)

+ 5 - 2
pwman/tests/test_pwman.py

@@ -21,13 +21,15 @@
 import os
 import sys
 import unittest
-from .db_tests import (DBTests, SetupTester, CLITests, #ConfigTest,
-                       #TestDBFalseConfig,
+from .db_tests import (DBTests, SetupTester, CLITests,
+                       # ConfigTest,
+                       # TestDBFalseConfig,
                        FactoryTest)
 
 #from .crypto_tests import CryptoTest
 from .test_crypto_engine import CryptoEngineTest
 from .test_config import TestConfig
+from .test_sqlite import TestSQLite
 
 if 'win' not in sys.platform:
     from .test_complete_ui import (Ferrum, NEW_DB_PATH)
@@ -54,6 +56,7 @@ def suite():
     #suite.addTest(loader.loadTestsFromTestCase(TestDBFalseConfig))
     suite.addTest(loader.loadTestsFromTestCase(CryptoEngineTest))
     suite.addTest(loader.loadTestsFromTestCase(TestConfig))
+    suite.addTest(loader.loadTestsFromTestCase(TestSQLite))
     #if 'win' not in sys.platform:
     #    suite.addTest(loader.loadTestsFromTestCase(Ferrum))
     return suite

+ 164 - 0
pwman/tests/test_sqlite.py

@@ -0,0 +1,164 @@
+# ============================================================================
+# This file is part of Pwman3.
+#
+# Pwman3 is free software; you can redistribute iut and/or modify
+# it under the terms of the GNU General Public License, version 2
+# as published by the Free Software Foundation;
+#
+# Pwman3 is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Pwman3; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
+# ============================================================================
+# Copyright (C) 2012, 2013, 2014 Oz Nahum Tiram <nahumoz@gmail.com>
+# ============================================================================
+import os
+import unittest
+from pwman.data.drivers.sqlite import SQLite
+from pwman.data.nodes import Node
+from pwman.util.crypto_engine import CryptoEngine
+from .test_crypto_engine import give_key, DummyCallback
+
+
+class TestSQLite(unittest.TestCase):
+
+    def setUp(self):
+        self.db = SQLite('test.db')
+        self.db._open()
+
+    def test_1_create_tables(self):
+        self.db._create_tables()
+        self.db._con.commit()
+        # the method _open calls _create_tables
+        self.db.save_crypto_info("foo", "bar")
+        self.db._create_tables()
+
+    def test_1a_create_tables(self):
+        self.db._create_tables()
+
+    def test_2_crypto_info(self):
+        self.db._create_tables()
+        self.db.save_crypto_info("foo", "bar")
+        f = self.db.fetch_crypto_info()
+        self.assertListEqual([u'foo', u'bar'], list(f))
+
+    def test_3_add_node(self):
+        node = Node(clear_text=True,
+                    **{'username': u"alice", 'password': u"secret",
+                       'url': u"wonderland.com",
+                       'notes': u"a really great place",
+                       'tags': [u'foo', u'bar']})
+        self.db.add_node(node)
+        rv = self.db._cur.execute("select * from node")
+        # clearly this fails, while alice is not found in clear text in the
+        # database!
+        ce = CryptoEngine.get()
+        res = rv.fetchone()
+        self.assertIn(ce.encrypt(u'alice'), res[1])
+
+    def test_4_test_tags(self):
+        node = Node(clear_text=True,
+                    **{'username': u"alice", 'password': u"secret",
+                       'url': u"wonderland.com",
+                       'notes': u"a really great place",
+                       'tags': [u'foo', u'bar']})
+        ce = CryptoEngine.get()
+        self.db._get_or_create_tag(node._tags[0])
+        self.assertEqual(1, self.db._get_or_create_tag(node._tags[0]))
+        rv = self.db._get_or_create_tag(ce.encrypt('baz'))
+        self.db._con.commit()
+        self.assertEqual(3, rv)
+
+    def test_5_test_lookup(self):
+        self.db._cur.execute('SELECT nodeid, tagid FROM LOOKUP')
+        rows = self.db._cur.fetchall()
+        self.assertEqual(2, len(rows))
+
+    def test_6_listnodes(self):
+        node = Node(clear_text=True,
+                    **{'username': u"hatman", 'password': u"secret",
+                       'url': u"wonderland.com",
+                       'notes': u"a really great place",
+                       'tags': [u'baz', u'bar']})
+        self.db.add_node(node)
+        ids = self.db.listnodes()
+        self.assertEqual(2, len(ids))
+
+    def test_7_listnodes_w_filter(self):
+        ce = CryptoEngine.get()
+        # the tag 'bar' is found in a node created in:
+        # test_3_add_node
+        # test_6_listnodes
+
+        tag = ce.encrypt(u'bar')
+        rv = self.db.listnodes(tag)
+        self.assertEqual(len(rv), 2)
+        tag = ce.encrypt(u'baz')
+        # the tag 'baz' is found in a node created in
+        # test_6_listnodes
+        rv = self.db.listnodes(tag)
+        self.assertEqual(len(rv), 1)
+
+    def test_8_getnodes(self):
+        nodes = self.db.getnodes([1, 2])
+        self.assertEqual(len(nodes), 2)
+
+    def test_9_editnode(self):
+        # delibertly insert clear text into the database
+        node = {'user': 'transparent', 'password': 'notsecret',
+                'tags': ['foo', 'bank']}
+        self.db.editnode('2', **node)
+        self.db._cur.execute('SELECT USER, PASSWORD FROM NODE WHERE ID=2')
+        rv = self.db._cur.fetchone()
+        self.assertEqual(rv, (u'transparent', u'notsecret'))
+        node = {'user': 'modify', 'password': 'notsecret',
+                'tags': ['foo', 'auto']}
+        # now the tags bank and baz are orphand ...
+        # what happens? it should be completely removed.
+        # To spare IO we only delete orphand tags when
+        # db.close is called.
+        self.db.editnode('2', **node)
+
+    def test_9_test_orphands(self):
+        ce = CryptoEngine.get()
+        baz_encrypted = ce.encrypt(u'baz').decode()
+
+        self.db._cur.execute('SELECT DATA FROM TAG')
+        rv = self.db._cur.fetchall()
+        for data in rv:
+            if isinstance(data[0], str):
+                self.assertNotIn(u'bank', data[0])
+            else:
+                self.assertNotIn(baz_encrypted, data[0].decode())
+
+    def test_a10_test_listtags(self):
+        tags = self.db.listtags()
+        self.assertEqual(4, len(tags))
+
+    def test_a11_test_rmnodes(self):
+        self.db.removenodes([1, 2])
+        rv = self.db._cur.execute("select * from node").fetchall()
+        self.assertListEqual(rv, [])
+
+    def test_a12_test_savekey(self):
+        ce = CryptoEngine.get()
+        self.db.savekey(ce.get_cryptedkey())
+        self.assertEqual(ce.get_cryptedkey(), self.db.loadkey())
+
+    def tearDown(self):
+        self.db.close()
+
+if __name__ == '__main__':
+
+    ce = CryptoEngine.get()
+    ce.callback = DummyCallback()
+    ce.changepassword(reader=give_key)
+
+    try:
+        unittest.main(verbosity=2, failfast=True)
+    except SystemExit:
+        os.remove('test.db')

+ 6 - 7
pwman/util/crypto_engine.py

@@ -82,16 +82,15 @@ def prepare_data(text, block_size):
 class CryptoEngine(object):  # pagma: no cover
     _timeoutcount = 0
     _instance = None
-    _instance_new = None
     _callback = None
 
     @classmethod
     def get(cls, dbver=None, timeout=-1):
-        if CryptoEngine._instance_new:
-            return CryptoEngine._instance_new
+        if CryptoEngine._instance:
+            return CryptoEngine._instance
 
-        CryptoEngine._instance_new = CryptoEngine(timeout)
-        return CryptoEngine._instance_new
+        CryptoEngine._instance = CryptoEngine(timeout)
+        return CryptoEngine._instance
 
     def __init__(self, salt=None, digest=None, algorithm='AES',
                  timeout=-1, reader=None):
@@ -177,7 +176,7 @@ class CryptoEngine(object):  # pagma: no cover
     def changepassword(self, reader=raw_input):
         if self._callback is None:
             raise CryptoException("No callback class has been specified")
-        #if not self._is_authenticated():
+        # if not self._is_authenticated():
         #    p, s = self._auth()
         # if you change the password of the database you have to Change
         # all the cipher texts in the databse!!!
@@ -225,4 +224,4 @@ class CryptoEngine(object):  # pagma: no cover
         """
         return _keycrypted
         """
-        return self._salt + '$6$' + self._digest
+        return self._salt.decode() + u'$6$' + self._digest.decode()

+ 10 - 4
scripts/pwman3

@@ -1,5 +1,5 @@
 #!/usr/bin/env python
-#============================================================================
+# ============================================================================
 # This file is part of Pwman3.
 #
 # Pwman3 is free software; you can redistribute it and/or modify
@@ -14,11 +14,11 @@
 # You should have received a copy of the GNU General Public License
 # along with Pwman3; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
-#============================================================================
+# ============================================================================
 # Copyright (C) 2012-2014 Oz Nahum <nahumoz@gmail.com>
-#============================================================================
+# ============================================================================
 # Copyright (C) 2006 Ivan Kelly <ivan@ivankelly.net>
-#============================================================================
+# ============================================================================
 from __future__ import print_function
 import sys
 from pwman import get_conf_options, get_db_version
@@ -37,6 +37,12 @@ def main(args):
     xselpath, dbtype, config = get_conf_options(args, OSX)
     dbver = get_db_version(config, dbtype, args)
     CryptoEngine.get(dbver)
+
+    if args.import_file:
+       importer = Importer(args, config)
+       importer.run()
+       sys.exit(0)
+
     fname = config.get_value('Database', 'filename')
     db = pwman.data.factory.create(dbtype, dbver, fname)
     cli = PwmanCliNew(db, xselpath, CLICallback, config)