我正在使用 PyFlink 创建一个流处理器。当我将 Kafka 连接到 Flink 时,一切正常。但是当我将 json 数据发送到 kafka 时,PyFlink 会接收到它,但反序列化器会将其转换为 null。PyFlink 代码是
from pyflink.common.serialization import Encoder
from pyflink.datastream.connectors import StreamingFileSink
from pyflink.common.serialization import JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.common import Row
# Starting Flink app
def start_flink_app():
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.add_jars(
"file:///Users/samprabin/Documents/xealei_fall_detector/dataProcessorAndClassifier/jar/flink-sql-connector-kafka_2.11-1.12.3.jar")
deserialization_schema = JsonRowDeserializationSchema.builder() \
.type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()
kafka_consumer = FlinkKafkaConsumer(
topics='quickstart-events',
deserialization_schema=deserialization_schema,
properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})
ds = env.add_source(kafka_consumer)
ds.print()
env.execute("tutorial_job1")
if __name__ == "__main__":
print('Main program started...')
start_flink_app()
卡夫卡生产者代码是
from kafka import KafkaProducer
from json import dumps
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x: dumps(x).encode('utf-8'))
data = {"name":"tom"}
producer.send('quickstart-events', value=data)
请让我知道如何在 PyFlink 中接收 json 数据