0

我们的 JavaScript 程序每秒使用 kafka-rest 异步写入 20 条消息。我们尝试对传入消息进行聚合,但它返回一些不一致的结果。

请在下面找到主题、流和聚合结果表定义。

话题:

./bin/kafka-avro-console-producer –broker-list localhost:9092 –topic order_flow –property value.schema='{“type”:”record”,”name”:”myrecord”,”fields”:[{“name”:”OrderID”,”type”:”int”},{“name”:”OrderDate”,”type”:”long”},{“name”:”Status”,”type”:”string”},{“name”:”ProductID”,”type”:”int”}]}’

溪流:

CREATE STREAM ORDERS_SRC WITH (KAFKA_TOPIC=’order_flow’, VALUE_FORMAT=’AVRO’);

NEW STREAM - 此流使用实际事件日期而不是在 kafka 中写入消息的时间。

CREATE STREAM ORDERS WITH (TIMESTAMP =’ORDERDATE’) AS SELECT ORDERDATE,ORDERID, STATUS, PRODUCTID FROM ORDERS_SRC;

现在我们使用以下方法根据状态聚合数据:

CREATE TABLE ORDERS_AGG_SEC as select Status,Count(*) from ORDERS_D WINDOW TUMBLING(SIZE 1 SECONDS) GROUP BY STATUS;

现在当我们运行查询 SELECT * FROM ORDERS_AGG_SEC; 它返回以下结果

1522328177000 | Processing : Window{start=1522328177000 end=-} | Processing | 20
1522328178000 | Processing : Window{start=1522328178000 end=-} | Processing | 20
1522328179000 | Processing : Window{start=1522328179000 end=-} | Processing | 5
1522328179000 | Processing : Window{start=1522328179000 end=-} | Processing | 20
1522328180000 | Processing : Window{start=1522328180000 end=-} | Processing | 20
1522328181000 | Processing : Window{start=1522328181000 end=-} | Processing | 15
1522328181000 | Processing : Window{start=1522328181000 end=-} | Processing | 20
1522328182000 | Processing : Window{start=1522328182000 end=-} | Processing | 20
1522328183000 | Processing : Window{start=1522328183000 end=-} | Processing | 15
1522328183000 | Processing : Window{start=1522328183000 end=-} | Processing | 20
1522328184000 | Processing : Window{start=1522328184000 end=-} | Processing | 20
1522328185000 | Processing : Window{start=1522328185000 end=-} | Processing | 15
1522328185000 | Processing : Window{start=1522328185000 end=-} | Processing | 20
1522328186000 | Processing : Window{start=1522328186000 end=-} | Processing | 20
1522328187000 | Processing : Window{start=1522328187000 end=-} | Processing | 15
1522328187000 | Processing : Window{start=1522328187000 end=-} | Processing | 20
1522328188000 | Processing : Window{start=1522328188000 end=-} | Processing | 20
1522328189000 | Processing : Window{start=1522328189000 end=-} | Processing | 15
1522328189000 | Processing : Window{start=1522328189000 end=-} | Processing | 20
1522328190000 | Processing : Window{start=1522328190000 end=-} | Processing | 20
1522328191000 | Processing : Window{start=1522328191000 end=-} | Processing | 15

预期结果:对于处理状态,我应该每 1 秒获得 20 个计数

实际结果:对于相同的状态,我每 1 秒间隔获得多个记录,如下所示:

1522328179000 | Processing : Window{start=1522328179000 end=-} | Processing | 5
1522328179000 | Processing : Window{start=1522328179000 end=-} | Processing | 20

请在下面找到我的 javascript 代码:

function getRandomInt(min, max) {
    min = Math.ceil(min);
    max = Math.floor(max);
    return Math.floor(Math.random() * (max – min)) + min; //The maximum is exclusive and the minimum is inclusive
}
var orderdate = Date.now();
for (var i = 0; i < 20; i++) {
    var data = {
        "OrderID": getRandomInt(1, 20000),
        "OrderDate": orderdate,
        "Status": "Processing",
        "ProductID": getRandomInt(1, 10)
    }
    node.send({payload:data}); // this function asynchronously call kafka-rest api producer.
}

注意:kafka rest api 使用默认属性运行

4

1 回答 1

4

KSQL 使用 Kafka Streams 运行查询,您描述的行为是聚合结果的预期行为。每次新记录到达时,都会执行查询,并发出相应记录的更新结果。您可以通过设置commit.interval.ms和来配置发出结果的频率buffered.records.per.partition。您在聚合查询的输出中看到的结果确实是到目前为止的最新结果。例如

1522328179000 | Processing : Window{start=1522328179000 end=-} | Processing | 5

表示到目前为止计数为 5(这是部分结果),当您收到同一组和窗口的更多记录时,计数将被更新,并会发出更新的结果:

1522328179000 | Processing : Window{start=1522328179000 end=-} | Processing | 20

这意味着计数现在更新为 20。如果您想获得最新的结果,您可以在表格中具体化结果。这样,该表将始终具有每个组的最新计数。

于 2018-04-02T16:07:31.857 回答