这种新的编程范式对我来说非常新。我想用给定类中.map()
的定义替换匿名函数。DistributedFunction
但我不确定如何创建新功能。
我有以下管道:
p.drawFrom(KafkaSources.kafka(properties, topic, "topicX", "topicY"))
.map(e -> {
Gson gson = new Gson();
KafkaMessage kafkaMessage = gson.fromJson(e.getValue().toString(),
KafkaMessage.class);
byte[] encodedData = Base64.getDecoder().decode(kafkaMessage.getData());
try {
kafkaMessage.setData(new String(encodedData, "utf-8"));
} catch (Exception e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
return kafkaMessage;
})
.map(m -> m.getData())
.drainTo(Sinks.logger());
基于一些 Jet 示例,我最终得到以下结果:
p.drawFrom(KafkaSources.kafka(properties, topic, "topicX", "topicY"))
.map(KafkaHelper::decodeKafkaMessage)
.map(m -> m.getData())
.drainTo(Sinks.logger());
KafkaHelper 类:
public final class KafkaHelper implements Serializable {
private static final long serialVersionUID = -3556269069192202060L;
public static KafkaMessage decodeKafkaMessage(Map.Entry<Object,Object> entry) {
Gson gson = new Gson();
KafkaMessage kafkaMessage = gson.fromJson(entry.getValue().toString(), KafkaMessage.class);
byte[] encodedData = Base64.getDecoder().decode(kafkaMessage.getData());
try {
kafkaMessage.setData(new String(encodedData, "utf-8"));
} catch (UnsupportedEncodingException e) {
System.out.println(e.getMessage());
e.printStackTrace();
}
return kafkaMessage;
}
}
DistributedFunction
这种方法是否遵循将 a 传递给的规范/要求.map()
?如果是,为什么?如果没有,我应该对其进行哪些更改?