0

成功的浮士德消费者和生产者(没有 CPU 与 concurrent.futures.ProcessPoolExecutor 绑定)

消费者

import faust

app = faust.App('myapp', broker='kafka://127.0.0.1:9092')

class Add(faust.Record):
    a: int
    b: int


topic = app.topic('test', value_type=Add)

@app.agent(topic)
async def adding(stream):
    async for value in stream:

        yield value.a + value.b

app.main()

制片人

"""Simple Faust Producer"""
import faust


class Add(faust.Record):
    a: int
    b: int
</code></pre>
if __name__ == '__main__':
    """Simple Faust Producer"""

    app = faust.App('faust_test_app', broker='kafka://127.0.0.1:9092')
    topic = app.topic('test')

    @app.timer(interval=1.0)
    async def send_message(message):
        await topic.send(value=msg)
    app.main()

faust.readthedocs.io/en/latest/userguide/agents.html 中的示例效果很好。

但是,如果我试图将 CPU 绑定代码(来自 docs.python.org/3/library/asyncio-eventloop.html)添加到我的消费者,那么它看起来像:

使用 run_in_executor 的消费者

import faust
import asyncio
import concurrent.futures
app = faust.App('myapp', broker='kafka://127.0.0.1:9092')

class Add(faust.Record):
    a: int
    b: int
topic = app.topic('test', value_type=Add)


def cpu_bound():
    return sum(i * i for i in range(10 ** 7))

@app.agent(topic)
async def adding(stream):
    async for value in stream:
        loop = asyncio.get_running_loop()
        with concurrent.futures.ProcessPoolExecutor() as pool:
            result = await loop.run_in_executor(pool, cpu_bound)
            print('custom process pool', result)

        yield value.a + value.b

app.main()

我收到了工人崩溃的警告:

Crashed reason=BrokenProcessPool('A process in the process pool was terminated abruptly while the future was running or pending.') 
Traceback (most recent call last):
File "/usr/local/anaconda3/envs/kaf_send/lib/python3.9/site-packages/faust/agents/agent.py", line 647, in _execute_actor
await coro
File "/usr/local/anaconda3/envs/kaf_send/lib/python3.9/site-packages/faust/agents/agent.py", line 664, in _slurp
async for value in it:
File "main.py", line 28, in adding
result = await loop.run_in_executor(
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
[2021-06-28 16:24:34,695] [82856] [INFO] [^----OneForOneSupervisor: (1@0x7f8c929a76d0)]: Restarting dead <Agent*: __main__.adding>! Last crash reason: BrokenProcessPool('A process in the process pool was terminated abruptly while the future was running or pending.') 
NoneType: None

它是否尝试在进程初始化期间使用相同的端口(正如我在 PyCharm 中设置的那样)再启动一个工作程序,或者我在 faust 消费者中的 cpu_bound 代码做错了什么?

4

0 回答 0