我最近在 Pyflink 中看到了可以通过 Table API 在 flink 中使用 pandas 数据帧的文档。因此,我的目标是:
- 从 Kafka 源接收数据流
- 转换为表 API 实例 -> 然后可以转换为 Pandas
- --- Pandas 处理逻辑
- 将 pandas 数据帧转换回 Table 实例
- 然后将其转换回数据流并沉入kafka
根据 flink 文档,我引用了转换Datastream <-> Table Instance和Table <-> pandas之间的代码。
import os
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.serialization import Encoder
from pyflink.common.serialization import JsonRowDeserializationSchema, JsonRowSerializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FileSink, StreamingFileSink
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
from pyflink.datastream.data_stream import DataStream
from pyflink.datastream.functions import MapFunction
from pyflink.table.table import Table
from pyflink.table.table_environment import StreamTableEnvironment
KAFKA_SERVERS = os.getenv('KAFKA_BS_SERVERS',"kafka:9094").split(',')
KAFKA_USERNAME = "user"
KAFKA_PASSWORD = "123"
KAFKA_SOURCE_TOPIC = 'topic_one'
KAFKA_SINK_TOPIC = 'topic_two'
# creating a kafka source for the pipeline
def create_kafka_source(usern: str, password: str, topic: str):
kafka_props = {
'bootstrap.servers': ','.join(KAFKA_SERVERS),
'group.id': 'testgroup12',
'auto.offset.reset': 'earliest',
'sasl.mechanism': 'PLAIN',
'sasl.jaas.config' : f"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{usern}\" password=\"{password}\";", # correct one
'security.protocol': 'SASL_PLAINTEXT',
"enable.auto.commit": "true",
"auto.commit.enable": "true",
"auto.commit.interval.ms": "1000",
"session.timeout.ms": "30000",
"value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
"key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
}
kafka_source = FlinkKafkaConsumer(
topics=[topic],
deserialization_schema=SimpleStringSchema(),
properties= kafka_props
)
kafka_source.set_commit_offsets_on_checkpoints(True)
return kafka_source
# creating a kafka sink for the pipeline
def create_kafka_sink(usern: str, password: str, topic: str):
kafka_producer = FlinkKafkaProducer(
topic= topic,
serialization_schema=SimpleStringSchema(),
producer_config= {
'bootstrap.servers': ','.join(KAFKA_SERVERS),
'group.id': 'testgroup12',
'sasl.mechanism': 'PLAIN',
'sasl.jaas.config' : f"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{usern}\" password=\"{password}\";", # correct one
'security.protocol': 'SASL_PLAINTEXT'
}
)
return kafka_producer
# the pipeline which will run
def pipeline():
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///opt/flink/lib_py/kafka-clients-2.4.1.jar")
env.add_jars("file:///opt/flink/lib_py/flink-connector-kafka_2.12-1.14.0.jar")
env.set_parallelism(1)
env.enable_checkpointing(5000)
t_env = StreamTableEnvironment.create(stream_execution_environment= env)
kafka_source = create_kafka_source(KAFKA_USERNAME, KAFKA_PASSWORD, KAFKA_SOURCE_TOPIC)
ds = env.add_source(kafka_source)
# Stream to Table
table : Table = t_env.from_data_stream(ds)
pd = table.to_pandas()
# custom pandas logic
pd["Testing"] = 'new Vals'
# Table to stream
table = t_env.from_pandas(pd,pd.columns.tolist())
t_env.create_temporary_view("InputTable", table)
res_table = t_env.sql_query("SELECT * FROM InputTable")
res_ds = t_env.to_data_stream(res_table)
# Sink to file and Kafka
res_ds.add_sink(StreamingFileSink
.for_row_format('/opt/flink/outputs_dumps', Encoder.simple_string_encoder())
.build())
kafka_sink = create_kafka_sink(KAFKA_USERNAME, KAFKA_PASSWORD, KAFKA_SINK_TOPIC)
res_ds.add_sink(kafka_sink)
env.execute("proto_1")
if __name__ == '__main__':
pipeline()
将其提交给 flink 时,将创建作业而不会出现任何错误或异常:
$ /opt/flink/bin/flink 运行 --python script.py
但是在flink UI上可以看到job name没有分别注册
以及未反映在输出主题上的熊猫逻辑。(1)作为源数据包接收的 json 数据包,(2) pandas 本质上为数据包添加了一个新值,(3)然后应该将此数据包接收回输出主题
收到的源主题:
{“气缸”:8.0,“排量”:360.0,“马力”:215.0,“重量”:4615.0,“加速度”:14.0,“车型年”:70.0,“美国”:1.0,“欧洲”:0.0, “日本”:0.0}
目标主题的输出('testing': 'New vals' not added):
{“气缸”:8.0,“排量”:360.0,“马力”:215.0,“重量”:4615.0,“加速度”:14.0,“车型年”:70.0,“美国”:1.0,“欧洲”:0.0, “日本”:0.0}
如果我的方法不正确,有人可以告诉我正确的实施方式吗?这应该作为无界流操作工作(不是作为批处理操作,如果我的术语在这里正确......)