1

我们使用 Kafka 作为 Jet 工作的来源。我们希望将在 Kafka 中接收到的消息分发到在不同机器上运行的每个 Jet 实例。

我们可以使用 GroupConfig(设置名称和密码)和 JoinConfig(添加 IP 地址)成功地加入来自不同机器的成员。

当我们向 Kafka 源主题发送消息时,要么所有机器读取相同的消息,要么在一台机器上处理相同的消息两次。

例如,我们创建了一个 Kafka 主题,它有4 个分区和 2台不同的机器运行,正好有1 个喷气机集群。当我们运行应用程序并实例化 jet 实例时,我们看到2 个jet 成员已连接,并且分区在2之间拆分。

当发送像“message_one”这样的消息时,会发生以下两种情况之一:

  1. 两个 Jet 实例都读取消息
  2. 消息仅由 Jet 实例之一读取两次

在我们的情况下两者都不是必需的,因为我们不希望消息被复制。

如何防止两次处理消息?

下面是代码片段:

    JetConfig jetConfig = new JetConfig();
    jetConfig.getInstanceConfig().setCooperativeThreadCount(1);
    jetConfig.getInstanceConfig().setFlowControlPeriodMs(200);
    jetConfig.getHazelcastConfig().getGroupConfig().setName("TEST_NAME");
    jetConfig.getHazelcastConfig().getGroupConfig().setPassword("TEST_PASSWORD");
    jetConfig.getHazelcastConfig().getNetworkConfig().setReuseAddress(true);

    final JoinConfig join = jetConfig.getHazelcastConfig().getNetworkConfig().getJoin();
    join.getMulticastConfig().setEnabled(false);
    join.getTcpIpConfig().setEnabled(true).addMember("test.ip.address.one");
    join.getTcpIpConfig().setEnabled(true).addMember("test.ip.address.two");
    jetConfig.getHazelcastConfig().getNetworkConfig().getInterfaces()
            .setEnabled(true)
            .addInterface("test.ip.address.one")
            .addInterface("test.ip.address.two");

    final JetInstance jet =Jet.newJetInstance(jetConfig);


    final Properties consumeProps = new Properties();
    final Properties produceProps=new Properties();
    consumeProps.setProperty("bootstrap.servers", kafkaBootstrapServers);           
    consumeProps.setProperty("key.deserializer", StringDeserializer.class.getCanonicalName());          
    consumeProps.setProperty("value.deserializer", StringDeserializer.class.getCanonicalName());
    consumeProps.setProperty("auto.offset.reset", "latest");
    consumeProps.setProperty("enable.auto.commit", "true");
    consumeProps.setProperty("group.id", kafkaLoadBalancerGroupName);
    consumeProps.setProperty("partition.assignment.strategy", RoundRobinAssignor.class.getCanonicalName());

    produceProps.setProperty("bootstrap.servers", kafkaBootstrapServers);   
    produceProps.setProperty("key.serializer", StringSerializer.class.getCanonicalName());
    produceProps.setProperty("value.serializer", StringSerializer.class.getCanonicalName());

    final Pipeline p = Pipeline.create();
    p.drawFrom(KafkaSources.kafka(consumeProps, kafkaSourceTopic))
     .filter(Objects::nonNull)
     .map(TestClass::parseMessage)
     .filter(Objects::nonNull)
     .drainTo(KafkaSinks.kafka(produceProps, kafkaEnrichedTopic));
    jet.newJob(p).join();
4

0 回答 0