如何按惯用方式批量处理序列的元素?
例如,对于序列“abcdef”和批量大小为 2,我想做如下的事情:
for x, y in "abcdef":
print "%s%s\n" % (x, y)
ab
cd
ef
当然,这不起作用,因为它期望列表中的单个元素本身包含 2 个元素。
什么是一种不错的、简短的、干净的、pythonic 的方式来处理批处理中列表的下 n 个元素,或者来自较大字符串的长度为 n 的子字符串(两个类似的问题)?
生成器函数会很整洁:
def batch_gen(data, batch_size):
for i in range(0, len(data), batch_size):
yield data[i:i+batch_size]
示例使用:
a = "abcdef"
for i in batch_gen(a, 2): print i
印刷:
ab
cd
ef
我有另一种方法,适用于没有已知长度的迭代。
def groupsgen(seq, size):
it = iter(seq)
while True:
values = ()
for n in xrange(size):
values += (it.next(),)
yield values
它通过按大小组迭代序列(或其他迭代器),收集元组中的值来工作。在每个组的末尾,它产生元组。
当迭代器用完值时,它会产生一个 StopIteration 异常,然后向上传播,表明 groupsgen 没有值。
它假设这些值以大小集合(2、3 等集合)的形式出现。如果没有,任何剩余的值都将被丢弃。
不要忘记 zip() 函数:
a = 'abcdef'
for x,y in zip(a[::2], a[1::2]):
print '%s%s' % (x,y)
我相信有人会想出更多的“Pythonic”,但是怎么样:
for y in range(0, len(x), 2):
print "%s%s" % (x[y], x[y+1])
请注意,这只有在您知道这一点时才有效len(x) % 2 == 0;
但更一般的方法是(受此答案的启发):
for i in zip(*(seq[i::size] for i in range(size))):
print(i) # tuple of individual values
然后总是有文档。
def pairwise(iterable):
"s -> (s0,s1), (s1,s2), (s2, s3), ..."
a, b = tee(iterable)
try:
b.next()
except StopIteration:
pass
return izip(a, b)
def grouper(n, iterable, padvalue=None):
"grouper(3, 'abcdefg', 'x') --> ('a','b','c'), ('d','e','f'), ('g','x','x')"
return izip(*[chain(iterable, repeat(padvalue, n-1))]*n)
注意:当给定字符串序列作为输入时,这些生成元组而不是子字符串。
>>> a = "abcdef"
>>> size = 2
>>> [a[x:x+size] for x in range(0, len(a), size)]
['ab', 'cd', 'ef']
..或者,不是作为列表理解:
a = "abcdef"
size = 2
output = []
for x in range(0, len(a), size):
output.append(a[x:x+size])
或者,作为生成器,如果多次使用最好(对于一次性的东西,列表理解可能是“最好的”):
def chunker(thelist, segsize):
for x in range(0, len(thelist), segsize):
yield thelist[x:x+segsize]
..它的用法:
>>> for seg in chunker(a, 2):
... print seg
...
ab
cd
ef
您可以创建以下生成器
def chunks(seq, size):
a = range(0, len(seq), size)
b = range(size, len(seq) + 1, size)
for i, j in zip(a, b):
yield seq[i:j]
并像这样使用它:
for i in chunks('abcdef', 2):
print(i)
来自more_itertools的文档:more_itertools.chunked()
more_itertools.chunked(iterable, n)
将一个可迭代对象分解为给定长度的列表:
>>> list(chunked([1, 2, 3, 4, 5, 6, 7], 3))
[[1, 2, 3], [4, 5, 6], [7]]
如果 iterable 的长度不能被 n 整除,则最后返回的列表会更短。
s = 'abcdefgh'
for e in (s[i:i+2] for i in range(0,len(s),2)):
print(e)
itertools 文档对此有一个秘诀:
from itertools import izip_longest
def grouper(iterable, n, fillvalue=None):
"Collect data into fixed-length chunks or blocks"
# grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
args = [iter(iterable)] * n
return izip_longest(fillvalue=fillvalue, *args)
用法:
>>> l = [1,2,3,4,5,6,7,8,9]
>>> [z for z in grouper(l, 3)]
[(1, 2, 3), (4, 5, 6), (7, 8, 9)]
除了两个答案外,我看到很多批次的过早实现和下标(不适用于所有迭代器)。因此我想出了这个替代方案:
def iter_x_and_n(iterable, x, n):
yield x
try:
for _ in range(n):
yield next(iterable)
except StopIteration:
pass
def batched(iterable, n):
if n<1: raise ValueError("Can not create batches of size %d, number must be strictly positive" % n)
iterable = iter(iterable)
try:
for x in iterable:
yield iter_x_and_n(iterable, x, n-1)
except StopIteration:
pass
令我震惊的是,没有针对此问题的单行或少行解决方案(据我所知)。关键问题是外部生成器和内部生成器都需要正确处理 StopIteration。外部生成器应该只在至少剩下一个元素时才产生一些东西。检查这一点的直观方法是执行 next(...) 并捕获 StopIteration。
改编自Python 3 的这个答案:
def groupsgen(seq, size):
it = iter(seq)
iterating = True
while iterating:
values = ()
try:
for n in range(size):
values += (next(it),)
except StopIteration:
iterating = False
if not len(values):
return None
yield values
如果它们的数字不能被 整除,它将安全终止并且不会丢弃值size
。
迭代工具怎么样?
from itertools import islice, groupby
def chunks_islice(seq, size):
while True:
aux = list(islice(seq, 0, size))
if not aux: break
yield "".join(aux)
def chunks_groupby(seq, size):
for k, chunk in groupby(enumerate(seq), lambda x: x[0] / size):
yield "".join([i[1] for i in chunk])
给定
from __future__ import print_function # python 2.x
seq = "abcdef"
n = 2
代码
while seq:
print("{}".format(seq[:n]), end="\n")
seq = seq[n:]
输出
ab
cd
ef
这是一个解决方案,它产生一系列迭代器,每个迭代器迭代n 个项目。
def groupiter(thing, n):
def countiter(nextthing, thingiter, n):
yield nextthing
for _ in range(n - 1):
try:
nextitem = next(thingiter)
except StopIteration:
return
yield nextitem
thingiter = iter(thing)
while True:
try:
nextthing = next(thingiter)
except StopIteration:
return
yield countiter(nextthing, thingiter, n)
我使用它如下:
table = list(range(250))
for group in groupiter(table, 16):
print(' '.join('0x{:02X},'.format(x) for x in group))
请注意,它可以处理对象的长度不是n的倍数。
一种解决方案,尽管我挑战某人做得更好;-)
a = 'abcdef'
b = [[a[i-1], a[i]] for i in range(1, len(a), 2)]
for x, y in b:
print "%s%s\n" % (x, y)