
I'm trying to use Apache Beam to stream messages from a Kafka consumer into 30-second windows. I'm using beam_nuggets.io to read the Kafka topic.

Here is my code:

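import apache_beam as beam
from apache_beam import window
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import kafkaio

with beam.Pipeline(options=PipelineOptions()) as p:
    consumer_message = (p | "Reading messages from Kafka" >> kafkaio.KafkaConsume(consumer_config=consumer_config)
                        | 'window' >> beam.WindowInto(window.FixedWindows(30))
                        | 'groupBy' >> beam.GroupByKey()
                        | beam.Map(print))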

GroupByKey still doesn't produce any output.

A consumed message looks like this:

(None, '{"userId": null, "visitorId": "1cb8b48d-6495-44fc-9ba5-ba28d71933a7", "ip": "10.212.134.89", "userAgent": "Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) AppleWebKit/603.1.30 (KHTML, like Gecko) Version/10.0 Mobile/14E304 Safari/602.1", "referer": "https://test.xxx.com/", "clientName": "xxx.com", "clientTypeId": "0", "sequenceAtSession": "1", "sessionId": "8f098d91-9049-49d0-ae52-63dffda76936", "url": null, "dimension": null, "event": {"category": null, "action": "pageview", "label": null, }, "startDate": "2021-10-18T07:05:46.9244107+00:00", "endDate": "", "pageType": "homePage", "countryCode": "ZZ", "isp": "Private network", "usageType": "reserved", "organization": "Rfc 1918"}')

Can GroupByKey() handle this, given that the key of all my messages is None? Please correct me if I'm wrong. Thanks.
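To check whether the None key is the problem, it can help to model what `FixedWindows(30)` followed by `GroupByKey` computes. The sketch below is plain Python, not Beam code: the function names are hypothetical, it assumes each element carries an event timestamp in seconds, and it deliberately ignores triggers and watermarks. In this model None behaves like any other key, so all messages that fall into the same 30-second window end up in a single group.

```python
from collections import defaultdict
from typing import Any, Dict, Hashable, Iterable, List, Tuple

WINDOW_SIZE = 30  # seconds, mirrors window.FixedWindows(30)


def fixed_window(ts: float, size: float = WINDOW_SIZE) -> Tuple[float, float]:
    """Return the [start, end) fixed window that contains event timestamp ts."""
    start = ts - (ts % size)
    return (start, start + size)


def window_then_group(
    elements: Iterable[Tuple[Hashable, Any, float]],
    size: float = WINDOW_SIZE,
) -> Dict[Tuple[Hashable, Tuple[float, float]], List[Any]]:
    """Group (key, value, timestamp) triples by (key, window), like WindowInto + GroupByKey."""
    groups: Dict[Tuple[Hashable, Tuple[float, float]], List[Any]] = defaultdict(list)
    for key, value, ts in elements:
        groups[(key, fixed_window(ts, size))].append(value)
    return dict(groups)


# Every Kafka record in the question has the key None, e.g. (None, '{...}').
records = [(None, "msg-a", 5.0), (None, "msg-b", 12.0), (None, "msg-c", 31.0)]
print(window_then_group(records))
# {(None, (0.0, 30.0)): ['msg-a', 'msg-b'], (None, (30.0, 60.0)): ['msg-c']}
```

Since grouping on None works in this model, the missing output is more likely about *when* the grouped result is emitted, which is what the answer addresses.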


1 Answer


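It looks like the trigger isn't firing. Since you are implicitly using the default trigger, it should fire when the watermark passes the end of the window, and again for any late data that arrives within the allowed lateness.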

This could be the result of the watermark not advancing. Have you tried sending a new event after the window has ended?
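The effect described above can be illustrated with a toy model of the default trigger: a window's group is emitted only once the watermark reaches the window's end, so if the watermark never moves past t=30, the [0, 30) window is never emitted and GroupByKey appears to produce nothing. This is a simplified sketch, not Beam's implementation: the class name is hypothetical, the rule "watermark = largest event timestamp seen so far" is an assumption (real runners derive the watermark from the source), and late data and allowed lateness are ignored.

```python
from typing import Dict, List, Tuple

WINDOW_SIZE = 30  # seconds, as in FixedWindows(30)


class WatermarkTriggerModel:
    """Toy model of a fixed-window GroupByKey under a watermark-based trigger.

    Assumption: the watermark is simply the largest event timestamp seen so far.
    Late data and allowed lateness are not modelled.
    """

    def __init__(self, size: int = WINDOW_SIZE) -> None:
        self.size = size
        self.watermark = float("-inf")
        self.pending: Dict[int, List[str]] = {}  # window start -> buffered values

    def add(self, value: str, ts: int) -> List[Tuple[int, List[str]]]:
        """Buffer one element, advance the watermark, and return any windows that fire."""
        start = ts - ts % self.size
        self.pending.setdefault(start, []).append(value)
        self.watermark = max(self.watermark, ts)
        return self._fire()

    def _fire(self) -> List[Tuple[int, List[str]]]:
        # Emit (and drop) every buffered window whose end is at or before the watermark.
        ready = sorted(s for s in self.pending if s + self.size <= self.watermark)
        return [(s, self.pending.pop(s)) for s in ready]


m = WatermarkTriggerModel()
print(m.add("msg-a", 5))   # []  window [0, 30) is still open
print(m.add("msg-b", 12))  # []  no event past t=30 yet, so the watermark is stuck at 12
print(m.add("msg-c", 31))  # [(0, ['msg-a', 'msg-b'])]  a later event moves the watermark
```

In the model, only the event at t=31 releases the first window, which matches the suggestion to send a new event after the window has ended.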

answered 2021-11-14T03:10:12.913