4

我正在涉足 Python 线程。我创建了一个供应商线程,它通过队列从 *nix(串行)/dev 向我返回字符/行数据。

作为练习,我想一次一行地使用队列中的数据(使用 '\n' 作为行终止符)。

我当前的(简单的)解决方案是一次只将 1 个字符放入队列中,因此消费者一次只能获取()一个字符。(这是一个安全的假设吗?)这种方法目前允许我执行以下操作:

...
return_buffer = []
while True:
    rcv_data = queue.get(block=True)
    return_buffer.append(rcv_data)        
    if rcv_data == "\n":
        return return_buffer

这似乎有效,但是当我一次 put() 2 个字符时,我肯定会导致它失败。

我想让接收逻辑更通用,并且能够处理多字符 put()。

我的下一个方法是 rcv_data.partition("\n"),将“剩余”放在另一个缓冲区/列表中,但这需要在队列旁边处理临时缓冲区。(我想另一种方法是一次只 put() 一行,但那有什么乐趣呢?)

有没有更优雅的方式从队列中一次读取一行?

4

4 回答 4

3

这对于生成器来说可能是一个很好的用途。它会在产量后准确地从中断的地方恢复,因此减少了您需要的存储量和缓冲区交换(我无法谈论它的性能)。

def getLineGenerator(queue, splitOn):
    return_buffer = []
    while True:
        rcv_data = queue.get(block=True) # We can pull any number of characters here.
        for c in rcv_data:
            return_buffer.append(c)
            if c == splitOn:
                yield return_buffer
                return_buffer = []


gen = getLineGenerator(myQueue, "\n")
for line in gen:
    print line.strip()

编辑:

一旦 JF Sebastian 指出行分隔符可能是多字符,我也必须解决这种情况。我还使用了 jdi 答案中的 StringIO 。我再次无法谈论效率,但我相信它在所有情况下都是正确的(至少是我能想到的那些)。这是未经测试的,因此可能需要一些调整才能实际运行。感谢 JF Sebastian 和 jdi 的回答,最终导致了这个问题。

def getlines(chunks, splitOn="\n"):
    r_buffer = StringIO()
    for chunk in chunks
        r_buffer.write(chunk)
        pos = r_buffer.getvalue().find(splitOn) # can't use rfind see the next comment
        while pos != -1: # A single chunk may have more than one separator
            line = r_buffer.getvalue()[:pos + len(splitOn)]
            yield line
            rest = r_buffer.getvalue().split(splitOn, 1)[1]
            r_buffer.seek(0)
            r_buffer.truncate()
            r_buffer.write(rest)
            pos = rest.find(splitOn) # rest and r_buffer are equivalent at this point. Use rest to avoid an extra call to getvalue

    line = r_buffer.getvalue();
    r_buffer.close() # just for completeness
    yield line # whatever is left over.

for line in getlines(iter(queue.get, None)): # break on queue.put(None)
    process(line)
于 2012-09-05T21:28:34.033 回答
2

如果您的特定用例生产者需要逐个字符地放入队列中,那么我想我看不出让它们在消费者中循环有什么问题。但是您可能可以通过使用StringIO对象作为缓冲区来获得更好的性能。

from cStringIO import StringIO
# python3: from io import StringIO

buf = StringIO()

该对象如果是类似文件的,那么您可以write随时对其进行查找、查找和调用getvalue()以获取缓冲区中的完整字符串值。与必须不断增长列表、将其连接到字符串并清除它相比,这很可能会给您带来更好的性能。

return_buffer = StringIO()
while True:
    rcv_data = queue.get(block=True)
    return_buffer.write(rcv_data)        
    if rcv_data == "\n":
        ret = return_buffer.getvalue()
        return_buffer.seek(0)
        # truncate, unless you are counting bytes and
        # reading the data directly each time
        return_buffer.truncate()

        return ret
于 2012-09-05T21:20:00.577 回答
1

队列完全返回您放入其中的内容。如果你放碎片,你会得到碎片。如果你放线,你就会得到线。

如果允许输入中的部分行并且可以稍后完成,则要逐行使用,您需要一个显式或隐式缓冲区来存储部分行:

def getlines(fragments, linesep='\n'):
    buff = []
    for fragment in fragments:
        pos = fragment.rfind(linesep)
        if pos != -1: # linesep in fragment
           lines = fragment[:pos].split(linesep)
           if buff: # start of line from previous fragment
              line[0] = ''.join(buff) + line[0] # prepend
              del buff[:] # clear buffer
           rest = fragment[pos+len(linesep):]
           if rest:
              buff.append(rest)
           yield from lines
        elif fragment: # linesep not in fragment, fragment is not empty
           buff.append(fragment)

    if buff:
       yield ''.join(buff) # flush the rest

它允许片段,任意长度的行。linesep 不应跨越多个片段。

用法:

for line in getlines(iter(queue.get, None)): # break on queue.put(None)
    process(line)
于 2012-09-05T22:07:39.787 回答
1

重要的是要注意队列中可能有多行。此函数将返回(并可选择打印)给定队列中的所有行:

def getQueueContents(queue, printContents=True):
    contents = ''
    # get the full queue contents, not just a single line
    while not queue.empty():
        line = queue.get_nowait()
        contents += line
        if printContents:
            # remove the newline at the end
            print line[:-1]
    return contents
于 2015-10-12T21:12:31.733 回答