我已经启动了 Hazelcast-Jet(jet-start.sh) 的 3 个实例。可以看到集群形成为 3 个节点。
在其中一个集群节点上,我尝试使用 JetClient 运行几个 pipeline:
一类从 Kafka 读取数据(source)并写入 IMap,
另一类从 IMap 读取数据(source)并写入 Kafka。
这是代码:
/**
 * Program entry point: creates the application object and starts it.
 *
 * @param args command-line arguments (not used)
 * @throws Exception whatever {@code run()} propagates
 */
public static void main(String[] args) throws Exception {
    SourceKafkaToIMapForEnrichments app = new SourceKafkaToIMapForEnrichments();
    app.run();
}
private void run() throws Exception {
JetConfig cfg = new JetConfig();
cfg.setInstanceConfig(
new InstanceConfig().setCooperativeThreadCount(Math.max(1, getRuntime().availableProcessors() / 1)));
try {
// jetInstance = Jet.newJetInstance(cfg);
// Jet.newJetInstance(cfg);
UsernamePasswordCredentials cred = new UsernamePasswordCredentials("jet", "jet-app");
jetInstance = Jet.newJetClient(new ClientConfig().setCredentials(cred));
JobConfig jobConfig = new JobConfig();
jobConfig.addClass(SourceKafkaToIMapForEnrichments.class);
initiateMaps(); // initialize maps
Pipeline sourceFromClient = buildPipelineForClientDataa();
Pipeline sourceFromIP2Location = buildPipelineForIp2Loc();
Pipeline enrichSinkToKafka = buildPipelineForEnrichData();
long start = System.nanoTime();
Job job1 = jetInstance.newJob(sourceFromClient, jobConfig);
Job job2 = jetInstance.newJob(sourceFromIP2Location, jobConfig);
Job job3 = jetInstance.newJob(enrichSinkToKafka, jobConfig);
}
}
/**
 * Builds the pipeline that reads the client-data Kafka topic and writes each record
 * into the IMap of the same name ({@code SINK_CLINET_DATA}).
 *
 * <p>Each map entry is keyed by the record's (partition, offset) pair and holds the
 * record value. No timestamps are assigned to the stream.
 *
 * @return the assembled pipeline, not yet submitted
 */
private static Pipeline buildPipelineForClientDataa() {
    Properties consumerProps = props(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS,
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName(),
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName(),
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET);

    Pipeline pipeline = Pipeline.create();
    pipeline
            .drawFrom(KafkaSources.kafka(
                    consumerProps,
                    // (partition, offset) identifies a record within the topic
                    record -> com.hazelcast.jet.Util.entry(
                            com.hazelcast.jet.datamodel.Tuple2.tuple2(record.partition(), record.offset()),
                            record.value()),
                    SINK_CLINET_DATA))
            .withoutTimestamps()
            .drainTo(Sinks.map(SINK_CLINET_DATA));
    return pipeline;
}
/**
 * Builds the pipeline that reads the IP2Location Kafka topic and writes each record
 * into the IMap of the same name ({@code SINK_IP2LOCATION}).
 *
 * <p>Each map entry is keyed by the record's (partition, offset) pair and holds the
 * record value. No timestamps are assigned to the stream.
 *
 * @return the assembled pipeline, not yet submitted
 */
private static Pipeline buildPipelineForIp2Loc() {
    Properties consumerProps = props(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS,
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName(),
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName(),
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET);

    Pipeline pipeline = Pipeline.create();
    pipeline
            .drawFrom(KafkaSources.kafka(
                    consumerProps,
                    // (partition, offset) identifies a record within the topic
                    record -> com.hazelcast.jet.Util.entry(
                            com.hazelcast.jet.datamodel.Tuple2.tuple2(record.partition(), record.offset()),
                            record.value()),
                    SINK_IP2LOCATION))
            .withoutTimestamps()
            .drainTo(Sinks.map(SINK_IP2LOCATION));
    return pipeline;
}
/**
 * Builds the pipeline that reads the enriched client-data IMap
 * ({@code SINK_CLINET_DATA_ENRICHED}) and publishes its entries to the Kafka topic
 * of the same name.
 *
 * <p>NOTE(review): the producer is configured with {@code StringSerializer} for both
 * key and value, while {@code clientMapEnriched} is declared with
 * {@code ApacheCommons} values - confirm the map really holds strings, or that a
 * matching value serializer is intended.
 *
 * @return the assembled pipeline, not yet submitted
 */
