1

我正在使用 PyFlink 创建一个流处理器。当我将 Kafka 连接到 Flink 时,一切正常。但是当我将 json 数据发送到 kafka 时,PyFlink 会接收到它,但反序列化器会将其转换为 null。PyFlink 代码是

from pyflink.common.serialization import Encoder
from pyflink.datastream.connectors import StreamingFileSink
from pyflink.common.serialization import JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.common import Row


# Starting Flink app
def start_flink_app():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    env.add_jars(
        "file:///Users/samprabin/Documents/xealei_fall_detector/dataProcessorAndClassifier/jar/flink-sql-connector-kafka_2.11-1.12.3.jar")
    deserialization_schema = JsonRowDeserializationSchema.builder() \
        .type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()



    kafka_consumer = FlinkKafkaConsumer(
        topics='quickstart-events',
        deserialization_schema=deserialization_schema,
        properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})

    ds = env.add_source(kafka_consumer)
    ds.print()

    env.execute("tutorial_job1")

if __name__ == "__main__":
    print('Main program started...')
    start_flink_app()

卡夫卡生产者代码是

from kafka import KafkaProducer
from json import dumps

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'))

data = {"name":"tom"}

producer.send('quickstart-events', value=data)

请让我知道如何在 PyFlink 中接收 json 数据

4

2 回答 2

0

我面临同样的问题。我所做的是使用 flink kafka producer示例中给出的相同序列化器/反序列化器并在主题中生成输出。我发现我们必须使用以下格式:

{"f0": 123, "f1": "ddd"}

然后它按预期工作而没有给出它之前给出的 null null 。

于 2021-07-21T15:41:27.267 回答
0

通过使用 Types.ROW_NAMED 而不是 Types.ROW 解决了问题。然后提供您的字段名称。

    deserialization_schema = JsonRowDeserializationSchema.builder().type_info(
                             type_info=Types.ROW_NAMED(
                             ["abc","xyz"], [Types.STRING(), Types.STRING()])).build()
于 2021-10-20T11:42:57.360 回答