0

Kafka、Flink 和 Tidb 的更新版本。假设我有三个源 MySql 表s_as_b和,并且s_c想要实时收集记录以定位 TiDb 表。映射规则是t_at_b

`s_a`  --> `t_a`                   
`s_b` union `s_c`   ---> `t_b`  with some transformation (e.g., field remapping).

我采用的方案是kafka + Flink with Tidb sink,binlog变化订阅Kafka topic;Flink 消费该主题并将转换后的结果写入 Tidb。flink代码部分对我来说的问题是:

  1. 如何可以轻松地将从 kafka 轮询的 json 字符串(具有操作信息、表信息)恢复为不同类型的 DTO 操作(例如,插入/创建t_at_b)。我找到了一个名为DebeziumKafka&Flink 连接器的工具,但它看起来需要源表和目标表之间的相等性。

  2. VKDataMapper如果我有多个目标表,如何编写转换。我很难定义它,T因为它可以是t_aDTO(数据传输对象)或t_bDTO。

我现有的示例代码如下:

//主程序。

   StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
       //consume is FlinkkafkaConsumer. TopicFilter returns true. 
        environment.addSource(consumer).filter(new TopicFilter()).map(new VKDataMapper())
                .addSink(new TidbSink());

        try {
            environment.execute();
        } catch (Exception e) {
            log.error("exception {}", e);
        }
 
 
 public class VKDataMapper implements MapFunction<String, T> {

    @Override
    public T map(String value) throws Exception {
        //How T can represents both `T_a data DTO` `T_b`...., 
        return null;
    }

}
4

1 回答 1

0

为什么不试试 Flink SQL?这样,你只需要在 Flink 中创建一些表,然后通过 sql 定义你的任务,如:

insert into t_a select * from s_a;
insert into t_b select * from s_b union select * from s_c;

请参阅https://github.com/LittleFall/flink-tidb-rdw中的一些示例,请随时提出任何让您感到困惑的问题。

于 2020-11-27T08:56:59.923 回答