private static Pipeline buildPipelineForEnrichData() {
    Properties producerProps = props(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS,
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName(),
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());

    Pipeline pipeline = Pipeline.create();
    pipeline
            .drawFrom(Sources.map(SINK_CLINET_DATA_ENRICHED))
            .drainTo(KafkaSinks.kafka(producerProps, SINK_CLINET_DATA_ENRICHED));
    return pipeline;
}
/**
 * Builds a {@link Properties} object from alternating key/value arguments.
 *
 * <p>Example: {@code props("a", "1", "b", "2")} yields {@code {a=1, b=2}}.
 *
 * @param kvs keys and values in alternating order: key1, value1, key2, value2, ...
 * @return a new {@code Properties} instance holding the given pairs
 * @throws IllegalArgumentException if an odd number of arguments is supplied,
 *         i.e. the last key has no value
 */
private static Properties props(String... kvs) {
    // Fail up front with a clear message instead of an ArrayIndexOutOfBoundsException
    // half-way through filling the properties.
    if (kvs.length % 2 != 0) {
        throw new IllegalArgumentException(
                "props() expects key/value pairs but received " + kvs.length + " argument(s)");
    }
    final Properties props = new Properties();
    for (int i = 0; i < kvs.length; i += 2) {
        props.setProperty(kvs[i], kvs[i + 1]);
    }
    return props;
}
// Shared map handles; all three start as null and are assigned in initiateMaps().

// Client data held as Java objects; bound to the map named SINK_CLINET_DATA_OBJECT.
public static IMap<Tuple2<Integer, Long>, ApacheCommons> clientMapObject = null;
// IP2Location data held as Java objects; bound to the map named SINK_IP2LOCATION_OBJECT.
public static IMap<Tuple2<Integer, Long>, IP2LocationPojo> ip2LocIMapObjectLevel = null;
// Enriched client data; bound to the map named SINK_CLINET_DATA_ENRICHED, which
// buildPipelineForEnrichData() uses as its source.
public static IMap<String, ApacheCommons> clientMapEnriched = null;
/**
 * Obtains the maps used by the jobs from the Jet client and attaches the entry
 * listeners to them.
 *
 * <p>Listeners are registered with {@code includeValue = true}. The object-level
 * and enriched maps are also stored in the static fields of this class.
 */
