
I am a beginner with Flink, Kafka, and similar frameworks. What I want to build is a producer/consumer pipeline with Kafka and Flink as the core of the infrastructure: a producer inserts messages into a Kafka topic, Flink (Python, so PyFlink) reads and transforms them, and the modified messages are then written to a different Kafka topic that a Kafka consumer reads from. The piece I am missing in this setup is the connection between PyFlink and Kafka. How can I establish this link? I have searched online for guides, but the code I found does not work. Could someone help me polish this?

Below is my Kafka producer.py:

from time import sleep
from json import dumps
from kafka import KafkaProducer, errors
import csv

# keep retrying until the broker is reachable
producer = None
while producer is None:
    try:
        producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda x: dumps(x).encode('utf-8')
        )
    except errors.NoBrokersAvailable:
        print("Waiting for brokers to become available")
        sleep(10)

# stream every CSV row to the batch_stream topic (the path is a placeholder)
with open('data.csv', newline='') as csvfile:
    reader = csv.DictReader(csvfile)
    for j, row in enumerate(reader):
        print("Iteration", j)
        producer.send('batch_stream', value=row)

producer.flush()  # make sure buffered records reach the broker before exit
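
To check that rows actually reach batch_stream before Flink comes into play, I read them back with a throwaway consumer. This is just a minimal sketch; only the topic name and broker address come from my setup above, the rest is a quick test:

from json import loads
from kafka import KafkaConsumer

# quick sanity check: read back what the producer wrote to batch_stream
check = KafkaConsumer(
    'batch_stream',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    value_deserializer=lambda x: loads(x.decode('utf-8'))
)
for msg in check:
    print(msg.value)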

Below is my consumerKafka.py:

from kafka import KafkaConsumer
from json import loads
from time import sleep

consumer = KafkaConsumer(
    'output_batch',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group-id',
    value_deserializer=lambda x: loads(x.decode('utf-8'))
)

for event in consumer:
    event_data = event.value
    # Do whatever you want with the message
    print(event_data)
    sleep(1)

How do I wire PyFlink up to Kafka so that it consumes the messages from the batch_stream topic, transforms them, and then flushes the result into output_batch?

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
from pyflink.table.descriptors import Schema, Json, Kafka

def register_information_source(st_env):
  st_env.connect(Kafka()
        .version("universal")
        .topic("batch_stream")
        .start_from_earliest()
        .property("zookeeper.connect", "zookeeper:2181")
        .property("bootstrap.servers", "kafka:9092")) \
    .with_format( Json()
        .fail_on_missing_field(True)
        .schema(DataTypes.ROW([
        DataTypes.FIELD("Id", DataTypes.BIGINT()),
        DataTypes.FIELD("X", DataTypes.FLOAT()),
        DataTypes.FIELD("Y", DataTypes.FLOAT()),
        DataTypes.FIELD("ANGLE", DataTypes.FLOAT()),
        DataTypes.FIELD("Timestamp", DataTypes.STRING())]))) \
    .with_schema(Schema()
        .field("Id", DataTypes.BIGINT())
        .field("X", DataTypes.FLOAT())
        .field("Y", DataTypes.FLOAT())
        .field("ANGLE", DataTypes.FLOAT())) \
    .in_append_mode() \
    .create_temporary_table("source")


def register_information_sink(st_env):
  st_env \
    .connect(  # declare the external system to connect to
    Kafka()
        .version("universal")
        .topic("output_batch")
        .property("zookeeper.connect", "zookeeper:2181")
        .property("bootstrap.servers", "kafka:9092")) \
    .with_format(  # declare a format for this system
    Json()
        .fail_on_missing_field(True)
        .schema(DataTypes.ROW([ DataTypes.FIELD("Id", DataTypes.BIGINT()),
        DataTypes.FIELD("X", DataTypes.FLOAT()),
        DataTypes.FIELD("Y", DataTypes.FLOAT()),
        DataTypes.FIELD("ANGLE", DataTypes.FLOAT()),
        DataTypes.FIELD("Timestamp", DataTypes.STRING())]))) \
    .with_schema(Schema()
        .field("Id", DataTypes.BIGINT())
        .field("X", DataTypes.FLOAT())
        .field("Y", DataTypes.FLOAT())
        .field("ANGLE", DataTypes.FLOAT())) \
    .in_append_mode() \
    .create_temporary_table("sink")

def job():
  s_env = StreamExecutionEnvironment.get_execution_environment()
  s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
  s_env.set_parallelism(1)

  # use blink table planner
  st_env = StreamTableEnvironment \
    .create(s_env, environment_settings=EnvironmentSettings
        .new_instance()
        .in_streaming_mode()
        .use_blink_planner()
        .build())

  # register source and sink
  register_information_source(st_env)
  register_information_sink(st_env)

  # query: forward every row from the source topic to the sink topic
  st_env.from_path("source").insert_into("sink")

  # execute
  st_env.execute("FlinkManagement")


if __name__ == '__main__':
  job()

I tried to develop it as shown above, but there are some problems.
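
For completeness, this is the newer DDL-based variant I was also looking at instead of the descriptor API, but I am not sure it is right either. It is only a rough sketch under assumptions I have not verified: a recent PyFlink (1.13 or later), the matching flink-sql-connector-kafka JAR available locally (the path and version below are placeholders), the broker at localhost:9092 as in my producer (inside Docker it would probably be kafka:9092 as in my attempt above), and the same topics and field names as in my code; the consumer group id is my own choice.

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
# the Kafka SQL connector is not bundled with PyFlink; register the JAR
# (placeholder path and version, adjust to the Flink version in use)
env.add_jars("file:///path/to/flink-sql-connector-kafka-1.17.1.jar")

t_env = StreamTableEnvironment.create(env)

# source table backed by the batch_stream topic
t_env.execute_sql("""
    CREATE TABLE source (
        Id BIGINT,
        X FLOAT,
        Y FLOAT,
        ANGLE FLOAT,
        `Timestamp` STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'batch_stream',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'flink-batch-stream',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
    )
""")

# sink table backed by the output_batch topic
t_env.execute_sql("""
    CREATE TABLE sink (
        Id BIGINT,
        X FLOAT,
        Y FLOAT,
        ANGLE FLOAT,
        `Timestamp` STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'output_batch',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'json'
    )
""")

# forward (or transform) the rows from source to sink
t_env.execute_sql("INSERT INTO sink SELECT * FROM source").wait()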
