Parcourir la source

Add encrypt and decrypt method to crypto_engine

oz123 il y a 10 ans
Parent
commit
b47ff72600
2 fichiers modifiés avec 59 ajouts et 7 suppressions
  1. 25 4
      pwman/tests/test_crypto_engine.py
  2. 34 3
      pwman/util/crypto_engine.py

+ 25 - 4
pwman/tests/test_crypto_engine.py

@@ -3,7 +3,7 @@ import pwman.util.config as config
 import os
 from pwman.util.crypto_engine import (write_password, save_a_secret_message,
                                       read_a_secret_message,
-                                      CryptoEngine)
+                                      CryptoEngine, CryptoException)
 import time
 
 # set cls_timout to negative number (e.g. -1) to disable
@@ -20,6 +20,7 @@ default_config = {'Global': {'umask': '0100', 'colors': 'yes',
 config.set_defaults(default_config)
 
 give_key = lambda msg: "verysecretkey"
+give_wrong_key = lambda msg: "verywrongtkey"
 
 
 class CryptoEngineTest(unittest.TestCase):
@@ -48,14 +49,34 @@ class CryptoEngineTest(unittest.TestCase):
         ce = CryptoEngine.get()
         self.assertFalse(ce.authenticate('verywrong'))
         self.assertTrue(ce.authenticate('verysecretkey'))
+        ce._timeout = -1
         self.assertTrue(ce._is_authenticated())
 
-    def test_is_timedout(self):
+    def test_f_encrypt_decrypt(self):
+        ce = CryptoEngine.get()
+        ce._reader = give_key
+        secret = ce.encrypt("topsecret")
+        decrypt = ce.decrypt(secret)
+        self.assertEqual(decrypt, "topsecret")
+        ce._cipher = None
+        secret = ce.encrypt("topsecret")
+        decrypt = ce.decrypt(secret)
+        self.assertEqual(decrypt, "topsecret")
+
+    def test_g_encrypt_decrypt_wrong_pass(self):
+        ce = CryptoEngine.get()
+        ce._cipher = None
+        ce._reader = give_wrong_key
+        self.assertRaises(CryptoException, ce.encrypt, "secret")
+        ce._reader = give_key
+        secret = ce.encrypt("topsecret")
+        decrypt = ce.decrypt(secret)
+        self.assertEqual(decrypt, "topsecret")
+
+    def test__hhh_is_timedout(self):
         ce = CryptoEngine.get()
         ce._timeout = 1
         time.sleep(1.1)
         self.assertTrue(ce._is_timedout())
         self.assertIsNone(ce._cipher)
         self.assertFalse(ce._is_authenticated())
-        #:self.assertFalse(ce._is_timedout())
-

+ 34 - 3
pwman/util/crypto_engine.py

@@ -35,6 +35,10 @@ EncodeAES = lambda c, s: base64.b64encode(c.encrypt(s))
 DecodeAES = lambda c, e: c.decrypt(base64.b64decode(e)).rstrip()
 
 
+class CryptoException(Exception):
+    pass
+
+
 def get_digest(password, salt):
     """
     Get a digest based on clear text password
@@ -91,11 +95,18 @@ def cli_auth(reader=raw_input):
     are read from the file.
     """
     salt, digest = get_digest_from_file('passwords.txt')
-    while True:
-        password = reader("Please type in your password:").encode('utf-8')
+    tries = 0
+    while tries < 5:
+        password = reader("Please type in your master password:"
+                          ).encode('utf-8')
         if authenticate(password, salt, digest):
             return password, salt
 
+        print("You entered a wrong password...")
+        tries += 1
+
+    raise CryptoException("You entered wrong password 5 times..")
+
 
 def prepare_data(text, block_size):
     """
@@ -172,7 +183,7 @@ class CryptoEngine(object):  # pagma: no cover
             return CryptoEngine._instance_new
 
     def __init__(self, salt=None, digest=None, algorithm='AES',
-                 timeout=-1):
+                 timeout=-1, reader=raw_input):
         """
         Initialise the Cryptographic Engine
         """
@@ -181,6 +192,7 @@ class CryptoEngine(object):  # pagma: no cover
         self._salt = salt if salt else None
         self._timeout = timeout
         self._cipher = None
+        self._reader = reader
 
     def authenticate(self, password):
         """
@@ -193,6 +205,25 @@ class CryptoEngine(object):  # pagma: no cover
             return True
         return False
 
+    def encrypt(self, text):
+        if not self._is_authenticated():
+            p, s = cli_auth(self._reader)
+            cipher = get_cipher(p, s)
+            del(p)
+            return EncodeAES(cipher, prepare_data(text, AES.block_size))
+
+        return EncodeAES(self._cipher, prepare_data(text, AES.block_size))
+
+    def decrypt(self, cipher_text):
+        if not self._is_authenticated():
+            p, s = cli_auth(self._reader)
+            cipher = get_cipher(p, s)
+            del(p)
+            return DecodeAES(cipher, prepare_data(cipher_text, AES.block_size))
+
+        return DecodeAES(self._cipher, prepare_data(cipher_text,
+                                                    AES.block_size))
+
     def _is_authenticated(self):
         if not self._is_timedout() and self._cipher is not None:
             return True