0

我正在开发一个玩具模块来加密临时文件。这个想法是公开一个类似于tempfile模块中的接口,但使用会话密钥对数据进行透明加密。同样,这只是一个玩具项目,而不是生产代码。

首先,一些技术细节。该项目实施pycrypto并正在使用:

  • AES-CTR
  • 具有 1 位随机前缀的计数器

基本类的结构是为了模仿文件对象接口。当一个方法如reador被调用时,我们使用类的属性write初始化一个Crypto.Cipher.AES对象。_cipher该属性实现如下:

def _cipher():
    doc = doc = "Returns a stateful AES object ready to decrypt at the required \
                 stream position"

    def fget(self):
        ctr = Counter.new(nbits=64,
                          initial_value=self.tell()  # alias of self._file.tell
                          prefix=self._nonce)  # set by `Random.new().read(8)
        return AES.new(self._key, counter=ctr, mode=self._opmode)  # self._opmode = Crypto.Cipher.AES.MODE_CTR

    def fset(self, value):
        raise AttributeError('Cannot set _cipher')

    def fdel(self, value):
        raise AttributeError('Cannot delete _cipher')

    return locals()
        _cipher = property(**_cipher())

这是一个_cipher在调用方法期间如何使用属性进行透明加密的示例write

def write(self, data):
    if not isinstance(data, str):
        raise TypeError('Data must be str (or bytestring)')
    self._file.write(self._cipher.encrypt(data))

解密时,我们应用相反的交易:

def read(self, size=-1):
    return self._cipher.decrypt(self._file.read(size))

这在对 进行单个调用时有效write,但在多个调用write被链接时失败。例如:

ep = EphemeralFile()  # the class in question
ep.write('Now is the winter of our discontent')
ep.seek(0)
print ep.read()

>> Now is the winter of our discontent

到目前为止一切都很好......但这就是失败的地方

ep.write(' made glorious summer by this sun of York')
ep.seek(0)
print ep.read()

>> Now is the winter of our discontent"d_"��U�L~ �w���S��h��]"U(��P^��9k

我究竟做错了什么?self._file.tell()属性中的使用不应该_cipher为解密产生适当的计数器位置吗?

请注意,我首先怀疑我可能会偏离一个计数器刻度,因此我尝试将initial_value=self.tell()行修改为initial_value=self.tell() + 1(也尝试使用-1),但无济于事。

为方便起见,这里是完整的类定义。它相当短,可能会产生一些见解。

import tempfile

from Crypto.Cipher import AES
from Crypto import Random
from Crypto.Util import Counter

PRNG = Random.new()


class EphemeralFile(object):
    def __init__(self, mode='w+b', bufsize=-1, suffix='', prefix='', dir=None,
                 key_size=32):

        self._key = PRNG.read(key_size)
        self._nonce = PRNG.read(8)
        self._opmode = AES.MODE_CTR

        self._file = tempfile.TemporaryFile(mode=mode, bufsize=bufsize,
                                            suffix=suffix, prefix=prefix, dir=dir)

        # alias tempfile methods and parameters
        self.close = self._file.close
        self.closed = self._file.closed
        self.encoding = self._file.encoding
        self.errors = self._file.errors
        self.fileno = self._file.fileno
        self.flush = self._file.flush
        self.isatty = self._file.isatty
        self.mode = self._file.mode
        self.name = self._file.name
        self.softspace = self._file.softspace
        self.truncate = self._file.truncate
        self.seek = self._file.seek
        self.tell = self._file.tell

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self._file.close()

    def __iter__(self):
        return (line for line in self.readlines())

    def _cipher():
        doc = "Returns a stateful AES object ready to decrypt at the required \
               stream position"

        def fget(self):
            ctr = Counter.new(nbits=64,
                              initial_value=self.tell(),
                              prefix=self._nonce)
            return AES.new(self._key, counter=ctr, mode=self._opmode)

        def fset(self, value):
            raise AttributeError('Cannot set EphemeralFile._cipher')

        def fdel(self):
            raise AttributeError('Cannot delete EphemeralFile._cipher')

        return locals()
    _cipher = property(**_cipher())

    def write(self, data):
        if not isinstance(data, str):
            raise TypeError('Data must be str (or bytestring)')

        self._file.write(self._cipher.encrypt(data))

    def writelines(self, lines):
        self.write("\n".join(lines))

    def read(self, size=-1):
        return self._cipher.decrypt(self._file.read(size))

    def readline(self, size=-1):
        fptr = self.tell()
        bytes = []
        got_line = False
        while not got_line:
            bytes.append(self.read(1))
            if not bytes[-1] or ('\n' in bytes[-1]):
                bytes[-1] = bytes[-1][0:bytes[-1].find('\n') + 1]
                got_line = True

        plaintext = ''.join(bytes)
        self.seek(fptr + len(plaintext))  # rewind
        return plaintext

    def readlines(self, size=-1):
        return [line for line in self]

    def read_ciphertext(self, size=-1):
        """Read ciphertext without decrypting.

        size : int (default -1)
            Number of bytes to read.  Negative values read the entire stream

        return : str
            Ciphertext
        """
        return self._file.read(size)

    def next(self):
        return self.readline()

在这一点上,我真的不知道问题出在哪里,所以请随时向我提出问题并提出可能的解决方案。

提前谢谢了!

4

1 回答 1

1

我相信一个问题是Counter对象应该作为initial_valueAES 块号接收,而不是字节偏移量。换句话说,您需要:

ctr = Counter.new(nbits=64,
                  initial_value=self.tell() % 16,
                  prefix=self._nonce)

这是必需的,因为在 AES CTR 模式下,每次越过 AES 数据边界(16 个字节)时,您都会增加该值。

这也意味着任何write操作的正确顺序大致是:

  1. 如上所示启动密码对象 - 在加密模式下。
  2. 加密self.tell() % 16任何数据的字节并丢弃结果。
  3. 加密输入数据并将其写入文件。

同样的阅读:

  1. 如上所示启动密码对象 - 在解密模式下。
  2. 解密self.tell() % 16任何数据的字节并丢弃结果。
  3. 解密从文件中读取的数据。

通过快速查看代码,第二个问题也可能是您使用相同的 AES 对象进行加密和解密。您需要两个单独的对象,每个方向一个。

于 2013-09-03T20:46:14.907 回答