9

我正在解析这样的文件:

--标题--
数据1
数据2
--标题--
数据3
数据4
数据5
--标题--
--标题--
...

我想要这样的团体:

[ [header, data1, data2], [header, data3, data4, data5], [header], [header], ... ]

所以我可以像这样遍历它们:

for grp in group(open('file.txt'), lambda line: 'header' in line):
    for item in grp:
        process(item)

并将检测组逻辑与处理组逻辑分开。

但是我需要一个可迭代的迭代,因为这些组可以任意大,我不想存储它们。也就是说,每次遇到“哨兵”或“标题”项目时,我都想将一个可迭代对象拆分为子组,如谓词所示。似乎这将是一项常见任务,但我找不到有效的 Pythonic 实现。

这是愚蠢的追加到列表的实现:

def group(iterable, isstart=lambda x: x):
    """Group `iterable` into groups starting with items where `isstart(item)` is true.

    Start items are included in the group.  The first group may or may not have a 
    start item.  An empty `iterable` results in an empty result (zero groups)."""
    items = []
    for item in iterable:
        if isstart(item) and items:
            yield iter(items)
            items = []
        items.append(item)
    if items:
        yield iter(items) 

感觉必须有一个不错的itertools版本,但它让我望而却步。'明显'(?!)groupby解决方案似乎不起作用,因为可能存在相邻的标题,并且它们需要分成不同的组。我能想到的最好的办法是(ab)使用groupby一个保持计数器的关键功能:

def igroup(iterable, isstart=lambda x: x):
    def keyfunc(item):
        if isstart(item):
            keyfunc.groupnum += 1       # Python 2's closures leave something to be desired
        return keyfunc.groupnum
    keyfunc.groupnum = 0
    return (group for _, group in itertools.groupby(iterable, keyfunc))

但我觉得 Python 可以做得更好——遗憾的是,这甚至比哑列表版本还要慢:

#ipython
%time deque(group(xrange(10 ** 7), lambda x: x % 1000 == 0), maxlen=0)
CPU 时间:用户 4.20 秒,系统:0.03 秒,总计:4.23 秒

%time deque(igroup(xrange(10 ** 7), lambda x: x % 1000 == 0), maxlen=0)
CPU 时间:用户 5.45 秒,系统:0.01 秒,总计:5.46 秒

为了方便您,这里有一些单元测试代码:

class Test(unittest.TestCase):
    def test_group(self):
        MAXINT, MAXLEN, NUMTRIALS = 100, 100000, 21
        isstart = lambda x: x == 0
        self.assertEqual(next(igroup([], isstart), None), None)
        self.assertEqual([list(grp) for grp in igroup([0] * 3, isstart)], [[0]] * 3)
        self.assertEqual([list(grp) for grp in igroup([1] * 3, isstart)], [[1] * 3])
        self.assertEqual(len(list(igroup([0,1,2] * 3, isstart))), 3)        # Catch hangs when groups are not consumed
        for _ in xrange(NUMTRIALS):
            expected, items = itertools.tee(itertools.starmap(random.randint, itertools.repeat((0, MAXINT), random.randint(0, MAXLEN))))
            for grpnum, grp in enumerate(igroup(items, isstart)):
                start = next(grp)
                self.assertTrue(isstart(start) or grpnum == 0)
                self.assertEqual(start, next(expected))
                for item in grp:
                    self.assertFalse(isstart(item))
                    self.assertEqual(item, next(expected))

那么:如何在 Python 中通过谓词优雅高效地对可迭代对象进行分组?

4

4 回答 4

5

如何在 Python 中通过谓词优雅高效地对可迭代对象进行分组?

这是一个简洁、节省内存的实现,与您的问题非常相似:

from itertools import groupby, imap
from operator import itemgetter

def igroup(iterable, isstart):
    def key(item, count=[False]):
        if isstart(item):
           count[0] = not count[0] # start new group
        return count[0]
    return imap(itemgetter(1), groupby(iterable, key))

它支持无限组。

tee基于 - 的解决方案稍微快一些,但它会消耗当前组的内存(类似于list问题中基于 - 的解决方案):

from itertools import islice, tee

def group(iterable, isstart):
    it, it2 = tee(iterable)
    count = 0
    for item in it:
        if isstart(item) and count:
            gr = islice(it2, count)
            yield gr
            for _ in gr:  # skip to the next group
                pass
            count = 0
        count += 1
    if count:
       gr = islice(it2, count)
       yield gr
       for _ in gr:  # skip to the next group
           pass

groupby- 解决方案可以在纯 Python 中实现:

def igroup_inline_key(iterable, isstart):
    it = iter(iterable)

    def grouper():
        """Yield items from a single group."""
        while not p[START]:
            yield p[VALUE]  # each group has at least one element (a header)
            p[VALUE] = next(it)
            p[START] = isstart(p[VALUE])

    p = [None]*2 # workaround the absence of `nonlocal` keyword in Python 2.x
    START, VALUE = 0, 1
    p[VALUE] = next(it)
    while True:
        p[START] = False # to distinguish EOF and a start of new group
        yield grouper()
        while not p[START]: # skip to the next group
            p[VALUE] = next(it)
            p[START] = isstart(p[VALUE])

为了避免重复代码,while True循环可以写成:

while True:
    p[START] = False  # to distinguish EOF and a start of new group
    g = grouper()
    yield g
    if not p[START]:  # skip to the next group
        for _ in g:
            pass
        if not p[START]:  # EOF
            break

虽然以前的变体可能更明确和可读。

我认为纯 Python 中的通用内存高效解决方案不会比groupby基于 - 的解决方案快得多。

