I am trying to run a simple example with PyFlink that reads from and writes to Kafka. The messages I receive look like this:
{"value": "tails", "eventTime": "2021-10-31 10:52:33.816026"}
{"value": "head", "eventTime": "2021-10-31 12:10:47.221865"}
The error I get is:
pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
at ......
at ......
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.
Reason: Required context properties mismatch.
The following properties are requested:
connector.properties.bootstrap.servers=<IP>:9092
connector.properties.group.id=test_group
connector.startup-mode=latest-offset
connector.topic=headsortails
connector.type=kafka
connector.version=1.14
format.type=json
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=value
schema.1.data-type=VARCHAR(2147483647)
schema.1.name=eventTime
schema.2.data-type=TIMESTAMP(3)
schema.2.expr=TO_TIMESTAMP(`eventTime`)
schema.2.name=rt
schema.watermark.0.rowtime=rt
schema.watermark.0.strategy.data-type=TIMESTAMP(3)
schema.watermark.0.strategy.expr=`rt` - INTERVAL '2' SECOND
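From what I can tell in the documentation, the property keys listed above (connector.type, connector.version, format.type, …) follow the older key-per-property style, while the Flink 1.12 docs describe the Kafka connector with single-word options. A sketch of what the equivalent WITH clause would look like in that style (same topic and broker placeholder as in my script; I have not verified that this resolves the error):
CREATE TABLE sourcetable (
    -- same schema and watermark as in my DDL below
    ...
) WITH (
    'connector' = 'kafka',
    'topic' = 'headsortails',
    'properties.bootstrap.servers' = '<IP>:9092',
    'properties.group.id' = 'test_group',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
)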
I have looked at other Stack Overflow questions, and they focus on JARs that might be missing. However, I have already added the Kafka connector JAR and then the JSON one. I am not sure whether I am missing more JARs, since I am new to Flink.
As mentioned, although the eventual goal is to analyze windows based on each message's eventTime, for now I am simply trying to send the exact same messages I receive from the source topic to a different Kafka topic. Below is the code I have put together from some documentation I found. I tried removing the watermark (the only difference between the source and sink tables), but I still get exactly the same error:
from pyflink.common.serialization import Encoder, JsonRowDeserializationSchema, JsonRowSerializationSchema, SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.datastream.connectors import StreamingFileSink, FlinkKafkaConsumer, FlinkKafkaProducer
from pyflink.table import EnvironmentSettings, StreamTableEnvironment, DataTypes
# Stream environment with event-time semantics
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.add_jars("file:////home/<userName>/HadoopEcosystem/Flink/kafkaConnector/flink-sql-connector-kafka_2.12-1.14.0.jar")

# Table environment on top of it, using the Blink planner
env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=env_settings)
t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)

# Kafka connector and JSON format JARs, also registered on the pipeline
t_env.get_config().get_configuration().set_string("pipeline.jars", "file:////home/<userName>/HadoopEcosystem/Flink/kafkaConnector/flink-json-1.14.0.jar;file:////home/<userName>/HadoopEcosystem/Flink/kafkaConnector/flink-sql-connector-kafka_2.12-1.14.0.jar")
source = f"""
CREATE TABLE sourcetable(
`value` VARCHAR,
eventTime VARCHAR,
rt AS TO_TIMESTAMP(eventTime),
WATERMARK FOR rt AS rt - INTERVAL '2' SECOND
) WITH (
'connector.type' = 'kafka',
'connector.version' = '1.14',
'connector.topic' = 'headsortails',
'connector.properties.bootstrap.servers' = '<IP>:9092',
'connector.properties.group.id' = 'test_group',
'connector.startup-mode' = 'latest-offset',
'format.type' = 'json'
)
"""
sink = f"""
CREATE TABLE sinktable (
`value` VARCHAR,
eventTime VARCHAR,
rt AS TO_TIMESTAMP(eventTime)
) WITH (
'connector.type' = 'kafka',
'connector.version' = '1.14',
'connector.topic' = 'sinktopic',
'connector.properties.bootstrap.servers' = '<IP>:9092',
'connector.properties.group.id' = 'test_group',
'connector.startup-mode' = 'latest-offset',
'format.type' = 'json'
)
"""
# Register both tables, then pipe the source straight into the sink
t_env.sql_update(source)
t_env.sql_update(sink)
query = """
SELECT *
FROM sourcetable
"""
t_env.sql_query(query).insert_into("sinktable")
t_env.execute("trial")
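One thing I noticed while reading the 1.12 docs: sql_update, insert_into and TableEnvironment.execute are all marked as deprecated there. The same pipeline with the non-deprecated calls would be roughly the following sketch (same table names and DDL strings as above):
t_env.execute_sql(source)  # DDL replacement for sql_update
t_env.execute_sql(sink)
# execute_insert submits the job directly; wait() blocks until it completes
t_env.sql_query("SELECT * FROM sourcetable").execute_insert("sinktable").wait()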
I would really appreciate it if someone could help me with this issue. It seems to be related to dependencies, but the information out there is unclear. I have tried changing the code, but nothing has worked so far.
Just to clarify, <userName> and <IP> are replaced with the correct values in my script.
Edit:
The PyFlink version I am using is apache-flink 1.12.5, installed with pip3. I had previously tried the latest PyFlink release (1.14.0), but I realized that Dataproc currently runs Flink 1.12.5.
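Since the JARs referenced in my script are 1.14.0 builds while the pip package is 1.12.5, here is a quick sanity check for which runtime version the connector JARs need to match (a sketch; I expect it to print 1.12.5 in my setup):
import pyflink.version
print(pyflink.version.__version__)  # the connector/format JAR versions should match this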