
We are trying to join against a table coming from a DB CDC connector (upsert behavior): a "kafka" event source is enriched, by key, with the existing CDC data. kafka-source (id, B, C) + cdc (id, D, E, F) = result (id, B, C, D, E, F), written to a Kafka sink (append-only):

INSERT INTO sink (zapatos, naranjas, device_id, account_id, user_id) 
SELECT zapatos, naranjas, source.device_id, account_id, user_id FROM source 
JOIN mongodb_source ON source.device_id = mongodb_source._id

The problem is that this only works when our Kafka sink is "upsert-kafka". But that sink emits tombstones whenever a row is deleted in the database. We just want the output to behave like plain events, not like a changelog. And we can't simply use the "kafka" sink, because the DB connector is an upsert source, so the two are incompatible...
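For context, this is roughly the plain append-only sink we would like to write to instead (a sketch reusing the same columns and topic, no primary key). Flink rejects the INSERT into it with an error along the lines of "doesn't support consuming update and delete changes", because the join against the CDC table produces an updating stream:

CREATE TABLE sink (
    `zapatos` INT,
    `naranjas` STRING,
    `account_id` STRING,
    `user_id` STRING,
    `device_id` STRING,
    `time28` INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'as-test-output-flink-topic',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json'
)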

What is the way to do this? Convert the upsert stream into append-only events?

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
s_env.set_parallelism(1)

# use the blink table planner in streaming mode (pre-1.14 PyFlink API)
st_env = StreamTableEnvironment \
    .create(s_env, environment_settings=EnvironmentSettings
            .new_instance()
            .in_streaming_mode()
            .use_blink_planner().build())

   ddl = """CREATE TABLE sink (
            `zapatos` INT,
            `naranjas` STRING,
            `account_id` STRING,
            `user_id` STRING,
            `device_id` STRING,
            `time28` INT,
            PRIMARY KEY (device_id) NOT ENFORCED
        ) WITH (
            'connector' = 'upsert-kafka',
            'topic' = 'as-test-output-flink-topic',
            'properties.bootstrap.servers' = 'kafka:9092',
            'properties.group.id' = 'testGroup',
            'key.format' = 'raw',
            'value.format' = 'json',
            'value.fields-include' = 'EXCEPT_KEY'
        )
        """
    st_env.sql_update(ddl)
    
    ddl = """CREATE TABLE source (
            `device_id` STRING,
            `timestamp` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
            `event_type` STRING,
            `payload` ROW<`zapatos` INT, `naranjas` STRING, `time28` INT, `device_id` STRING>,
            `trace_id` STRING
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'as-test-input-flink-topic',
            'properties.bootstrap.servers' = 'kafka:9092',
            'properties.group.id' = 'testGroup',
            'key.format' = 'raw',
            'key.fields' = 'device_id',
            'value.format' = 'json',
            'value.fields-include' = 'EXCEPT_KEY'
        )
        """
    st_env.sql_update(ddl)
    
    ddl = """
    CREATE TABLE mongodb_source (
        `_id` STRING PRIMARY KEY, 
        `account_id` STRING,
        `user_id` STRING,
        `device_id` STRING
    ) WITH (
        'connector' = 'mongodb-cdc',
        'uri' = '******',
        'database' = '****',
        'collection' = 'testflink'
    )
    """
    st_env.sql_update(ddl)


    st_env.sql_update("""
        INSERT INTO sink (zapatos, naranjas, device_id, account_id, user_id) 
        SELECT zapatos, naranjas, source.device_id, account_id, user_id FROM source 
        JOIN mongodb_source ON source.device_id = mongodb_source._id
     """)

# execute
    st_env.execute("kafka_to_kafka")

Don't mind the mongodb-cdc connector; it's new, but it behaves here just like mysql-cdc or postgres-cdc would.
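For example, a roughly equivalent definition with the mysql-cdc connector would be (a sketch; the connection settings and the mydb.testflink table are placeholders):

CREATE TABLE mysql_source (
    `id` STRING,
    `account_id` STRING,
    `user_id` STRING,
    `device_id` STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '******',
    'port' = '3306',
    'username' = '****',
    'password' = '****',
    'database-name' = 'mydb',
    'table-name' = 'testflink'
)

Either way the source is an upsert/changelog table, so the same question applies.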

Thanks for your help!


1 Answer


Have you tried using a LEFT JOIN instead of a JOIN? If your intent is only to enrich the Kafka events (with the matching data from Mongo, when it exists), it shouldn't create tombstones...
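In other words, that would be the same INSERT as in the question with only the join type changed:

INSERT INTO sink (zapatos, naranjas, device_id, account_id, user_id)
SELECT zapatos, naranjas, source.device_id, account_id, user_id FROM source
LEFT JOIN mongodb_source ON source.device_id = mongodb_source._id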

answered 2022-02-26T23:54:24.070