如果process(item)比较快igroup()并且可以在字符串中有效地找到标头(例如,对于固定的静态标头),那么您可以通过大块读取文件并拆分标头值来提高性能。它应该使您的任务受 IO 限制。

于 2012-10-08T20:33:02.320 回答
4

我没有完全阅读您的所有代码,但我认为这可能会有所帮助:

from itertools import izip, tee, chain


def pairwise(iterable):
    a, b = tee(iterable)
    return izip(a, chain(b, [next(b, None)]))


def group(iterable, isstart):

    pairs = pairwise(iterable)

    def extract(current, lookahead, pairs=pairs, isstart=isstart):
        yield current
        if isstart(lookahead):
            return
        for current, lookahead in pairs:
            yield current
            if isstart(lookahead):
                return

    for start, lookahead in pairs:
        gen = extract(start, lookahead)
        yield gen
        for _ in gen:
            pass


for gen in group(xrange(4, 16), lambda x: x % 5 == 0):
    print '------------------'
    for n in gen:
        print n

print [list(g) for g in group([], lambda x: x % 5 == 0)]

结果:

$ python gen.py
------------------
4
------------------
5
6
7
8
9
------------------
10
11
12
13
14
------------------
15
[]

编辑:

这是另一种解决方案,与上述类似,但没有 the pairwise()and 一个哨兵。我不知道哪个更快:

def group(iterable, isstart):

    sentinel = object()

    def interleave(iterable=iterable, isstart=isstart, sentinel=sentinel):
        for item in iterable:
            if isstart(item):
                yield sentinel
            yield item

    items = interleave()

    def extract(item, items=items, isstart=isstart, sentinel=sentinel):
        if item is not sentinel:
            yield item
        for item in items:
            if item is sentinel:
                return
            yield item

    for lookahead in items:
        gen = extract(lookahead)
        yield gen
        for _ in gen:
            pass

现在两者都通过了测试用例,这要感谢 JFSebastians 的想法,即耗尽跳过的子组生成器。

于 2012-10-08T05:10:07.103 回答
2

关键是你必须编写一个生成子生成器的生成器。我的解决方案在概念上与@pillmuncher 的解决方案相似,但更加独立,因为它避免使用 itertools 机器制作辅助生成器。缺点是我必须使用一个有点不雅的临时列表。在 Python 3 中,这也许可以用nonlocal.

def grouper(iterable, isstart):
    it = iter(iterable)
    last = [next(it)]
    def subgroup():
        while True:
            toYield = last[0]
            try:
                last.append(next(it))
            except StopIteration, e:
                last.pop(0)
                yield toYield
                raise StopIteration
            else:
                yield toYield
                last.pop(0)
            if isstart(last[0]):
                raise StopIteration
    while True:
        sg = subgroup()
        yield sg
        if len(last) == 2:
            # subgenerator was aborted before completion, let's finish it
            for a in sg:
                pass
        if last:
            # sub-generator left next element waiting, next sub-generator will yield it
            pass
        else:
            # sub-generator left "last" empty because source iterable was exhausted
            raise StopIteration

>>> for g in grouper([0, 1, 1, 0, 1, 0, 1, 1, 1, 1, 0], lambda x: x==0):
...     print "Group",
...     for i in g:
...         print i,
...     print
Group 0 1 1
Group 0 1
Group 0 1 1 1 1
Group 0

我不知道这在性能方面是什么样的,我只是这样做是因为这只是一件有趣的事情。

编辑:我对你原来的两个和我的进行了单元测试。看起来我的比你的快一点,igroup但仍然比基于列表的版本慢。在这里,您必须在速度和内存之间做出权衡,这似乎很自然。如果您知道组不会太大,请使用基于列表的版本以提高速度。如果组可能很大,请使用基于生成器的版本来降低内存使用量。

编辑:上面的编辑版本以不同的方式处理中断。如果你跳出子生成器但恢复外部生成器,它将跳过中止组的其余部分并从下一个组开始:

>>> for g in grouper([0, 1, 2, 88, 3, 0, 1, 88, 2, 3, 4, 0, 1, 2, 3, 88, 4], lambda x: x==0):
...     print "Group",
...     for i in g:
...         print i,
...         if i==88:
...             break
...     print
Group 0 1 2 88
Group 0 1 88
Group 0 1 2 3 88
于 2012-10-08T05:32:42.700 回答
0

groupby所以这是另一个版本,它试图将来自with的子组对拼接在一起chain。对于给定的性能测试,它明显更快,但当有很多小组时(比如isstart = lambda x: x % 2 == 0)要慢得多。它欺骗和缓冲重复的标题(你可以用 read-all-but-last 迭代器技巧来解决这个问题)。这也是优雅部门的退步,所以我想我还是更喜欢原版。

def group2(iterable, isstart=lambda x: x):
    groups = itertools.groupby(iterable, isstart)
    start, group = next(groups)
    if not start:                   # Deal with initial non-start group
        yield group
        _, group = next(groups)
    groups = (grp for _, grp in groups)
    while True:                     # group will always be start item(s) now      
        group = list(group)         
        for item in group[0:-1]:    # Back-to-back start items... and hope this doesn't get very big.  :)
            yield iter([item])      
        yield itertools.chain([group[-1]], next(groups, []))       # Start item plus subsequent non-start items
        group = next(groups)
%time deque(group2(xrange(10 ** 7), lambda x: x % 1000 == 0), maxlen=0)
CPU 时间:用户 3.13 秒,系统:0.00 秒,总计:3.13 秒
于 2012-10-09T01:36:24.350 回答