我正在尝试使用 Pyflink 的 JdbcSink 连接到 Oracle 的 ADB 实例。我可以在 Flink 的官方文档中找到使用 java 的 JdbcSink 示例。但是没有为 Python API 提供相同的内容。我试图将 JdbcSink 可以在 java 中实现的方式复制到 python 中,但是 Pyflink 的 JdbcSink 的方法签名与 Java 的 JdbcSink 不同。而且我找不到任何关于使用 Pyflink 的 JdbcSink 的示例或文档。我是 pyflink 的新手。这是我一直在尝试做的事情:
env = StreamExecutionEnvironment.get_execution_environment()
kafka_consumer = FlinkKafkaConsumer(
topics='TestStream',
deserialization_schema=SimpleStringSchema(),
properties={'bootstrap.servers': bootstrapServers,
'group.id': consumerGroupName,
'enable.auto.commit': 'false',
'session.timeout.ms': "6000",
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'PLAIN',
'auto.offset.reset': 'earliest',
'sasl.jaas.config': value
})
ds = env.add_source(kafka_consumer)
ds.add_sink(JdbcSink.sink("insert into log_data (log_line) values (?)",
Types.ROW([Types.STRING()]),
JdbcConnectionOptions.JdbcConnectionOptionsBuilder().with_url(jdbc_url).with_password(jdbc_password).with_user_name(jdbc_user).with_driver_name(jdbc_driver).build(),
JdbcExecutionOptions.builder().with_batch_size(10).with_batch_interval_ms(200).with_max_retries(5).build()))
env.execute()