4

我正在尝试在一段时间后将浮士德表的数据(计数)发布到 kafka 主题。当我发布一些简单的字符串时,计时器正在工作,但它无法以某种方式访问​​表的数据。以下是定时器的代码:

@app.timer(interval=10.0)
async def publish_to_anomaly_topic():
            await anomaly_topic.send(
            value=str(page_views['total'].value())
          )
@app.agent(page_view_topic)
async def count_page_views(views):
    async for view in views.group_by(PageView.id):
        total=0
        page_views[view.id]+=1
        for everykey in list(page_views.keys()):
            if everykey != 'total':
                total+=page_views[everykey].value()
        page_views['total'] = total

代理工作正常。我能够正确地看到这些值。

4

2 回答 2

1

在我尝试做同样的事情时发现了这个问题,这就是我能够弄清楚的方法。

https://faust.readthedocs.io/en/latest/userguide/tables.html

您不能在流操作之外修改表;这意味着您只能从 async for event in stream: block 中对表进行变异。我们需要这样做以将表的分区与流的分区对齐,并确保源主题分区在发生故障时正确地重新平衡到不同的工作人员,以及任何必要的表分区。

在流之外修改表会引发错误:

该文档说您无法在流操作之外访问/修改表。

为了解决这个问题,您可以将计时器功能分成两部分:

@app.timer(10)
async def my_timer_function():
    # value does not matter as much as the send operation
    await my_calling_function.send(value="send data now!") 

@app.agent()
async def my_calling_function(stream_from_timer_func):
    async for message in stream_from_timer_func:
        print(message) # this will print "send data now!"
        table_data = my_table['key']
        # Here is where you can access your table data and finish sending the 
        # message to the topic you want
        await my_topic.send(value=table_data)

如您所见,如果您使用计时器功能向代理发送消息,您可以访问您想要的表,它只需要在一个

async for event in stream:

代码块。

于 2021-05-21T18:42:16.773 回答
0

经过大量实验,事实证明您无法与应用计时器一起访问表的值(即使您在创建表时指定了relative_field选项)。此问题的解决方法是创建另一个表来维护消息的时间戳并在业务逻辑中使用它们。

   if view.timestamp-page_views_timer[view.id+'_first_timestamp'] > 60:
         await anomaly_topic.send(value={//the data to be sent})

其中 page_views_timer 是创建的新表。

于 2019-10-24T07:20:52.563 回答