0

这种新的编程范式对我来说非常新。我想用给定类中.map()的定义替换匿名函数。DistributedFunction但我不确定如何创建新功能。

我有以下管道:

p.drawFrom(KafkaSources.kafka(properties, topic, "topicX", "topicY"))
 .map(e -> {
    Gson gson = new Gson();

    KafkaMessage kafkaMessage = gson.fromJson(e.getValue().toString(), 
    KafkaMessage.class);

    byte[] encodedData = Base64.getDecoder().decode(kafkaMessage.getData());

    try {
       kafkaMessage.setData(new String(encodedData, "utf-8"));
    } catch (Exception e1) {
       // TODO Auto-generated catch block
       e1.printStackTrace();
    }

    return kafkaMessage;             
  })
 .map(m -> m.getData())
 .drainTo(Sinks.logger());

基于一些 Jet 示例,我最终得到以下结果:

p.drawFrom(KafkaSources.kafka(properties, topic, "topicX", "topicY"))
 .map(KafkaHelper::decodeKafkaMessage)
 .map(m -> m.getData())
 .drainTo(Sinks.logger());

KafkaHelper 类:

public final class KafkaHelper implements Serializable {

    private static final long serialVersionUID = -3556269069192202060L;

    public static KafkaMessage decodeKafkaMessage(Map.Entry<Object,Object> entry) {

        Gson gson = new Gson();

        KafkaMessage kafkaMessage = gson.fromJson(entry.getValue().toString(), KafkaMessage.class);

        byte[] encodedData = Base64.getDecoder().decode(kafkaMessage.getData());

        try {
            kafkaMessage.setData(new String(encodedData, "utf-8"));
        } catch (UnsupportedEncodingException e) {
            System.out.println(e.getMessage());
            e.printStackTrace();
        }

         return kafkaMessage;            
    }   

}

DistributedFunction这种方法是否遵循将 a 传递给的规范/要求.map()?如果是,为什么?如果没有,我应该对其进行哪些更改?

4

1 回答 1

1

是的,在您的两个示例中,您都在创建和传递DistributedFunctionto的实例map()。Java 8 有一个规则,根据该规则,您的第一个示例中的 lambda 函数和第二个示例中的方法引用用于创建一个合成子类型,DistributedFunction该子类型使用您提供的代码实现其单一抽象方法(“SAM”)。

KafkaHelper不必是Serializable因为你从不实例化它。您也可以将静态方法decodeKafkaMessage放在任何其他类中,因为它不依赖于类实例。

于 2018-01-28T11:54:08.440 回答