Browse Source

Add python2 and python3 new crypto engine

oz123 10 năm trước cách đây
mục cha
commit
3ed5188310
3 tập tin đã thay đổi với 253 bổ sung0 xóa
  1. 33 0
      pwman/tests/test_crypto_engine.py
  2. 2 0
      pwman/tests/test_pwman.py
  3. 218 0
      pwman/util/crypto_engine.py

+ 33 - 0
pwman/tests/test_crypto_engine.py

@@ -0,0 +1,33 @@
+import unittest
+import pwman.util.config as config
+import os
+from pwman.util.crypto_engine import (write_password, save_a_secret_message,
+                                      read_a_secret_message)
+
+# set cls_timout to negative number (e.g. -1) to disable
+default_config = {'Global': {'umask': '0100', 'colors': 'yes',
+                             'cls_timeout': '5'
+                             },
+                  'Database': {'type': 'SQLite',
+                               'filename': os.path.join("tests", "pwman.db")},
+                  'Encryption': {'algorithm': 'AES'},
+                  'Readline': {'history': os.path.join("tests",
+                                                       "history")}
+                  }
+
+config.set_defaults(default_config)
+
+give_key = lambda msg: "verysecretkey"
+
+
+class CryptoEngineTest(unittest.TestCase):
+
+
+    def test_write_password(self):
+        write_password(reader=give_key)
+
+    def test_save_secret(self):
+        save_a_secret_message(reader=give_key)
+
+    def test_read_secret(self):
+        read_a_secret_message(reader=give_key)

+ 2 - 0
pwman/tests/test_pwman.py

@@ -25,6 +25,7 @@ from .db_tests import (DBTests, SetupTester, CLITests, ConfigTest,
                        TestDBFalseConfig, FactoryTest)
 
 from .crypto_tests import CryptoTest
+from .test_crypto_engine import CryptoEngineTest
 
 if 'win' not in sys.platform:
     from .test_complete_ui import Ferrum, NEW_DB_PATH
@@ -49,6 +50,7 @@ def suite():
     suite.addTest(loader.loadTestsFromTestCase(ConfigTest))
     suite.addTest(loader.loadTestsFromTestCase(FactoryTest))
     suite.addTest(loader.loadTestsFromTestCase(TestDBFalseConfig))
+    suite.addTest(loader.loadTestsFromTestCase(CryptoEngineTest))
     if 'win' not in sys.platform:
         suite.addTest(loader.loadTestsFromTestCase(Ferrum))
     return suite

+ 218 - 0
pwman/util/crypto_engine.py