@SuppressWarnings("deprecation")
private void initiateMaps() {
    // Raw client data: Kafka topic -> IMap
    KafkaStreamIMapListener clientDataListener = new KafkaStreamIMapListener();
    jetInstance.getMap(SINK_CLINET_DATA).addEntryListener(clientDataListener, true);

    // Raw IP2Location data: Kafka topic -> IMap
    KafkaIp2LocationListener ip2LocationListener = new KafkaIp2LocationListener();
    jetInstance.getMap(SINK_IP2LOCATION).addEntryListener(ip2LocationListener, true);

    // Client data: JSON string -> Java object
    clientMapObject = jetInstance.getMap(SINK_CLINET_DATA_OBJECT);
    KafkaStreamObjectIMapListener clientObjectListener = new KafkaStreamObjectIMapListener();
    clientMapObject.addEntryListener(clientObjectListener, true);

    // IP2Location data: string -> Java object (no listener attached)
    ip2LocIMapObjectLevel = jetInstance.getMap(SINK_IP2LOCATION_OBJECT);

    // Client data enriched with geolocation information (no listener attached)
    clientMapEnriched = jetInstance.getMap(SINK_CLINET_DATA_ENRICHED);
}
虽然代码中没有显式引用 org/apache/kafka/clients/consumer/ConsumerRecord(只是在 lambda 中通过 record 参数隐式用到了它),
但在运行这个 jar 时,我收到了这个错误:
WARNING: hz.client_0 [dev] [3.0] [3.12] Job 50e9-3541-4c23-bd58 (name ??) failed because it has received a non-restartable exception during submission
com.hazelcast.client.UndefinedErrorCodeException: Class name: java.lang.NoClassDefFoundError, Message: org/apache/kafka/clients/consumer/ConsumerRecord
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
at java.lang.Class.getDeclaredMethod(Class.java:2128)
at java.lang.invoke.SerializedLambda$1.run(SerializedLambda.java:224)
at java.lang.invoke.SerializedLambda$1.run(SerializedLambda.java:221)
at java.security.AccessController.doPrivileged(Native Method)
at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:221)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1260)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject$Serializer.read(CustomClassLoadedObject.java:110)
at com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject$Serializer.read(CustomClassLoadedObject.java:91)
at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:48)
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.readObject(AbstractSerializationService.java:269)
at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataInput.readObject(ByteArrayObjectDataInput.java:574)
at com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject.read(CustomClassLoadedObject.java:56)
at com.hazelcast.jet.core.Vertex.readData(Vertex.java:194)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.readInternal(DataSerializableSerializer.java:160)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:106)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:51)
at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:48)
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.readObject(AbstractSerializationService.java:269)
at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataInput.readObject(ByteArrayObjectDataInput.java:574)
at com.hazelcast.jet.core.DAG.readData(DAG.java:450)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.readInternal(DataSerializableSerializer.java:160)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:106)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:51)
at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:48)
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:187)
at com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject.deserializeWithCustomClassLoader(CustomClassLoadedObject.java:65)
at com.hazelcast.jet.impl.JobCoordinationService.dagToJson(JobCoordinationService.java:732)
at com.hazelcast.jet.impl.JobCoordinationService.lambda$submitJob$2(JobCoordinationService.java:167)
at com.hazelcast.jet.impl.JobCoordinationService.lambda$submitToCoordinatorThread$29(JobCoordinationService.java:878)
at com.hazelcast.jet.impl.JobCoordinationService.lambda$submitToCoordinatorThread$30(JobCoordinationService.java:897)
at com.hazelcast.util.executor.CompletableFutureTask.run(CompletableFutureTask.java:67)
at com.hazelcast.util.executor.CachedExecutorServiceDelegate$Worker.run(CachedExecutorServiceDelegate.java:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
at com.hazelcast.util.executor.HazelcastManagedThread.executeRun(HazelcastManagedThread.java:64)
at com.hazelcast.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:80)
at ------ submitted from ------.(Unknown Source)
at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.resolve(InvocationFuture.java:126)
at com.hazelcast.spi.impl.AbstractInvocationFuture$1.run(AbstractInvocationFuture.java:251)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
at com.hazelcast.util.executor.HazelcastManagedThread.executeRun(HazelcastManagedThread.java:64)
at com.hazelcast.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:80)
虽然我添加了 kafka-clients 依赖项,但仍然遇到此错误。这是 pom.xml:
<!-- Dependencies of the client application that builds and submits the jobs. -->
<!-- NOTE(review): the stack trace shows the class missing while a cluster member
     deserializes the DAG; confirm these jars are also resolvable on the members. -->
<dependencies>
<!-- Jet Kafka connector; presumably the source of KafkaSources / KafkaSinks used in the pipelines. -->
<!-- https://mvnrepository.com/artifact/com.hazelcast.jet/hazelcast-jet-kafka -->
<dependency>
<groupId>com.hazelcast.jet</groupId>
<artifactId>hazelcast-jet-kafka</artifactId>
<version>3.0</version>
</dependency>
<!-- Jet core; same version (3.0) as the connector above. -->
<!-- https://mvnrepository.com/artifact/com.hazelcast.jet/hazelcast-jet -->
<dependency>
<groupId>com.hazelcast.jet</groupId>
<artifactId>hazelcast-jet</artifactId>
<version>3.0</version>
</dependency>
<!-- Kafka client library; contains org.apache.kafka.clients.consumer.ConsumerRecord named in the error. -->
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.0</version>
</dependency>
</dependencies>
我也把 kafka-clients.jar 添加到了 /Hazelcast-jet-3.0/lib 文件夹中。
想知道我是否还需要另外启动 Hazelcast IMDG?
请大家帮忙指正。谢谢。