فهرست منبع

Migrate crypto_engine to use cryptography and ferret

Thanks for smart programming earlier the refactor at this
level is quick.
Let's see how much effort other layers will take!

[ci skip]
Oz N Tiram 8 سال پیش
والد
کامیت
68f0caf71c
3فایلهای تغییر یافته به همراه30 افزوده شده و 39 حذف شده
  1. 17 26
      pwman/util/crypto_engine.py
  2. 1 1
      requirements.txt
  3. 12 12
      tests/test_crypto_engine.py

+ 17 - 26
pwman/util/crypto_engine.py

@@ -27,17 +27,10 @@ import string
 import sys
 import time
 
-try:
-    from Crypto.Cipher import AES
-    from Crypto.Protocol.KDF import PBKDF2
-except ImportError as E:
-    # PyCrypto not found, we use a compatible implementation
-    # in pure Python.
-    # This is good for Windows where software installation suck
-    # or embeded devices where compilation is a bit harder
-    from pwman.util.crypto import AES
-    from pwman.util.crypto.pypbkdf2 import PBKDF2
-
+from cryptography.fernet import Fernet
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
 
 from pwman.util.callback import Callback
 
@@ -88,23 +81,23 @@ def get_digest(password, salt):
     """
     Get a digest based on clear text password
     """
-    iterations = 5000
-    if isinstance(password, bytes):
-        password = password.decode()
-    try:
-        return PBKDF2(password, salt, dkLen=32, count=iterations)
-    except TypeError:
-        return PBKDF2(password, salt, iterations=iterations).read(32)
+    kdf = PBKDF2HMAC(
+        algorithm=hashes.SHA256(),
+        length=32,
+        salt=salt,
+        iterations=5000,
+        backend=default_backend()
+    )
+    key = base64.urlsafe_b64encode(kdf.derive(password))
+    return key
 
 
 def get_cipher(password, salt):
     """
     Create a chiper object from a hashed password
     """
-    iv = os.urandom(AES.block_size)
     dig = get_digest(password, salt)
-    chiper = AES.new(dig, AES.MODE_ECB, iv)
-    return chiper
+    return Fernet(dig)
 
 
 def prepare_data(text, block_size):
@@ -164,8 +157,7 @@ class CryptoEngine(object):  # pagma: no cover
         salt = self._salt
         tries = 0
         while tries < 5:
-            password = self._getsecret("Please type in your master password"
-                                       ).encode('utf-8')
+            password = self._getsecret("Please type in your master password")
             if self.authenticate(password):
                 return password, salt
 
@@ -180,7 +172,7 @@ class CryptoEngine(object):  # pagma: no cover
             self._cipher = cipher
             del(p)
 
-        return encode_AES(self._cipher, prepare_data(text, AES.block_size))
+        return encode_AES(self._cipher, text)
 
     def decrypt(self, cipher_text):
         if not self._is_authenticated():
@@ -189,8 +181,7 @@ class CryptoEngine(object):  # pagma: no cover
             self._cipher = cipher
             del(p)
 
-        return decode_AES(self._cipher, prepare_data(cipher_text,
-                                                     AES.block_size))
+        return decode_AES(self._cipher, cipher_text)
 
     def forget(self):
         """

+ 1 - 1
requirements.txt

@@ -1,2 +1,2 @@
-pycrypto>=2.6
+cryptography
 colorama>=0.2.4

+ 12 - 12
tests/test_crypto_engine.py

@@ -35,8 +35,8 @@ default_config = {'Global': {'umask': '0100', 'colors': 'yes',
                                                        "history")}
                   }
 
-give_key = lambda msg: "12345"
-give_wrong_key = lambda msg: "verywrongtkey"
+give_key = lambda msg: b"12345"
+give_wrong_key = lambda msg: b"verywrongtkey"
 
 salt = b'cUDHNMJdTRxiIDPXuT163UMvi4fd2pXz/bRg2Zm8ajE='
 digest = b'9eaec7dc1ee647338406739c54dbf9c4881c74702008eb978622811cfc46a07f'
@@ -45,13 +45,13 @@ digest = b'9eaec7dc1ee647338406739c54dbf9c4881c74702008eb978622811cfc46a07f'
 class DummyCallback(Callback):
 
     def getinput(self, question):
-        return u'12345'
+        return b'12345'
 
     def getsecret(self, question):
-        return u'12345'
+        return b'12345'
 
     def getnewsecret(self, question):
-        return u'12345'
+        return b'12345'
 
 
 class TestPassGenerator(unittest.TestCase):
@@ -104,8 +104,8 @@ class CryptoEngineTest(unittest.TestCase):
     def test5_e_authenticate(self):
         ce = CryptoEngine.get()
         ce._reader = give_key
-        self.assertFalse(ce.authenticate('verywrong'))
-        self.assertTrue(ce.authenticate('12345'))
+        self.assertFalse(ce.authenticate(b'verywrong'))
+        self.assertTrue(ce.authenticate(b'12345'))
         ce._timeout = -1
         self.assertTrue(ce._is_authenticated())
 
@@ -122,11 +122,11 @@ class CryptoEngineTest(unittest.TestCase):
         ce._reader = give_key
         if not ce._salt:
             ce._salt = salt
-        secret = ce.encrypt("topsecret")
+        secret = ce.encrypt(b"topsecret")
         decrypt = ce.decrypt(secret)
         self.assertEqual(decrypt.decode(), "topsecret")
         ce._cipher = None
-        secret = ce.encrypt("topsecret")
+        secret = ce.encrypt(b"topsecret")
         decrypt = ce.decrypt(secret)
         self.assertEqual(decrypt.decode(), "topsecret")
 
@@ -134,9 +134,9 @@ class CryptoEngineTest(unittest.TestCase):
         ce = CryptoEngine.get()
         ce._cipher = None
         ce._getsecret = give_wrong_key
-        self.assertRaises(CryptoException, ce.encrypt, "secret")
-        ce._getsecret = lambda x: u'12345'
-        secret = ce.encrypt(u"topsecret")
+        self.assertRaises(CryptoException, ce.encrypt, b"secret")
+        ce._getsecret = lambda x: b'12345'
+        secret = ce.encrypt(b"topsecret")
         decrypt = ce.decrypt(secret)
         self.assertEqual(decrypt.decode(), "topsecret")