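I want to run a simple Flink SQL query with a GROUP BY clause on a table. However, the results contain duplicate rows for the column specified in the GROUP BY clause. Is that because I am using a streaming environment and it does not remember the state?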

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// configure Kafka consumer
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // Broker default host:port
props.setProperty("group.id", "flink-consumer"); // Consumer group ID

FlinkKafkaConsumer011<BlocksTransactions> flinkBlocksTransactionsConsumer = new FlinkKafkaConsumer011<>(args[0], new BlocksTransactionsSchema(), props);
flinkBlocksTransactionsConsumer.setStartFromEarliest();

DataStream<BlocksTransactions> blocksTransactions = env.addSource(flinkBlocksTransactionsConsumer);


tableEnv.registerDataStream("blocksTransactionsTable", blocksTransactions);

Table sqlResult
        = tableEnv.sqlQuery(
                "SELECT block_hash, count(tx_hash) " +
                "FROM blocksTransactionsTable " +
                "GROUP BY block_hash");

DataStream<Test> resultStream = tableEnv
        .toRetractStream(sqlResult, Row.class)
        .map(t -> {
            Row r = t.f1;
            String field2 = r.getField(0).toString();
            long count = Long.valueOf(r.getField(1).toString());
            return new Test(field2, count);
        })
        .returns(Test.class);

resultStream.print();

resultStream.addSink(new FlinkKafkaProducer011<>("localhost:9092", "TargetTopic", new TestSchema()));

env.execute();

I group by the block_hash column, but the same block_hash appears several times. This is the output of print():

Test{field2='0x2c4a021d514e4f8f0beb8f0ce711652304928528487dc7811d06fa77c375b5e1', count=1}
Test{field2='0x2c4a021d514e4f8f0beb8f0ce711652304928528487dc7811d06fa77c375b5e1', count=1}
Test{field2='0x2c4a021d514e4f8f0beb8f0ce711652304928528487dc7811d06fa77c375b5e1', count=2}
Test{field2='0x780aadc08c294da46e174fa287172038bba7afacf2dff41fdf0f6def03906e60', count=1}
Test{field2='0x182d31bd491527e93c4e44686057207eee9ee90c6a84283308a2bd7bd2e10e53', count=1}
...

How can I solve this problem without using a BatchEnvironment?

A GROUP BY query that runs on a stream must produce updates. Consider the following example:

SELECT user, COUNT(*) FROM clicks GROUP BY user;

Every time the clicks table receives a new row, the count of the corresponding user needs to be incremented and updated.

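When you convert a Table into a DataStream, these updates must be encoded in the stream. Flink does this with retraction and add messages. By calling tEnv.toRetractStream(table, Row.class), you convert the Table table into a DataStream<Tuple2<Boolean, Row>>. The Boolean flag is important: it indicates whether the Row is added to or retracted from the result table.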

Given the example query above and the following input table clicks:

user | ...
------------
Bob  | ...
Liz  | ...
Bob  | ...

you will receive the following retract stream:

(+, (Bob, 1)) // add first result for Bob
(+, (Liz, 1)) // add first result for Liz
(-, (Bob, 1)) // remove outdated result for Bob
(+, (Bob, 2)) // add updated result for Bob
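The following minimal plain-Java sketch makes the encoding concrete by mimicking what a retracting per-key count emits. It is a toy model, not Flink's implementation. The class `RetractCountModel` and its `onRow` method are hypothetical names, and the messages are formatted as strings only so they can be compared with the listing above. For each input row, the model retracts the key's previous result (if there is one) and then adds the updated result.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model (no Flink dependency) of the messages a non-windowed
// "SELECT user, COUNT(*) FROM clicks GROUP BY user" emits per input row.
class RetractCountModel {

    // Current count per key; stands in for the per-key state of the GROUP BY.
    private final Map<String, Long> counts = new HashMap<>();

    // Processes one input row and returns the messages it causes:
    // "(-, (key, count))" retracts an old result, "(+, (key, count))" adds a new one.
    List<String> onRow(String key) {
        List<String> out = new ArrayList<>();
        Long old = counts.get(key);
        if (old != null) {
            // The previously emitted result for this key is outdated: retract it first.
            out.add("(-, (" + key + ", " + old + "))");
        }
        long updated = (old == null) ? 1L : old + 1;
        counts.put(key, updated);
        // Then add the updated result for this key.
        out.add("(+, (" + key + ", " + updated + "))");
        return out;
    }

    public static void main(String[] args) {
        RetractCountModel model = new RetractCountModel();
        for (String user : new String[] {"Bob", "Liz", "Bob"}) {
            for (String msg : model.onRow(user)) {
                System.out.println(msg);
            }
        }
    }
}
```

For the input rows Bob, Liz, Bob, it prints the same four messages as the listing above.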

You need to actively maintain the result yourself, adding and removing rows as indicated by the Boolean flag of the retract stream.
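Here is a minimal sketch of such a consumer, written in plain Java so it runs without Flink. The class `RetractStreamView` and its `apply` method are hypothetical. The `(add, key, count)` arguments stand in for the `Tuple2<Boolean, Row>` that `toRetractStream` produces, where the flag is `t.f0` and the row is `t.f1`. The sketch assumes one result row per key, which holds for this GROUP BY query. A general retract stream is a multiset of rows, so a complete version would also have to track duplicate rows.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch (no Flink dependency) of materializing a retract stream.
// add == true mirrors an add message (t.f0 == true), add == false a retraction.
// Assumes at most one result row per GROUP BY key.
class RetractStreamView {

    // Current contents of the result table, keyed by the GROUP BY column.
    private final Map<String, Long> table = new HashMap<>();

    void apply(boolean add, String key, long count) {
        if (add) {
            // Add message: this row is now part of the result.
            table.put(key, count);
        } else if (Objects.equals(table.get(key), count)) {
            // Retract message: remove the row only if it is the one currently held.
            table.remove(key);
        }
    }

    // Returns a copy of the current result table.
    Map<String, Long> snapshot() {
        return new HashMap<>(table);
    }

    public static void main(String[] args) {
        RetractStreamView view = new RetractStreamView();
        view.apply(true, "Bob", 1);
        view.apply(true, "Liz", 1);
        view.apply(false, "Bob", 1);
        view.apply(true, "Bob", 2);
        Map<String, Long> result = view.snapshot();
        System.out.println("Bob=" + result.get("Bob") + ", Liz=" + result.get("Liz"));
    }
}
```

Feeding it the four messages from the example leaves Bob=2 and Liz=1. The question's output fits the same pattern. The repeated block_hash with count=1, count=1 and count=2 is most likely an add, its retraction and the updated add, printed without their flags because the `map` there reads only `t.f1` and ignores `t.f0`.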

answered 2018-08-31T09:29:30.380