Kafka、Flink 和 Tidb 的更新版本。假设我有三个源 MySql 表s_a
、s_b
和,并且s_c
想要实时收集记录以定位 TiDb 表。映射规则是t_a
t_b
`s_a` --> `t_a`
`s_b` union `s_c` ---> `t_b` with some transformation (e.g., field remapping).
我采用的方案是kafka + Flink with Tidb sink,binlog变化订阅Kafka topic;Flink 消费该主题并将转换后的结果写入 Tidb。flink代码部分对我来说的问题是:
如何可以轻松地将从 kafka 轮询的 json 字符串(具有操作信息、表信息)恢复为不同类型的 DTO 操作(例如,插入/创建
t_a
或t_b
)。我找到了一个名为Debezium
Kafka&Flink 连接器的工具,但它看起来需要源表和目标表之间的相等性。VKDataMapper
如果我有多个目标表,如何编写转换。我很难定义它,T
因为它可以是t_a
DTO(数据传输对象)或t_b
DTO。
我现有的示例代码如下:
//主程序。
StreamExecutionEnvironment environment =
StreamExecutionEnvironment.getExecutionEnvironment();
//consume is FlinkkafkaConsumer. TopicFilter returns true.
environment.addSource(consumer).filter(new TopicFilter()).map(new VKDataMapper())
.addSink(new TidbSink());
try {
environment.execute();
} catch (Exception e) {
log.error("exception {}", e);
}
public class VKDataMapper implements MapFunction<String, T> {
@Override
public T map(String value) throws Exception {
//How T can represents both `T_a data DTO` `T_b`....,
return null;
}
}