0

我们使用 IBM AS400 作为源表。我想做的是:

  1. 使用描述性column_name(如“收盘价”)而不是system_column_name(“CPHKD001”)
  2. 将时间戳格式“2020-03-10 18:25:31.123456000000”转换为“2020-03-10T18:25:31.123Z”

我想我应该处理文件KcopLiveAuditSingleRowIntegrated.java。对于时间戳,它可能在那里有解决方案,但我认为我无法在那里找到答案。

4

1 回答 1

2

KCOP 基础架构允许您以编程方式控制 Kafka 生产者记录。对于源上的每个操作,您可以确定将多少条消息写入 Kafka,它们也包含哪些主题,以及键和值字节是什么。

在 KCOP 中,您可以使用 java 将时间戳重新定义为您喜欢的任何格式。这是因为对 createProducerRecords 的每次调用都会为您提供建议的 Avro 通用记录,该记录具有允许您识别表、列及其类型的架构。

根据开源 Avro 文档中记录的 Avro 通用记录行为,然后您可以选择您感兴趣的相关值并使用转换后的值创建一个新的 Avro 通用记录。然后将这个新的 Avro Generic Record 传递到 KCOP 的其余部分。

请注意,审计 KCOP 包含执行此操作的代码,单行 Avro 审计 KCOP 就是一个很好的例子。您可以在产品安装的 samples.jar 文件中找到我们所有集成 KCOP 的代码。

单行 Avro 审计 KCOP 采用之前和之后的 Avro 通用记录,并生成由它们组合而成的新 Avro 通用记录。如果在复制值时检查了列的类型,则可以识别时间戳并更改要放入新的复合 Avro 通用记录中的值。

但是,根据以下链接,我们确实提供了一些预格式化的灵活性

https://www.ibm.com/support/knowledgecenter/en/SSTRGZ_11.4.0/com.ibm.cdcdoc.mcadminguide.doc/refs/mirror_timestamp_write_format_Kafka.html

请注意,要使用此可选的预格式化程序,您需要“要启用此参数,您必须将 mirror_write_format 参数设置为 DYNAMIC。”

您会注意到,可以设置一个数据存储参数,该参数通常允许对您的 KCOP 接收的时间戳数据进行所需的自定义。

“AVRO(默认)

将 TIMESTAMP 列值格式化为 UNIX 纪元 1970 年 1 月 1 日(ISO 日历)的微秒数。出于此计算的目的,时间戳假定为 UTC。

对于精度大于微秒的 TIMESTAMP 列,值被格式化为字符串。您可以使用 timestamp_format 数据存储参数指定字符串格式。timestamp_format 的默认值为 yyyy-MM-dd HH:mm:ss.SSSNNNnnnppp。

TIMESTAMP WITH TIMEZONE 列值被格式化为字符串。您可以使用 timestamp_tz_format 数据存储参数指定字符串格式。timestamp_tz_format 的默认值为 yyyy-MM-dd HH:mm:ss.SSSNNNnnnppp T。"

如果这不能为您提供所需的确切格式,那么您可以选择最接近的格式并修改 KCOP 中时间戳列的值。

修改KCOP如下...

https://www.ibm.com/support/knowledgecenter/en/SSTRGZ_11.4.0/com.ibm.cdcdoc.mcadminguide.doc/tasks/createkafkacop.html

关于列名的问题是相同的。如果使用的 KCOP 使用 Avro 模式注册表,那么您可以以编程方式更改向模式注册表注册的模式。如果它是 JSON 字符串,您可以在创建 JSON 字符串后更改它,或者在调用 Avro to JSON 方法之前进行更改。

或者,如果您的源支持派生列,我相信在管理控制台中,您可以使用新名称定义派生列,该名称只是原始列的值。我的回忆是您可以取消选择原始列,因此您的名称将被更改。

于 2020-03-10T20:40:30.433 回答