@@ -0,0 +1,218 @@
+# ============================================================================
+# This file is part of Pwman3.
+#
+# Pwman3 is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License, version 2
+# as published by the Free Software Foundation;
+#
+# Pwman3 is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Pwman3; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
+# ============================================================================
+# Copyright (C) 2014 Oz Nahum <nahumoz@gmail.com>
+# ============================================================================
+
+from __future__ import print_function
+from Crypto.Cipher import AES
+from Crypto.Protocol.KDF import PBKDF2
+import base64
+import os
+import sys
+import binascii
+
+from pwman.util.callback import Callback
+import pwman.util.config as config
+
+if sys.version_info.major > 2:  # pragma: no cover
+    raw_input = input
+
+EncodeAES = lambda c, s: base64.b64encode(c.encrypt(s))
+DecodeAES = lambda c, e: c.decrypt(base64.b64decode(e)).rstrip()
+
+
+def get_digest(password, salt):
+    """
+    Get a digest based on clear text password
+    """
+    iterations = 5000
+    return PBKDF2(password, salt, dkLen=32, count=iterations)
+
+
+def authenticate(password, salt, digest):
+    """
+    salt and digest are stored in a file or a database
+    """
+    dig = get_digest(password, salt)
+    return binascii.hexlify(dig) == digest
+
+
+def write_password(reader=raw_input):
+    """
+    Write a secret password as a hash and the salt used for this hash
+    to a file
+    """
+    salt = base64.b64encode(os.urandom(32))
+    passwd = reader("Please type in the secret key:")
+    key = get_digest(passwd, salt)
+    f = open('passwords.txt', 'wt')
+    hpk = salt+'$6$'.encode('utf8')+binascii.hexlify(key)
+    f.write(hpk.decode('utf-8'))
+    f.close()
+
+
+def get_digest_from_file(filename):
+    """
+    Read a digested password and salt from the file
+    """
+    f = open(filename, 'rt')
+    salt, digest = f.readline().split('$6$')
+    return salt.encode('utf-8'), digest.encode('utf-8')
+
+
+def get_cipher(password, salt):
+    """
+    Create a chiper object from a hashed password
+    """
+    iv = os.urandom(AES.block_size)
+    dig = get_digest(password, salt)
+    chiper = AES.new(dig, AES.MODE_ECB, iv)
+    return chiper
+
+
+def cli_auth(reader=raw_input):
+    """
+    Read password from the user, if the password is correct,
+    finish the execution an return the password and salt which
+    are read from the file.
+    """
+    salt, digest = get_digest_from_file('passwords.txt')
+    while True:
+        password = reader("Please type in your password:").encode('utf-8')
+        if authenticate(password, salt, digest):
+            return password, salt
+
+
+def prepare_data(text, block_size):
+    """
+    prepare data before encryption so the lenght matches the expected
+    lenght by the algorithm.
+    """
+    num_blocks = len(text)//block_size + 1
+    newdatasize = block_size*num_blocks
+    return text.ljust(newdatasize)
+
+
+def save_a_secret_message(reader=raw_input):
+    """
+    PoC to show we can encrypt a message
+    """
+    secret_msg = """This is a very important message! Learn Cryptography!!!"""
+    # the secret message will be encrypted with the secret password found
+    # in the file
+    passwd, salt = cli_auth(reader=reader)
+    cipher = get_cipher(passwd, salt)
+    # explictly destroy password, so now there is no clear text reference
+    # to the input given by the user
+    del(passwd)
+    msg = EncodeAES(cipher, prepare_data(secret_msg, AES.block_size))
+    with open('secret.enc', 'wt') as s:
+        s.write(msg.decode())
+    print("The cipher message is:", msg.decode())
+
+
+def read_a_secret_message(reader=raw_input):
+    """
+    PoC to show we can decrypt a message
+    """
+    passwd, salt = cli_auth(reader)
+    cipher = get_cipher(passwd, salt)
+    del(passwd)
+    with open('secret.enc') as s:
+        msg = s.readline()
+        print("The decrypted secret message is:")
+        decoded = DecodeAES(cipher, msg)
+        print(decoded)
+
+
+class CryptoEngine(object):  # pragma: no cover
+    _timeoutcount = 0
+    _instance = None
+    _instance_new = None
+    _callback = None
+
+    @classmethod
+    def get(cls, dbver=0.5):
+        if CryptoEngine._instance:
+            return CryptoEngine._instance
+        if CryptoEngine._instance_new:
+            return CryptoEngine._instance_new
+
+        keycrypted = config.get_value("Encryption", "keycrypted")
+        algo = config.get_value("Encryption", "algorithm")
+
+        if not algo:
+            raise Exception("Parameters missing, no algorithm given")
+
+        try:
+            timeout = int(config.get_value("Encryption", "timeout"))
+        except ValueError:
+            timeout = -1
+
+        kwargs = {'keycrypted': keycrypted, 'algorithm': algo,
+                  'timeout': timeout}
+
+        if dbver >= 0.5:
+            CryptoEngine._instance_new = CryptoEngine(**kwargs)
+            return CryptoEngine._instance_new
+
+    @property
+    def callback(self):
+        """
+        return call back function
+        """
+        return self._callback
+
+    @callback.setter
+    def callback(self, callback):
+        if isinstance(callback, Callback):
+            self._callback = callback
+            self._getsecret = callback.getsecret
+            self._getnewsecret = callback.getnewsecret
+        else:
+            raise Exception("callback must be an instance of Callback!")
+
+    def _create_password(self):
+        """
+        Write a secret password as a hash and the salt used for this hash
+        to a file
+        """
+        salt = base64.b64encode(os.urandom(32))
+        passwd = raw_input("Please type in the secret key:")
+        key = get_digest(passwd, salt)
+        hpk = salt+'$6$'.encode('utf8')+binascii.hexlify(key)
+        return hpk.decode('utf-8')
+
+    def changepassword(self):
+        self._keycrypted = self._create_password()
+        return self._keycrypted
+
+    def __init__(self, keycrypted=None, algorithm='AES', timeout=-1):
+        """
+        Initialise the Cryptographic Engine
+        """
+        self._algo = algorithm
+        self._keycrypted = keycrypted if keycrypted else None
+        self._timeout = timeout
+        self._cipher = None
+
+
+if __name__ == '__main__':  # pragma: no cover
+    if '-i' in sys.argv:
+        write_password()
+    save_a_secret_message()
+    read_a_secret_message()