P.S. I have also opened an issue: https://github.com/robinhood/faust/issues/702
I am developing a Faust app:
from concurrent.futures import ProcessPoolExecutor, as_completed

import faust

app = faust.App('my-app-name', broker='kafka://localhost:9092')
sink = app.topic('topic')


@app.task()
async def check():
    # 3 is the number of different folders where the archives are placed
    with ProcessPoolExecutor(max_workers=3) as executor:
        fs = [executor.submit(handle, directory) for directory in ['dir1', 'dir2', 'dir3']]
        for future in as_completed(fs):
            future.result()


def handle(directory):
    # finding archives in directory
    # unpacking 7z with mdb-files
    # converting mdb tables to csv
    # reading csv to dataframe
    # some data manipulating
    # and at last sending dataframe records to kafka
    f = sink.send_soon(value={'ts': 1234567890, 'count': 10})  # always in pending status
The problem is that sink.send_soon returns a FutureMessage (asyncio.Future, Awaitable[RecordMetadata]) that always stays in the pending state.
This is a case of a future inside another future.
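To show what a permanently pending future looks like without Faust or Kafka, here is a minimal asyncio-only sketch. It assumes (I have not confirmed this in the Faust source) that send_soon only queues the message and relies on a running event loop to actually deliver it. The function name pending_until_loop_runs is hypothetical.

```python
import asyncio


def pending_until_loop_runs():
    """Return the future's done() state before and after the loop runs."""
    loop = asyncio.new_event_loop()
    fut = loop.create_future()
    # Schedule the "delivery"; the callback only fires once the loop runs.
    loop.call_soon(fut.set_result, 'delivered')
    done_before = fut.done()  # the loop has not run yet, so still pending
    loop.run_until_complete(fut)  # now the loop runs and resolves the future
    done_after = fut.done()
    loop.close()
    return done_before, done_after
```

If the same thing happens inside a pool worker, where no loop is ever started, the future would never leave the pending state.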
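Note: the handle function has to be synchronous, because an async function cannot be passed to ProcessPoolExecutor. send_soon is a synchronous method. According to this example https://github.com/robinhood/faust/blob/b5e159f1d104ad4a6aa674d14b6ba0be19b5f9f5/examples/windowed_aggregation.py#L47 it does not necessarily have to be awaited.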
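The restriction on async functions can be reproduced without Faust. Calling an async function returns a coroutine object, and a process pool has to pickle a worker's result to send it back to the parent. The sketch below only shows that pickling step failing; handle_async and try_pickle_coroutine are hypothetical names used for illustration.

```python
import pickle


async def handle_async(directory):
    # Stand-in for an async version of `handle`.
    return directory


def try_pickle_coroutine():
    """Return the error message raised when pickling a coroutine object."""
    coro = handle_async('dir1')  # calling it only creates a coroutine object
    try:
        pickle.dumps(coro)
    except TypeError as exc:
        return str(exc)
    finally:
        coro.close()  # avoid a "coroutine was never awaited" warning
    return None
```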
Is there any way to handle the pending future?
I also tried this:
import asyncio
from concurrent.futures import ProcessPoolExecutor

import faust

loop = asyncio.get_event_loop()

app = faust.App('my-app-name', broker='kafka://localhost:9092', loop=loop)
sink = app.topic('topic')


@app.task()
async def check():
    tasks = []
    with ProcessPoolExecutor(max_workers=3) as executor:
        for dir_ in ['dir1', 'dir2', 'dir3']:
            task = asyncio.create_task(run_dir_handling(executor, dir_))
            tasks.append(task)

        await asyncio.gather(*tasks)


async def run_dir_handling(executor, dir_):
    print('running blocking')
    await loop.run_in_executor(executor, handle, dir_)


def handle(directory):
    print('Handle directory')

    # finding archives in directory
    # unpacking 7z with mdb-files
    # converting mdb tables to csv
    # reading csv to dataframe
    # some data manipulating
    # and at last sending dataframe records to kafka

    # `send_soon` is a regular (non-async) def, but `send` is async;
    # an async variant cannot be used here because of the
    # `await loop.run_in_executor(executor, handle, dir_) TypeError: cannot pickle 'coroutine' object` error
    f = sink.send_soon(value={'ts': 1234567890, 'count': 10, 'dir': directory})
    print(f)  # always <FutureMessage pending>
But that did not work either.
It seems the loop does not even get a chance to run the send_soon method.
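One direction that might sidestep this, sketched under assumptions rather than verified against Faust: keep handle purely synchronous, have it return plain picklable records, and do the sending in the parent process where the app's loop is running. In the sketch, process_dirs and the Record alias are hypothetical, the body of handle is a placeholder for the real archive processing, and send stands in for an awaitable such as sink.send (which, as far as I know, accepts a value keyword).

```python
import asyncio
from concurrent.futures import Executor
from typing import Awaitable, Callable, Dict, List

Record = Dict[str, object]


def handle(directory: str) -> List[Record]:
    # Placeholder for the real work (unpack 7z, convert mdb, read csv, ...).
    # It returns picklable records instead of touching the topic.
    return [{'ts': 1234567890, 'count': 10, 'dir': directory}]


async def process_dirs(
    executor: Executor,
    dirs: List[str],
    send: Callable[..., Awaitable[object]],
) -> int:
    """Run `handle` in the executor and send results from the calling loop."""
    loop = asyncio.get_running_loop()
    jobs = [loop.run_in_executor(executor, handle, d) for d in dirs]
    sent = 0
    # gather returns the results in the same order as `dirs`.
    for records in await asyncio.gather(*jobs):
        for record in records:
            await send(value=record)  # awaited in the process that owns the loop
            sent += 1
    return sent
```

Inside the Faust task this would be called roughly as `await process_dirs(executor, ['dir1', 'dir2', 'dir3'], sink.send)` with a ProcessPoolExecutor. Whether this works well for large dataframes is untested, since every record has to be pickled back to the parent.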