
P.S. Started an issue for this: https://github.com/robinhood/faust/issues/702

Developing a Faust app:

from concurrent.futures import ProcessPoolExecutor, as_completed

import faust

app = faust.App('my-app-name', broker='kafka://localhost:9092')
sink = app.topic('topic')


@app.task()
async def check():
    # 3 is the number of different folders where the archives are placed
    with ProcessPoolExecutor(max_workers=3) as executor:
        fs = [executor.submit(handle, directory) for directory in ['dir1', 'dir2', 'dir3']]
        for future in as_completed(fs):
            future.result()


def handle(directory):
    # finding archives in directory
    # unpacking 7z with mdb-files
    # converting mdb tables to csv
    # reading csv to dataframe
    # some data manipulating
    # and at last sending dataframe records to kafka
    f = sink.send_soon(value={'ts': 1234567890, 'count': 10})  # always in pending status

The problem is that sink.send_soon returns a FutureMessage(asyncio.Future, Awaitable[RecordMetadata]) that always stays in the pending state.

So this is a case of a future inside another future.
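For comparison, here is a minimal sketch of the non-executor case (a hypothetical task, assuming it runs inside the Faust worker process itself); there the returned FutureMessage is an Awaitable[RecordMetadata] and can simply be awaited:

@app.task()
async def produce_in_process():
    # Assumption: this runs in the worker process, where the app's event
    # loop is available to actually deliver the buffered message.
    fut = sink.send_soon(value={'ts': 1234567890, 'count': 10})
    print(await fut)  # resolves to a RecordMetadata once delivered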

Note: the handle function has to be synchronous, because an async function cannot be passed to ProcessPoolExecutor. The send_soon method is synchronous and, judging by this example https://github.com/robinhood/faust/blob/b5e159f1d104ad4a6aa674d14b6ba0be19b5f9f5/examples/windowed_aggregation.py#L47, it does not have to be awaited.
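The ProcessPoolExecutor constraint can be reproduced without Faust at all; this stand-alone sketch shows why handle has to stay a plain def:

from concurrent.futures import ProcessPoolExecutor


async def async_handle(directory):
    return directory


def sync_handle(directory):
    return directory


if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=1) as executor:
        # a plain function works: its return value is picklable
        print(executor.submit(sync_handle, 'dir1').result())  # dir1

        # submitting the async def makes the child return a coroutine object,
        # which cannot be pickled back to the parent:
        #   executor.submit(async_handle, 'dir1').result()
        #   -> TypeError: cannot pickle 'coroutine' object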

Is there any way to handle this pending future?

I also tried this:

import asyncio
from concurrent.futures import ProcessPoolExecutor

import faust

loop = asyncio.get_event_loop()

app = faust.App('my-app-name', broker='kafka://localhost:9092', loop=loop)
sink = app.topic('topic')


@app.task()
async def check():
    tasks = []
    with ProcessPoolExecutor(max_workers=3) as executor:
        for dir_ in ['dir1', 'dir2', 'dir3']:
            task = asyncio.create_task(run_dir_handling(executor, dir_))
            tasks.append(task)

        await asyncio.gather(*tasks)


async def run_dir_handling(executor, dir_):
    print('running blocking')
    await loop.run_in_executor(executor, handle, dir_)


def handle(directory):
    print('Handle directory')

    # finding archives in directory
    # unpacking 7z with mdb-files
    # converting mdb tables to csv
    # reading csv to dataframe
    # some data manipulating
    # and at last sending dataframe records to kafka

    # `send_soon` is a plain (non-async) method, while `send` is async.
    # The async `send` cannot be used here, because an async `handle` would
    # fail at `await loop.run_in_executor(executor, handle, dir_)` with
    # `TypeError: cannot pickle 'coroutine' object`
    f = sink.send_soon(value={'ts': 1234567890, 'count': 10, 'dir': directory})
    print(f)  # always <FutureMessage pending>

But it didn't work either.

It seems like the event loop never even gets a chance to run the future returned by send_soon.
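One quick way to confirm that (not Faust-specific) is to log the process id on both sides: handle runs in an executor child process, where the Faust worker, its event loop and its producer are not running, so nothing is there to ever flush the buffered message.

import os

# hypothetical prints added to the code above
async def check():
    print('faust worker pid:', os.getpid())
    ...

def handle(directory):
    # prints a different pid: a separate process with no running Faust app
    # or producer, so the FutureMessage created here never leaves "pending"
    print('handle pid:', os.getpid())
    ...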


1 Answer


I changed the code structure to this:

import asyncio
from concurrent.futures import ProcessPoolExecutor

import faust

loop = asyncio.get_event_loop()

app = faust.App('my-app-name', broker='kafka://localhost:9092')
sink = app.topic('topic1')


@app.task()
async def check():
    tasks = []

    with ProcessPoolExecutor(max_workers=3) as executor:
        for dir_ in ['dir1', 'dir2', 'dir3']:
            task = asyncio.create_task(run_dir_handling(executor, dir_))
            tasks.append(task)

        await asyncio.gather(*tasks)


async def run_dir_handling(executor, dir_):
    # run the blocking work in a child process and get a picklable result back
    directory = await loop.run_in_executor(executor, handle, dir_)
    # publish from the worker process, where the app's producer is running
    await sink.send(value={'dir': directory})


def handle(directory):
    print('Handle directory')

    # finding archives in directory
    # unpacking 7z with mdb-files
    # converting mdb tables to csv
    # reading csv to dataframe
    # some data manipulating
    # the result is returned here and sent to kafka from run_dir_handling

    return directory
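
If the goal is to publish the dataframe records themselves rather than just the directory name, one variant of the same pattern (assuming handle is changed to return a list of plain, picklable dicts) could look like this:

async def run_dir_handling(executor, dir_):
    # handle runs in a child process; it must return picklable data,
    # e.g. the dataframe rows converted to a list of dicts
    records = await loop.run_in_executor(executor, handle, dir_)
    for record in records:
        # publish from the worker process, where the Faust producer runs
        await sink.send(value=record)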