
I am using the code below to send a data stream from a VM to the Kafka topic test (Kafka runs on the host OS at IP 192.168.0.12):

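// Package names assume a Flink 1.3-era setup with the Kafka 0.9 connector.
import java.util.Properties;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;

public class WriteToKafka {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use ingestion time => TimeCharacteristic == EventTime + IngestionTimeExtractor
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        // One source only: JoinedStreamGenerator (not shown) emits the joined events
        DataStream<JoinedStreamEvent> stream = env
                .addSource(new JoinedStreamGenerator())
                .assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());

        Properties properties = new Properties();
        // Kafka and ZooKeeper run on the host OS, reached from the VM by IP
        properties.setProperty("bootstrap.servers", "192.168.0.12:9092");
        properties.setProperty("zookeeper.connect", "192.168.0.12:2181");
        properties.setProperty("group.id", "test");

        // Writes each event in Flink's binary, TypeInformation-based serialization format
        stream.addSink(new FlinkKafkaProducer09<JoinedStreamEvent>("test",
                new TypeInformationSerializationSchema<>(stream.getType(), env.getConfig()), properties));

        env.execute();
    }
}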

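A JoinedStreamEvent is essentially a Tuple3<Integer, Integer, Integer>, so the stream is effectively a DataStream<Tuple3<Integer, Integer, Integer>>. It joins two streams, respirationRateStream and heartRateStream. Its constructor: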

public JoinedStreamEvent(Integer patient_id, Integer heartRate, Integer respirationRate) {
    Patient_id = patient_id;
    HeartRate = heartRate;
    RespirationRate = respirationRate;
}

Another Flink program, running on the host OS, tries to read the data stream from Kafka. I use localhost here because Kafka and ZooKeeper are running on the host OS.

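// Package names assume a Flink 1.3-era setup with the Kafka 0.9 connector.
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ReadFromKafka {

    public static void main(String[] args) throws Exception {
        // create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "test");

        // Reads each Kafka record as a String
        DataStream<String> message = env.addSource(
                new FlinkKafkaConsumer09<String>("test", new SimpleStringSchema(), properties));

        // What I actually want is a DataStream<JoinedStreamEvent>, but which schema goes here?
        // DataStream<JoinedStreamEvent> message = env.addSource(
        //         new FlinkKafkaConsumer09<JoinedStreamEvent>("test", /* ??? */, properties));

        message.print();

        env.execute();
    } // main
} // ReadFromKafka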

I get output like this:

[screenshot of the program output]

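I think I need to implement a deserializer for the JoinedStreamEvent type. Can someone give me an idea of how to write a deserializer for JoinedStreamEvent, which is essentially a DataStream<Tuple3<Integer, Integer, Integer>>?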

Please let me know if anything else needs to be done.

PS - I tried writing the deserializer below, but I don't think it is correct:

DataStream<JoinedStreamEvent> message = env.addSource(new FlinkKafkaConsumer09<JoinedStreamEvent>("test",
                new TypeInformationSerializationSchema<JoinedStreamEvent>() , properties));
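The mismatch is on the wire: the producer writes records with TypeInformationSerializationSchema, which emits Flink's binary serialization of each object, while the consumer reads them with SimpleStringSchema, which decodes the raw bytes as text. The plain-Java sketch below (no Flink dependency) illustrates the effect. The class name BinaryVsText and its methods are hypothetical, and writing three 4-byte ints is only a stand-in for a binary serializer; Flink's actual POJO layout is different.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class BinaryVsText {

    // Stand-in for a binary serializer (hypothetical): three ints as 4-byte big-endian values.
    static byte[] binaryEncode(int a, int b, int c) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(a);
            out.writeInt(b);
            out.writeInt(c);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e); // cannot happen with an in-memory stream
        }
    }

    // What a string schema does on the consumer side: treat the bytes as text.
    static String decodeAsText(byte[] wire) {
        return new String(wire, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = binaryEncode(1, 72, 16);
        String asText = decodeAsText(wire);
        System.out.println(wire.length);              // prints 12
        System.out.println(asText.equals("1,72,16")); // prints false
    }
}
```

The decoded string consists mostly of control characters, so a String-based consumer cannot recover the values. Either both sides use the same binary schema, or both sides agree on a text format.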

1 Answer


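By writing a custom serializer and deserializer for the VM and host OS programs, I was able to receive the events on the host in the same format in which the VM sent them, as described below: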

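// Package names assume Flink 1.3 (Kafka 0.9 connector era); in later Flink versions the
// schema interfaces live in org.apache.flink.api.common.serialization.
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

// One schema for both sides: the producer writes "id,heartRate,respirationRate" text,
// the consumer parses that text back into JoinedStreamEvent objects.
public class JoinSchema implements DeserializationSchema<JoinedStreamEvent>, SerializationSchema<JoinedStreamEvent> {

    @Override
    public JoinedStreamEvent deserialize(byte[] bytes) throws IOException {
        // Explicit charset so the VM and the host agree regardless of platform defaults
        return JoinedStreamEvent.fromstring(new String(bytes, StandardCharsets.UTF_8));
    }

    @Override
    public boolean isEndOfStream(JoinedStreamEvent joinedStreamEvent) {
        return false; // the Kafka topic is treated as an unbounded stream
    }

    @Override
    public TypeInformation<JoinedStreamEvent> getProducedType() {
        return TypeExtractor.getForClass(JoinedStreamEvent.class);
    }

    @Override
    public byte[] serialize(JoinedStreamEvent joinedStreamEvent) {
        return joinedStreamEvent.toString().getBytes(StandardCharsets.UTF_8);
    }
} // JoinSchema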

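Note that you may have to write a fromstring() method in your event type; below is the fromstring() method I added to the JoinedStreamEvent class. It parses the comma-separated text that JoinSchema.serialize() produces, so JoinedStreamEvent.toString() must emit the three values in the same order, separated by commas.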

// Parses "patientId,heartRate,respirationRate", the format produced by toString()
public static JoinedStreamEvent fromstring(String line) {

    String[] token = line.split(",");

    Integer val1 = Integer.valueOf(token[0].trim());
    Integer val2 = Integer.valueOf(token[1].trim());
    Integer val3 = Integer.valueOf(token[2].trim());

    return new JoinedStreamEvent(val1, val2, val3);
} // fromstring
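For JoinSchema to work, toString() and fromstring() have to be exact inverses. A minimal sketch of the whole event class under that assumption follows. The field names come from the question's constructor; the public fields, the toString() format, the token-count check, equals()/hashCode() and the main() round trip are assumptions added for illustration. The public class, public no-arg constructor and public fields follow Flink's POJO rules, but the sketch itself has no Flink dependency.

```java
import java.nio.charset.StandardCharsets;
import java.util.Objects;

public class JoinedStreamEvent {
    public Integer Patient_id;
    public Integer HeartRate;
    public Integer RespirationRate;

    // Public no-arg constructor, required for Flink to treat this class as a POJO
    public JoinedStreamEvent() {}

    public JoinedStreamEvent(Integer patient_id, Integer heartRate, Integer respirationRate) {
        Patient_id = patient_id;
        HeartRate = heartRate;
        RespirationRate = respirationRate;
    }

    // Assumed wire format used by JoinSchema.serialize(): "patientId,heartRate,respirationRate"
    @Override
    public String toString() {
        return Patient_id + "," + HeartRate + "," + RespirationRate;
    }

    // Inverse of toString(); the name matches the JoinedStreamEvent.fromstring call in JoinSchema
    public static JoinedStreamEvent fromstring(String line) {
        String[] token = line.split(",");
        if (token.length != 3) {
            throw new IllegalArgumentException("Expected 3 comma-separated values: " + line);
        }
        return new JoinedStreamEvent(
                Integer.valueOf(token[0].trim()),
                Integer.valueOf(token[1].trim()),
                Integer.valueOf(token[2].trim()));
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof JoinedStreamEvent)) return false;
        JoinedStreamEvent other = (JoinedStreamEvent) o;
        return Objects.equals(Patient_id, other.Patient_id)
                && Objects.equals(HeartRate, other.HeartRate)
                && Objects.equals(RespirationRate, other.RespirationRate);
    }

    @Override
    public int hashCode() {
        return Objects.hash(Patient_id, HeartRate, RespirationRate);
    }

    // Round trip through bytes, mirroring what JoinSchema.serialize/deserialize do
    public static void main(String[] args) {
        JoinedStreamEvent sent = new JoinedStreamEvent(1, 72, 16);
        byte[] wire = sent.toString().getBytes(StandardCharsets.UTF_8);
        JoinedStreamEvent received = fromstring(new String(wire, StandardCharsets.UTF_8));
        System.out.println(received);              // prints 1,72,16
        System.out.println(sent.equals(received)); // prints true
    }
}
```

Throwing from fromstring() on a malformed record will fail the consuming job. Whether skipping bad records is preferable depends on the application.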

Events are sent from the VM with the following code:

stream.addSink(new FlinkKafkaProducer09<JoinedStreamEvent>("test", new JoinSchema(), properties));

Events are received on the host with the following code:

public static void main(String[] args) throws Exception {
    // create execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("zookeeper.connect", "localhost:2181");
    properties.setProperty("group.id", "test");

    // JoinSchema turns each Kafka record back into a JoinedStreamEvent
    DataStream<JoinedStreamEvent> message = env.addSource(new FlinkKafkaConsumer09<JoinedStreamEvent>("test",
            new JoinSchema(), properties));

    message.print();

    env.execute();
} // main
answered 2017-08-23T05:24:30.543