0

我已经用主题'topic1'(在模式注册表中使用模式)测试了 Kafka 集群。

我从这个主题创建了流:

CREATE STREAM topic1_basic_stream
WITH (KAFKA_TOPIC='topic1', VALUE_FORMAT='AVRO', TIMESTAMP='triggertime', TIMESTAMP_FORMAT='yyyyMMddHHmmssX');

然后我创建了聚合表作为选择推送查询(没有窗口),只有今天的数据:

CREATE TABLE topic1_agg_table_JSON WITH(VALUE_FORMAT='JSON') AS
select
   concat(product_type,' - ',region) id
  ,sum(sale_charge) sum_sale_charge
  ,count(1) cnt
from topic1_basic_stream
where TIMESTAMPTOSTRING(rowtime, 'yyyyMMdd') = TIMESTAMPTOSTRING(UNIX_TIMESTAMP(), 'yyyyMMdd')
group by concat(product_type,' - ',region)
EMIT CHANGES;

从 ksqlDB CLI 我运行并看到 OK 结果:

SET 'auto.offset.reset'='earliest';
select * from topic1_agg_table_JSON EMIT CHANGES;
+----------------------+---------------------------+-----+
|id                    |sum_sale_charge            |CNT  |
+----------------------+---------------------------+-----+
|Pen - London          |90.0                       |45   |
|Book - Paris          |45.0                       |9    |
|Pen - Amsterdam       |26.0                       |13   |
|Keyboard - Oslo       |60.0                       |6    |
|Pen - London          |92.0                       |46   |

Press CTRL-C to interrupt

我也可以topic1_agg_table_JSON在主题列表中看到,里面有 json 消息。

目标:我想在 node.js 中编写使用者以将这些消息(使用 websockets)发送到浏览器(客户端)并在客户端(实时)可视化它。

已经尝试过: kafka-node模块。示例代码取自https://github.com/SOHU-Co/kafka-node/blob/master/example/consumer.js使用简单的原始(简单)Kafka 主题,在这段代码中一切正常,但如果我会改变我的主题topic1_agg_table_JSON,它不会抛出任何错误,也不会打印任何消息。

问题:topic1_agg_table_JSON使用 Node.js使用数据的正确方法是什么?

4

1 回答 1

0

解决。问题出在过时的kafka-node模块版本中。在 update( npm update kafka-node) 之后,app 成功地消费了来自 topic 的所有消息,但不像我预期的那样(目标不是接收所有消息,而是只接收每条消息的最新版本,这是另一个问题)。

于 2021-04-15T14:08:26.653 回答