我想将数据流从 VM 发送到主机,我正在使用writeToSocket()
如下所示的方法:
joinedStreamEventDataStream.writeToSocket("192.168.1.10", 6998) ;
这里joinedStreamEventDataStream
是类型DataStream<Integer,Integer>
。
有人可以告诉我应该如何将序列化程序传递给上述方法。
提前致谢
我想将数据流从 VM 发送到主机,我正在使用writeToSocket()
如下所示的方法:
joinedStreamEventDataStream.writeToSocket("192.168.1.10", 6998) ;
这里joinedStreamEventDataStream
是类型DataStream<Integer,Integer>
。
有人可以告诉我应该如何将序列化程序传递给上述方法。
提前致谢
这在一定程度上取决于您希望如何从套接字读取数据。如果您希望它是数据的字符串表示形式,那么您可以通过以下方式进行:
joinedStreamEventDataStream.map(new MapFunction<Type, String>() {
@Override
public String map(Type value) throws Exception {
return value.toString();
}
}).writeToSocket(hostname, port, new SimpleStringSchema());
如果你想保持 Flink 的序列化格式,那么你可以这样写:
joinedStreamEventDataStream.writeToSocket(
hostname,
port,
new TypeInformationSerializationSchema<>(
joinedStreamEventDataStream.getType(),
env.getConfig()));
SerializationSchema
如果您想以自己的序列化格式输出它,那么您必须按照 Alex 的指出实现自己的。
该writeToSocket()
方法采用 3 个参数:套接字主机和端口,以及SerializationSchema
用于序列化数据的接口的实现。所以你的实现可能是这样的:
joinedStreamEventDataStream.writeToSocket(
"192.168.1.10", // host name
6998, // port
new SerializationSchema<Integer>() {
@Override
public byte[] serialize(Integer element) {
return ByteBuffer.allocate(4).putInt(element).array();
}
}
);
如果joinedStreamEventDataStream
有DataStream<Integer>
类型,则为真。