2

我正在尝试使用 Pyflink 的 JdbcSink 连接到 Oracle 的 ADB 实例。我可以在 Flink 的官方文档中找到使用 java 的 JdbcSink 示例。但是没有为 Python API 提供相同的内容。我试图将 JdbcSink 可以在 java 中实现的方式复制到 python 中,但是 Pyflink 的 JdbcSink 的方法签名与 Java 的 JdbcSink 不同。而且我找不到任何关于使用 Pyflink 的 JdbcSink 的示例或文档。我是 pyflink 的新手。这是我一直在尝试做的事情:

env = StreamExecutionEnvironment.get_execution_environment()

kafka_consumer = FlinkKafkaConsumer(
    topics='TestStream',
    deserialization_schema=SimpleStringSchema(),
    properties={'bootstrap.servers': bootstrapServers,
                'group.id': consumerGroupName,
                'enable.auto.commit': 'false',
                'session.timeout.ms': "6000",
                'security.protocol': 'SASL_SSL',
                'sasl.mechanism': 'PLAIN',
                'auto.offset.reset': 'earliest',
                'sasl.jaas.config': value
                })
ds = env.add_source(kafka_consumer)

ds.add_sink(JdbcSink.sink("insert into log_data (log_line) values (?)",
                          Types.ROW([Types.STRING()]),
                          JdbcConnectionOptions.JdbcConnectionOptionsBuilder().with_url(jdbc_url).with_password(jdbc_password).with_user_name(jdbc_user).with_driver_name(jdbc_driver).build(),
                          JdbcExecutionOptions.builder().with_batch_size(10).with_batch_interval_ms(200).with_max_retries(5).build()))

env.execute()
4

0 回答 0