0

我有一个流程,从 IBM 大型机 IIDR,我将记录发送到 Kafka 主题。进入 Kafka 主题的value_format消息是 AVRO,密钥也是 AVRO 格式。记录被推送到 Kafka 主题中。我有一个与该主题相关的流。但是记录不会传递到流中。主题示例test_iidr-

rowtime: 5/30/20 7:06:34 PM UTC, key: {"col1": "A", "col2": 1}, value: {"col1": "A", "col2": 11, "col3": 2, "iidr_tran_type": "QQ", "iidr_a_ccid": "0", "iidr_a_user": " ", "iidr_src_upd_ts": "2020-05-30 07:06:33.262931000", "iidr_a_member": " "}

流中的 value_format 是 AVRO 并且列名都被检查。

流创建查询 -

CREATE STREAM test_iidr (
  col1 STRING, 
  col2 DECIMAL(2,0),
  col3 DECIMAL(1,0),
  iidr_tran_type STRING,
  iidr_a_ccid STRING,
  iidr_a_user STRING,
  iidr_src_upd_ts STRING,
  iidr_a_member STRING) 
WITH (KAFKA_TOPIC='test_iidr', PARTITIONS=1, REPLICAS=3, VALUE_FORMAT='AVRO');

KEY由于声明中未提及,它是否无法从主题加载到流中WITH?模式注册表在其中注册了test_iidr-valuetest_iidr-key主题。

泊坞窗中的key.converterand设置为 - 。这是在制造这个问题吗?value.converterKafka-connectorg.apache.kafka.connect.json.JsonConverterJsonConverter

我用不同的流创建了一个完全不同的管道,并使用insert into语句手动插入了相同的数据。有效。只有 IIDR 流不起作用,并且记录不会从主题推送到流中。

我正在使用 Confluent kafka 5.5.0 版。

4

1 回答 1

1

连接配置中的JsonConverter很可能会将您的 Avro 数据转换为 JSON。

要确定键和值序列化格式,您可以使用PRINT命令(我可以看到您已经运行)。 PRINT运行时会输出键值格式。例如:

ksql> PRINT some_topic FROM BEGINNING LIMIT 1;
Key format: JSON or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 5/30/20 7:06:34 PM UTC, key: {"col1": "A", "col2": 1}, value: {"col1": "A", "col2": 11, "col3": 2, "iidr_tran_type": "QQ", "iidr_a_ccid": "0", "iidr_a_user": " ", "iidr_src_upd_ts": "2020-05-30 07:06:33.262931000", "iidr_a_member": " "}

因此,首先要检查的是 PRINT 为键和值输出的格式,然后CREATE相应地更新您的语句。

请注意,ksqlDB尚不支持 Avro/Json 键,因此您可能希望/需要重新分区数据,请参阅:https ://docs.ksqldb.io/en/latest/developer-guide/syntax-reference/#what-待办事项,如果您的密钥未设置或采用不同格式

旁注:如果值的架构存储在架构注册表中,那么您不需要在 CREATE 语句中定义列,因为 ksqlDB 将从架构注册表加载列

旁注:您不需要PARTITIONS=1, REPLICAS=3WITH现有主题的子句中,只有当您希望 ksqlDB 为您创建主题时。

于 2020-06-02T10:35:00.993 回答