我正在使用 Flink 处理来自某些数据源(如 Kafka、Pravega 等)的数据。
就我而言,数据源是 Pravega,它为我提供了一个 flink 连接器。
我的数据源正在向我发送一些 JSON 数据,如下所示:
{"key": "value"}
{"key": "value2"}
{"key": "value3"}
...
...
这是我的一段代码:
PravegaDeserializationSchema<ObjectNode> adapter = new PravegaDeserializationSchema<>(ObjectNode.class, new JavaSerializer<>());
FlinkPravegaReader<ObjectNode> source = FlinkPravegaReader.<ObjectNode>builder()
.withPravegaConfig(pravegaConfig)
.forStream(stream)
.withDeserializationSchema(adapter)
.build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ObjectNode> dataStream = env.addSource(source).name("Pravega Stream");
dataStream.map(new MapFunction<ObjectNode, String>() {
@Override
public String map(ObjectNode node) throws Exception {
return node.toString();
}
})
.keyBy("word") // ERROR
.timeWindow(Time.seconds(10))
.sum("count");
如您所见,我使用了FlinkPravegaReader和 一个适当的反序列化器来获取来自 Pravega 的 JSON 流。
然后我尝试将 JSON 数据转换为字符串,KeyBy然后对它们进行计数。
但是,我收到一个错误:
The program finished with the following exception:
Field expression must be equal to '*' or '_' for non-composite types.
org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:342)
org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:340)
myflink.StreamingJob.main(StreamingJob.java:114)
似乎KeyBy抛出了这个异常。
好吧,我不是 Flink 专家,所以我不知道为什么。我已经阅读了官方示例的源代码WordCount。在该示例中,有一个自定义拆分器,用于将字符串数据拆分为单词。
所以我在想在这种情况下是否也需要使用某种分离器?如果是这样,我应该使用什么样的分离器?你能给我举个例子吗?如果没有,为什么会出现这样的错误以及如何解决?