1

我想将数据流从 VM 发送到主机,我正在使用writeToSocket()如下所示的方法:

joinedStreamEventDataStream.writeToSocket("192.168.1.10", 6998) ;

这里joinedStreamEventDataStream是类型DataStream<Integer,Integer>

有人可以告诉我应该如何将序列化程序传递给上述方法。

提前致谢

4

2 回答 2

3

这在一定程度上取决于您希望如何从套接字读取数据。如果您希望它是数据的字符串表示形式,那么您可以通过以下方式进行:

joinedStreamEventDataStream.map(new MapFunction<Type, String>() {
    @Override
    public String map(Type value) throws Exception {
        return value.toString();
    }
}).writeToSocket(hostname, port, new SimpleStringSchema());

如果你想保持 Flink 的序列化格式,那么你可以这样写:

joinedStreamEventDataStream.writeToSocket(
    hostname, 
    port, 
    new TypeInformationSerializationSchema<>(
        joinedStreamEventDataStream.getType(), 
        env.getConfig()));

SerializationSchema如果您想以自己的序列化格式输出它,那么您必须按照 Alex 的指出实现自己的。

于 2017-08-22T13:27:12.727 回答
1

writeToSocket()方法采用 3 个参数:套接字主机和端口,以及SerializationSchema用于序列化数据的接口的实现。所以你的实现可能是这样的:

joinedStreamEventDataStream.writeToSocket(
    "192.168.1.10",  // host name
    6998,  // port
    new SerializationSchema<Integer>() {

        @Override
        public byte[] serialize(Integer element) {
            return ByteBuffer.allocate(4).putInt(element).array();
        }
    }
);

如果joinedStreamEventDataStreamDataStream<Integer>类型,则为真。

于 2017-08-22T12:14:24.310 回答