4

为了解决一个研究问题,我们必须在 python 中组织位掩码搜索。作为输入,我们有一个原始数据(我们将其呈现为位序列)。大小约为 1,5Gb。作为输出,我们必须获得特定位掩码的出现次数。让我举个例子来描述这种情况

input:    sequence of bits, a bitmask to search(mask length: 12bits)

第一个想法(不是有效的)是像这样使用 XOR:

1step: from input we take 12 first bits(position 0 to 11) and make XOR with mask 
2step: from input we take bits from 1 to 12 position and XOR with mask ...

让我们进行 2 个第一步:

input sequence 100100011110101010110110011010100101010110101010
mask to search: 100100011110
step 1: take first 12 bits from input: 100100011110 and XOR it with mask.
step 2: teke bits from 1 to 12position: 001000111101 and XOR it with mask.
...

问题是:如何组织从输入中获取位?我们能够获取前 12 位,但是我们如何才能从 1 到 12 位置获取我们需要进行下一次迭代的位?

在我们使用 python BitString 包之前,但是我们搜索所有掩码所花费的时间非常高。还有一个。掩码的大小可以从 12 位到 256 位。有什么建议吗?任务必须在python中实现

4

3 回答 3

4

您的算法是在数据中搜索“字符串”的天真方法,但幸运的是有更好的算法。一个示例是KMP 算法,但还有其他可能更适合您的用例。

使用更好的算法,您可以将复杂度从O(n*m)变为O(n+m)

于 2011-02-06T22:19:20.810 回答
3

如果您的掩码是 8 位的倍数,则您的搜索将变成一个相对微不足道的字节比较,并且任何子字符串搜索算法就足够了(我建议转换为字符串并使用内置搜索,因为您可能会遇到字符验证失败的问题问题。)

sequence = <list of 8-bit integers>
mask = [0b10010001, 0b01101101]
matches = my_substring_search(sequence, mask)

对于大于 8 位但不是 8 倍数的掩码,我建议将掩码截断为 8 的倍数并使用与上述相同的子字符串搜索。然后对于找到的任何匹配项,您可以测试其余部分。

sequence = <list of 8-bit integers>
mask_a = [0b10010001]
mask_b = 0b01100000
mask_b_pattern = 0b11110000   # relevant bits of mask_b
matches = my_substring_search(sequence, mask_a)

for match in matches:
    if (sequence[match+len(mask_a)] & mask_b_pattern) == mask_b:
        valid_match = True  # or something more useful...

如果sequence是 4096 字节的列表,您可能需要考虑部分之间的重叠。这可以通过制作sequence一个4096+ceil(mask_bits/8.0)字节列表来轻松完成,但每次读取下一个块时仍然只前进 4096。


作为生成和使用这些掩码的演示:

class Mask(object):
    def __init__(self, source, source_mask):
        self._masks = list(self._generate_masks(source, source_mask))

    def match(self, buffer, i, j):
        return any(m.match(buffer, i, j) for m in self._masks)

    class MaskBits(object):
        def __init__(self, pre, pre_mask, match_bytes, post, post_mask):
            self.match_bytes = match_bytes
            self.pre, self.pre_mask = pre, pre_mask
            self.post, self.post_mask = post, post_mask

        def __repr__(self):
            return '(%02x %02x) (%s) (%02x %02x)' % (self.pre, self.pre_mask,
                ', '.join('%02x' % m for m in self.match_bytes),
                self.post, self.post_mask)

        def match(self, buffer, i, j):
            return (buffer[i:j] == self.match_bytes and
                    buffer[i-1] & self.pre_mask == self.pre and
                    buffer[j] & self.post_mask == self.post)

    def _generate_masks(self, src, src_mask):
        pre_mask = 0
        pre = 0
        post_mask = 0
        post = 0
        while pre_mask != 0xFF:
            src_bytes = []
            for i in (24, 16, 8, 0):
                if (src_mask >> i) & 0xFF == 0xFF:
                    src_bytes.append((src >> i) & 0xFF)
                else:
                    post_mask = (src_mask >> i) & 0xFF
                    post = (src >> i) & 0xFF
                    break
            yield self.MaskBits(pre, pre_mask, src_bytes, post, post_mask)
            pre += pre
            pre_mask += pre_mask
            if src & 0x80000000: pre |= 0x00000001
            pre_mask |= 0x00000001
            src = (src & 0x7FFFFFFF) * 2
            src_mask = (src_mask & 0x7FFFFFFF) * 2

此代码不是完整的搜索算法,它构成验证匹配的一部分。Mask 对象由源值和源掩码构成,均左对齐且(在此实现中)32 位长:

src = 0b11101011011011010101001010100000
src_mask = 0b11111111111111111111111111100000

缓冲区是一个字节值列表:

buffer_1 = [0x7e, 0xb6, 0xd5, 0x2b, 0x88]

一个 Mask 对象生成一个移位掩码的内部列表:

>> m = Mask(src, src_mask)
>> m._masks
[(00 00) (eb, 6d, 52) (a0 e0),
 (01 01) (d6, da, a5) (40 c0),
 (03 03) (ad, b5, 4a) (80 80),
 (07 07) (5b, 6a, 95) (00 00),
 (0e 0f) (b6, d5) (2a fe),
 (1d 1f) (6d, aa) (54 fc),
 (3a 3f) (db, 54) (a8 f8),
 (75 7f) (b6, a9) (50 f0)]

中间元素是完全匹配的子字符串(没有简洁的方法可以按原样从该对象中获取它,但它是m._masks[i].match_bytes)。一旦你使用了一种有效的算法来找到这个子序列,你就可以使用 来验证周围的位m.match(buffer, i, j),其中i是第一个匹配字节j的索引,是最后一个匹配字节之后的字节的索引(例如buffer[i:j] == match_bytes)。

buffer上面,可以从第 5 位开始找到位序列,这意味着_masks[4].match_bytes可以在buffer[1:3]. 因此:

>> m.match(buffer, 1, 3)
True

(随意使用、改编、修改、出售或以任何可能的方式折磨此代码。我非常喜欢将它放在一起 - 一个有趣的问题 - 尽管我不会对任何错误负责,所以请确保你彻底测试它!)

于 2011-02-06T22:34:51.177 回答
1

在字节数据中搜索位模式比典型搜索更具挑战性。通常的算法并不总是能很好地工作,因为从字节数据中提取每一位都是有成本的,而且只有两个字符的“字母”,所以碰巧有 50% 的比较会匹配(这使得许多算法效率较低)。

你提到尝试位串模块(我写的),但它太慢了。这对我来说并不奇怪,所以如果有人对如何做到这一点有任何好主意,我会注意!但是位串的方式为您提供了可能的加速:

为了进行匹配,位串将字节数据块转换为普通的“0”和“1”字符串,然后使用 Python 的find方法进行快速搜索。很多时间都花在将数据转换为字符串上,但是当您多次搜索相同的数据时,可以节省大量时间。

masks = ['0000101010100101', '010100011110110101101', '01010101101']
byte_data_chunk = bytearray('blahblahblah')
# convert to a string with one character per bit
# uses lookup table not given here!
s = ''.join(BYTE_TO_BITS[x] for x in byte_data_chunk)
for mask in masks:
    p = s.find(mask)
    # etc.

find关键是,一旦您转换为普通字符串,您就可以使用经过很好优化的内置来搜索每个掩码。当您使用位串时,它必须对每个会影响性能的掩码进行转换。

于 2011-02-07T17:14:48.070 回答