我们的 JavaScript 程序每秒使用 kafka-rest 异步写入 20 条消息。我们尝试对传入消息进行聚合,但它返回一些不一致的结果。
请在下面找到主题、流和聚合结果表定义。
话题:
./bin/kafka-avro-console-producer –broker-list localhost:9092 –topic order_flow –property value.schema='{“type”:”record”,”name”:”myrecord”,”fields”:[{“name”:”OrderID”,”type”:”int”},{“name”:”OrderDate”,”type”:”long”},{“name”:”Status”,”type”:”string”},{“name”:”ProductID”,”type”:”int”}]}’
溪流:
CREATE STREAM ORDERS_SRC WITH (KAFKA_TOPIC=’order_flow’, VALUE_FORMAT=’AVRO’);
NEW STREAM - 此流使用实际事件日期而不是在 kafka 中写入消息的时间。
CREATE STREAM ORDERS WITH (TIMESTAMP =’ORDERDATE’) AS SELECT ORDERDATE,ORDERID, STATUS, PRODUCTID FROM ORDERS_SRC;
现在我们使用以下方法根据状态聚合数据:
CREATE TABLE ORDERS_AGG_SEC as select Status,Count(*) from ORDERS_D WINDOW TUMBLING(SIZE 1 SECONDS) GROUP BY STATUS;
现在当我们运行查询 SELECT * FROM ORDERS_AGG_SEC; 它返回以下结果
1522328177000 | Processing : Window{start=1522328177000 end=-} | Processing | 20
1522328178000 | Processing : Window{start=1522328178000 end=-} | Processing | 20
1522328179000 | Processing : Window{start=1522328179000 end=-} | Processing | 5
1522328179000 | Processing : Window{start=1522328179000 end=-} | Processing | 20
1522328180000 | Processing : Window{start=1522328180000 end=-} | Processing | 20
1522328181000 | Processing : Window{start=1522328181000 end=-} | Processing | 15
1522328181000 | Processing : Window{start=1522328181000 end=-} | Processing | 20
1522328182000 | Processing : Window{start=1522328182000 end=-} | Processing | 20
1522328183000 | Processing : Window{start=1522328183000 end=-} | Processing | 15
1522328183000 | Processing : Window{start=1522328183000 end=-} | Processing | 20
1522328184000 | Processing : Window{start=1522328184000 end=-} | Processing | 20
1522328185000 | Processing : Window{start=1522328185000 end=-} | Processing | 15
1522328185000 | Processing : Window{start=1522328185000 end=-} | Processing | 20
1522328186000 | Processing : Window{start=1522328186000 end=-} | Processing | 20
1522328187000 | Processing : Window{start=1522328187000 end=-} | Processing | 15
1522328187000 | Processing : Window{start=1522328187000 end=-} | Processing | 20
1522328188000 | Processing : Window{start=1522328188000 end=-} | Processing | 20
1522328189000 | Processing : Window{start=1522328189000 end=-} | Processing | 15
1522328189000 | Processing : Window{start=1522328189000 end=-} | Processing | 20
1522328190000 | Processing : Window{start=1522328190000 end=-} | Processing | 20
1522328191000 | Processing : Window{start=1522328191000 end=-} | Processing | 15
预期结果:对于处理状态,我应该每 1 秒获得 20 个计数
实际结果:对于相同的状态,我每 1 秒间隔获得多个记录,如下所示:
1522328179000 | Processing : Window{start=1522328179000 end=-} | Processing | 5
1522328179000 | Processing : Window{start=1522328179000 end=-} | Processing | 20
请在下面找到我的 javascript 代码:
function getRandomInt(min, max) {
min = Math.ceil(min);
max = Math.floor(max);
return Math.floor(Math.random() * (max – min)) + min; //The maximum is exclusive and the minimum is inclusive
}
var orderdate = Date.now();
for (var i = 0; i < 20; i++) {
var data = {
"OrderID": getRandomInt(1, 20000),
"OrderDate": orderdate,
"Status": "Processing",
"ProductID": getRandomInt(1, 10)
}
node.send({payload:data}); // this function asynchronously call kafka-rest api producer.
}
注意:kafka rest api 使用默认属性运行