我们使用 Kafka 作为 Jet 工作的来源。我们希望将在 Kafka 中接收到的消息分发到在不同机器上运行的每个 Jet 实例。
我们可以使用 GroupConfig(设置名称和密码)和 JoinConfig(添加 IP 地址)成功地加入来自不同机器的成员。
当我们向 Kafka 源主题发送消息时,要么所有机器读取相同的消息,要么在一台机器上处理相同的消息两次。
例如,我们创建了一个 Kafka 主题,它有4 个分区和 2台不同的机器运行,正好有1 个喷气机集群。当我们运行应用程序并实例化 jet 实例时,我们看到2 个jet 成员已连接,并且分区在2之间拆分。
当发送像“message_one”这样的消息时,会发生以下两种情况之一:
- 两个 Jet 实例都读取消息
- 消息仅由 Jet 实例之一读取两次
在我们的情况下两者都不是必需的,因为我们不希望消息被复制。
如何防止两次处理消息?
下面是代码片段:
JetConfig jetConfig = new JetConfig();
jetConfig.getInstanceConfig().setCooperativeThreadCount(1);
jetConfig.getInstanceConfig().setFlowControlPeriodMs(200);
jetConfig.getHazelcastConfig().getGroupConfig().setName("TEST_NAME");
jetConfig.getHazelcastConfig().getGroupConfig().setPassword("TEST_PASSWORD");
jetConfig.getHazelcastConfig().getNetworkConfig().setReuseAddress(true);
final JoinConfig join = jetConfig.getHazelcastConfig().getNetworkConfig().getJoin();
join.getMulticastConfig().setEnabled(false);
join.getTcpIpConfig().setEnabled(true).addMember("test.ip.address.one");
join.getTcpIpConfig().setEnabled(true).addMember("test.ip.address.two");
jetConfig.getHazelcastConfig().getNetworkConfig().getInterfaces()
.setEnabled(true)
.addInterface("test.ip.address.one")
.addInterface("test.ip.address.two");
final JetInstance jet =Jet.newJetInstance(jetConfig);
final Properties consumeProps = new Properties();
final Properties produceProps=new Properties();
consumeProps.setProperty("bootstrap.servers", kafkaBootstrapServers);
consumeProps.setProperty("key.deserializer", StringDeserializer.class.getCanonicalName());
consumeProps.setProperty("value.deserializer", StringDeserializer.class.getCanonicalName());
consumeProps.setProperty("auto.offset.reset", "latest");
consumeProps.setProperty("enable.auto.commit", "true");
consumeProps.setProperty("group.id", kafkaLoadBalancerGroupName);
consumeProps.setProperty("partition.assignment.strategy", RoundRobinAssignor.class.getCanonicalName());
produceProps.setProperty("bootstrap.servers", kafkaBootstrapServers);
produceProps.setProperty("key.serializer", StringSerializer.class.getCanonicalName());
produceProps.setProperty("value.serializer", StringSerializer.class.getCanonicalName());
final Pipeline p = Pipeline.create();
p.drawFrom(KafkaSources.kafka(consumeProps, kafkaSourceTopic))
.filter(Objects::nonNull)
.map(TestClass::parseMessage)
.filter(Objects::nonNull)
.drainTo(KafkaSinks.kafka(produceProps, kafkaEnrichedTopic));
jet.newJob(p).join();