我正在开发一个玩具模块来加密临时文件。这个想法是公开一个类似于tempfile
模块中的接口,但使用会话密钥对数据进行透明加密。同样,这只是一个玩具项目,而不是生产代码。
首先,一些技术细节。该项目实施pycrypto
并正在使用:
- AES-CTR
- 具有 1 位随机前缀的计数器
基本类的结构是为了模仿文件对象接口。当一个方法如read
or被调用时,我们使用类的属性write
初始化一个Crypto.Cipher.AES
对象。_cipher
该属性实现如下:
def _cipher():
doc = doc = "Returns a stateful AES object ready to decrypt at the required \
stream position"
def fget(self):
ctr = Counter.new(nbits=64,
initial_value=self.tell() # alias of self._file.tell
prefix=self._nonce) # set by `Random.new().read(8)
return AES.new(self._key, counter=ctr, mode=self._opmode) # self._opmode = Crypto.Cipher.AES.MODE_CTR
def fset(self, value):
raise AttributeError('Cannot set _cipher')
def fdel(self, value):
raise AttributeError('Cannot delete _cipher')
return locals()
_cipher = property(**_cipher())
这是一个_cipher
在调用方法期间如何使用属性进行透明加密的示例write
。
def write(self, data):
if not isinstance(data, str):
raise TypeError('Data must be str (or bytestring)')
self._file.write(self._cipher.encrypt(data))
解密时,我们应用相反的交易:
def read(self, size=-1):
return self._cipher.decrypt(self._file.read(size))
这在对 进行单个调用时有效write
,但在多个调用write
被链接时失败。例如:
ep = EphemeralFile() # the class in question
ep.write('Now is the winter of our discontent')
ep.seek(0)
print ep.read()
>> Now is the winter of our discontent
到目前为止一切都很好......但这就是失败的地方
ep.write(' made glorious summer by this sun of York')
ep.seek(0)
print ep.read()
>> Now is the winter of our discontent"d_"��U�L~ �w���S��h��]"U(��P^��9k
我究竟做错了什么?self._file.tell()
属性中的使用不应该_cipher
为解密产生适当的计数器位置吗?
请注意,我首先怀疑我可能会偏离一个计数器刻度,因此我尝试将initial_value=self.tell()
行修改为initial_value=self.tell() + 1
(也尝试使用-1
),但无济于事。
为方便起见,这里是完整的类定义。它相当短,可能会产生一些见解。
import tempfile
from Crypto.Cipher import AES
from Crypto import Random
from Crypto.Util import Counter
PRNG = Random.new()
class EphemeralFile(object):
def __init__(self, mode='w+b', bufsize=-1, suffix='', prefix='', dir=None,
key_size=32):
self._key = PRNG.read(key_size)
self._nonce = PRNG.read(8)
self._opmode = AES.MODE_CTR
self._file = tempfile.TemporaryFile(mode=mode, bufsize=bufsize,
suffix=suffix, prefix=prefix, dir=dir)
# alias tempfile methods and parameters
self.close = self._file.close
self.closed = self._file.closed
self.encoding = self._file.encoding
self.errors = self._file.errors
self.fileno = self._file.fileno
self.flush = self._file.flush
self.isatty = self._file.isatty
self.mode = self._file.mode
self.name = self._file.name
self.softspace = self._file.softspace
self.truncate = self._file.truncate
self.seek = self._file.seek
self.tell = self._file.tell
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self._file.close()
def __iter__(self):
return (line for line in self.readlines())
def _cipher():
doc = "Returns a stateful AES object ready to decrypt at the required \
stream position"
def fget(self):
ctr = Counter.new(nbits=64,
initial_value=self.tell(),
prefix=self._nonce)
return AES.new(self._key, counter=ctr, mode=self._opmode)
def fset(self, value):
raise AttributeError('Cannot set EphemeralFile._cipher')
def fdel(self):
raise AttributeError('Cannot delete EphemeralFile._cipher')
return locals()
_cipher = property(**_cipher())
def write(self, data):
if not isinstance(data, str):
raise TypeError('Data must be str (or bytestring)')
self._file.write(self._cipher.encrypt(data))
def writelines(self, lines):
self.write("\n".join(lines))
def read(self, size=-1):
return self._cipher.decrypt(self._file.read(size))
def readline(self, size=-1):
fptr = self.tell()
bytes = []
got_line = False
while not got_line:
bytes.append(self.read(1))
if not bytes[-1] or ('\n' in bytes[-1]):
bytes[-1] = bytes[-1][0:bytes[-1].find('\n') + 1]
got_line = True
plaintext = ''.join(bytes)
self.seek(fptr + len(plaintext)) # rewind
return plaintext
def readlines(self, size=-1):
return [line for line in self]
def read_ciphertext(self, size=-1):
"""Read ciphertext without decrypting.
size : int (default -1)
Number of bytes to read. Negative values read the entire stream
return : str
Ciphertext
"""
return self._file.read(size)
def next(self):
return self.readline()
在这一点上,我真的不知道问题出在哪里,所以请随时向我提出问题并提出可能的解决方案。
提前谢谢了!