0

我有一个名为客户的主题,我为它创建了一个流

CREATE STREAM customers_stream (customerId INT, isActive BOOLEAN)
  WITH (KAFKA_TOPIC='customers', VALUE_FORMAT='json');

我的customers主题制作人正在生成一个整数键和一个 json 值。但是当我看到行键被设置为某个二进制值时

ksql> print 'customers';
Format:JSON
{"ROWTIME":1570305904984,"ROWKEY":"\u0000\u0000\u0003�","customerId":1001,"isActive":true}
{"ROWTIME":1570307584257,"ROWKEY":"\u0000\u0000\u0003�","customerId":1002,"isActive":true}

现在,如果我创建一个表,它会产生一行(可能是因为行键相同??)

CREATE TABLE customers (customerId INT, isActive BOOLEAN)
  WITH (KAFKA_TOPIC='customers', KEY='customerId',VALUE_FORMAT='json');

在网上搜索后,我偶然发现了这篇文章https://www.confluent.io/stream-processing-cookbook/ksql-recipes/setting-kafka-message-key并通过对密钥重新分区创建了一个新流

CREATE STREAM customers_stream2 AS \
 SELECT * FROM customers_stream \
 PARTITION BY customerId;

那么如何创建一个包含最新客户数据值的表呢?

从流创建表会导致错误

CREATE TABLE customers_2_table_active AS
  SELECT CUSTOMERID,ISACTIVE
  FROM customers_stream2;

Invalid result type. Your SELECT query produces a STREAM. Please use CREATE STREAM AS SELECT statement instead.

我需要各个行的最新值,以便另一个微服务可以查询新表。

先感谢您

4

1 回答 1

1

重新设置密钥似乎是正确的方法,但是,您不能直接将 a 转换STREAM为 a TABLE

请注意,您的重新加密的流customers_stream2被写入相应的主题。因此,您应该能够TABLE从流的主题中创建一个新的以获取每个键的最新值。

于 2019-10-07T03:47:23.973 回答