成功的浮士德消费者和生产者(没有 CPU 与 concurrent.futures.ProcessPoolExecutor 绑定)
消费者
import faust
app = faust.App('myapp', broker='kafka://127.0.0.1:9092')
class Add(faust.Record):
a: int
b: int
topic = app.topic('test', value_type=Add)
@app.agent(topic)
async def adding(stream):
async for value in stream:
yield value.a + value.b
app.main()
制片人
"""Simple Faust Producer"""
import faust
class Add(faust.Record):
a: int
b: int
</code></pre>
if __name__ == '__main__':
"""Simple Faust Producer"""
app = faust.App('faust_test_app', broker='kafka://127.0.0.1:9092')
topic = app.topic('test')
@app.timer(interval=1.0)
async def send_message(message):
await topic.send(value=msg)
app.main()
faust.readthedocs.io/en/latest/userguide/agents.html 中的示例效果很好。
但是,如果我试图将 CPU 绑定代码(来自 docs.python.org/3/library/asyncio-eventloop.html)添加到我的消费者,那么它看起来像:
使用 run_in_executor 的消费者
import faust
import asyncio
import concurrent.futures
app = faust.App('myapp', broker='kafka://127.0.0.1:9092')
class Add(faust.Record):
a: int
b: int
topic = app.topic('test', value_type=Add)
def cpu_bound():
return sum(i * i for i in range(10 ** 7))
@app.agent(topic)
async def adding(stream):
async for value in stream:
loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, cpu_bound)
print('custom process pool', result)
yield value.a + value.b
app.main()
我收到了工人崩溃的警告:
Crashed reason=BrokenProcessPool('A process in the process pool was terminated abruptly while the future was running or pending.')
Traceback (most recent call last):
File "/usr/local/anaconda3/envs/kaf_send/lib/python3.9/site-packages/faust/agents/agent.py", line 647, in _execute_actor
await coro
File "/usr/local/anaconda3/envs/kaf_send/lib/python3.9/site-packages/faust/agents/agent.py", line 664, in _slurp
async for value in it:
File "main.py", line 28, in adding
result = await loop.run_in_executor(
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
[2021-06-28 16:24:34,695] [82856] [INFO] [^----OneForOneSupervisor: (1@0x7f8c929a76d0)]: Restarting dead <Agent*: __main__.adding>! Last crash reason: BrokenProcessPool('A process in the process pool was terminated abruptly while the future was running or pending.')
NoneType: None
它是否尝试在进程初始化期间使用相同的端口(正如我在 PyCharm 中设置的那样)再启动一个工作程序,或者我在 faust 消费者中的 cpu_bound 代码做错了什么?