我已经用主题'topic1'
(在模式注册表中使用模式)测试了 Kafka 集群。
我从这个主题创建了流:
CREATE STREAM topic1_basic_stream
WITH (KAFKA_TOPIC='topic1', VALUE_FORMAT='AVRO', TIMESTAMP='triggertime', TIMESTAMP_FORMAT='yyyyMMddHHmmssX');
然后我创建了聚合表作为选择推送查询(没有窗口),只有今天的数据:
CREATE TABLE topic1_agg_table_JSON WITH(VALUE_FORMAT='JSON') AS
select
concat(product_type,' - ',region) id
,sum(sale_charge) sum_sale_charge
,count(1) cnt
from topic1_basic_stream
where TIMESTAMPTOSTRING(rowtime, 'yyyyMMdd') = TIMESTAMPTOSTRING(UNIX_TIMESTAMP(), 'yyyyMMdd')
group by concat(product_type,' - ',region)
EMIT CHANGES;
从 ksqlDB CLI 我运行并看到 OK 结果:
SET 'auto.offset.reset'='earliest';
select * from topic1_agg_table_JSON EMIT CHANGES;
+----------------------+---------------------------+-----+
|id |sum_sale_charge |CNT |
+----------------------+---------------------------+-----+
|Pen - London |90.0 |45 |
|Book - Paris |45.0 |9 |
|Pen - Amsterdam |26.0 |13 |
|Keyboard - Oslo |60.0 |6 |
|Pen - London |92.0 |46 |
Press CTRL-C to interrupt
我也可以topic1_agg_table_JSON
在主题列表中看到,里面有 json 消息。
目标:我想在 node.js 中编写使用者以将这些消息(使用 websockets)发送到浏览器(客户端)并在客户端(实时)可视化它。
已经尝试过: kafka-node
模块。示例代码取自https://github.com/SOHU-Co/kafka-node/blob/master/example/consumer.js使用简单的原始(简单)Kafka 主题,在这段代码中一切正常,但如果我会改变我的主题topic1_agg_table_JSON
,它不会抛出任何错误,也不会打印任何消息。
问题:topic1_agg_table_JSON
使用 Node.js使用数据的正确方法是什么?