2

我已经通过 jet-start.sh 启动了 3 个 Hazelcast Jet 实例,可以看到它们组成了一个包含 3 个节点的集群。

在其中一个集群节点上,我尝试使用 JetClient 运行几个 pipeline:一类从 Kafka 读取数据并写入 IMap,另一类从 IMap 读取数据并写入 Kafka。代码如下:

/**
 * Program entry point: creates the application object and starts it.
 *
 * @param args command-line arguments (not used)
 * @throws Exception if start-up fails
 */
public static void main(String[] args) throws Exception {
        SourceKafkaToIMapForEnrichments app = new SourceKafkaToIMapForEnrichments();
        app.run();
    }

    private void run() throws Exception {
        JetConfig cfg = new JetConfig();
        cfg.setInstanceConfig(
                new InstanceConfig().setCooperativeThreadCount(Math.max(1, getRuntime().availableProcessors() / 1)));
        try {

//          jetInstance = Jet.newJetInstance(cfg);
//          Jet.newJetInstance(cfg);
            UsernamePasswordCredentials cred = new UsernamePasswordCredentials("jet", "jet-app");
            jetInstance = Jet.newJetClient(new ClientConfig().setCredentials(cred));

            JobConfig jobConfig = new JobConfig();
            jobConfig.addClass(SourceKafkaToIMapForEnrichments.class);

            initiateMaps(); // initialize maps

            Pipeline sourceFromClient = buildPipelineForClientDataa(); 
            Pipeline sourceFromIP2Location = buildPipelineForIp2Loc();
            Pipeline enrichSinkToKafka = buildPipelineForEnrichData();

            long start = System.nanoTime();
            Job job1 = jetInstance.newJob(sourceFromClient, jobConfig);
            Job job2 = jetInstance.newJob(sourceFromIP2Location, jobConfig);
            Job job3 = jetInstance.newJob(enrichSinkToKafka, jobConfig);
        }
    }

    /**
     * Builds the pipeline that reads from Kafka (topic argument {@code SINK_CLINET_DATA})
     * and writes into the IMap named {@code SINK_CLINET_DATA}.
     *
     * <p>Every record becomes a map entry whose key is the record's
     * (partition, offset) tuple and whose value is the record value.
     *
     * @return the assembled pipeline
     */
    private static Pipeline buildPipelineForClientDataa() {
        Properties consumerProps = props(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS,
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName(),
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName(),
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET);

        Pipeline pipeline = Pipeline.create();
        pipeline.drawFrom(KafkaSources.kafka(
                        consumerProps,
                        rec -> com.hazelcast.jet.Util.entry(
                                com.hazelcast.jet.datamodel.Tuple2.tuple2(rec.partition(), rec.offset()),
                                rec.value()),
                        SINK_CLINET_DATA))
                .withoutTimestamps()
                .drainTo(Sinks.map(SINK_CLINET_DATA));
        return pipeline;
    }

    /**
     * Builds the pipeline that reads from Kafka (topic argument {@code SINK_IP2LOCATION})
     * and writes into the IMap named {@code SINK_IP2LOCATION}.
     *
     * <p>Every record becomes a map entry whose key is the record's
     * (partition, offset) tuple and whose value is the record value.
     *
     * @return the assembled pipeline
     */
    private static Pipeline buildPipelineForIp2Loc() {
        Properties consumerProps = props(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS,
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName(),
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName(),
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET);

        Pipeline pipeline = Pipeline.create();
        pipeline.drawFrom(KafkaSources.kafka(
                        consumerProps,
                        rec -> com.hazelcast.jet.Util.entry(
                                com.hazelcast.jet.datamodel.Tuple2.tuple2(rec.partition(), rec.offset()),
                                rec.value()),
                        SINK_IP2LOCATION))
                .withoutTimestamps()
                .drainTo(Sinks.map(SINK_IP2LOCATION));
        return pipeline;
    }

    /**
     * Builds the pipeline that reads the IMap named {@code SINK_CLINET_DATA_ENRICHED}
     * and sends its entries to Kafka, passing the same constant as the topic argument.
     *
     * <p>NOTE(review): {@code Sources.map} appears to be a batch source (one pass over the
     * current map contents) - confirm this is intended rather than a journal/streaming source.
     * <br>NOTE(review): both serializers are {@code StringSerializer}, while the enriched map
     * field is declared with {@code ApacheCommons} values - verify the value type.
     *
     * @return the assembled pipeline
     */
    private static Pipeline buildPipelineForEnrichData() {
        Properties producerProps = props(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS,
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName(),
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());

        Pipeline pipeline = Pipeline.create();
        pipeline.drawFrom(Sources.map(SINK_CLINET_DATA_ENRICHED))
                .drainTo(KafkaSinks.kafka(producerProps, SINK_CLINET_DATA_ENRICHED));
        return pipeline;
    }

    /**
     * Builds a {@link Properties} object from alternating key/value arguments.
     *
     * @param kvs keys and values in the order {@code key1, value1, key2, value2, ...}
     * @return a new {@code Properties} containing every supplied pair
     * @throws IllegalArgumentException if {@code kvs} has an odd number of elements
     */
    private static Properties props(String... kvs) {
        // Reject a dangling key up front instead of failing mid-way with an
        // ArrayIndexOutOfBoundsException that does not name the real problem.
        if (kvs.length % 2 != 0) {
            throw new IllegalArgumentException(
                    "props expects key/value pairs but got " + kvs.length + " argument(s)");
        }
        final Properties props = new Properties();
        for (int i = 0; i < kvs.length; i += 2) {
            props.setProperty(kvs[i], kvs[i + 1]);
        }
        return props;
    }

    /** Client data as Java objects, keyed by a (Integer, Long) tuple; assigned in {@code initiateMaps()}. */
    public static IMap<Tuple2<Integer, Long>, ApacheCommons> clientMapObject;

    /** IP2Location data as Java objects, keyed by a (Integer, Long) tuple; assigned in {@code initiateMaps()}. */
    public static IMap<Tuple2<Integer, Long>, IP2LocationPojo> ip2LocIMapObjectLevel;

    /** Enriched client data, keyed by a String; assigned in {@code initiateMaps()}. */
    public static IMap<String, ApacheCommons> clientMapEnriched;

    /**
     * Registers entry listeners on the maps used by the pipelines and stores handles to the
     * object-level and enriched maps in this class's static fields.
     *
     * <p>Requires {@code jetInstance} to be set before it is called.
     */
    // NOTE(review): the deprecation suppression is presumably for the addEntryListener
    // overload selected by the listener types - confirm.
    @SuppressWarnings("deprecation")
    private void initiateMaps() {

        // Raw client data (Kafka topic -> IMap): listen for entries on that map.
        jetInstance.getMap(SINK_CLINET_DATA).addEntryListener(new KafkaStreamIMapListener(), true);

        // Raw ip2location data (Kafka topic -> IMap): listen for entries on that map.
        jetInstance.getMap(SINK_IP2LOCATION).addEntryListener(new KafkaIp2LocationListener(), true);

        // Client data converted from JSON string to Java object: keep the handle and listen on it.
        clientMapObject = jetInstance.getMap(SINK_CLINET_DATA_OBJECT);
        clientMapObject.addEntryListener(new KafkaStreamObjectIMapListener(), true);

        // ip2location data converted from string to Java object: handle only, no listener.
        ip2LocIMapObjectLevel = jetInstance.getMap(SINK_IP2LOCATION_OBJECT);

        // Client data enriched with geolocation information: handle only, listener is disabled.
        clientMapEnriched = jetInstance.getMap(SINK_CLINET_DATA_ENRICHED);
//      clientMapEnriched.addEntryListener(new KafkaStreamObjectIMapListener(), true);

    }

虽然我在代码中并没有直接使用 org/apache/kafka/clients/consumer/ConsumerRecord,但在运行这个 jar 时却收到了如下错误:

WARNING: hz.client_0 [dev] [3.0] [3.12] Job 50e9-3541-4c23-bd58 (name ??) failed because it has received a non-restartable exception during submission
    com.hazelcast.client.UndefinedErrorCodeException: Class name: java.lang.NoClassDefFoundError, Message: org/apache/kafka/clients/consumer/ConsumerRecord
            at java.lang.Class.getDeclaredMethods0(Native Method)
            at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
            at java.lang.Class.getDeclaredMethod(Class.java:2128)
            at java.lang.invoke.SerializedLambda$1.run(SerializedLambda.java:224)
            at java.lang.invoke.SerializedLambda$1.run(SerializedLambda.java:221)
            at java.security.AccessController.doPrivileged(Native Method)
            at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:221)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1260)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
            at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
            at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
            at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
            at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
            at com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject$Serializer.read(CustomClassLoadedObject.java:110)
            at com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject$Serializer.read(CustomClassLoadedObject.java:91)
            at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:48)
            at com.hazelcast.internal.serialization.impl.AbstractSerializationService.readObject(AbstractSerializationService.java:269)
            at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataInput.readObject(ByteArrayObjectDataInput.java:574)
            at com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject.read(CustomClassLoadedObject.java:56)
            at com.hazelcast.jet.core.Vertex.readData(Vertex.java:194)
            at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.readInternal(DataSerializableSerializer.java:160)
            at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:106)
            at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:51)
            at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:48)
            at com.hazelcast.internal.serialization.impl.AbstractSerializationService.readObject(AbstractSerializationService.java:269)
            at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataInput.readObject(ByteArrayObjectDataInput.java:574)
            at com.hazelcast.jet.core.DAG.readData(DAG.java:450)
            at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.readInternal(DataSerializableSerializer.java:160)
            at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:106)
            at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:51)
            at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:48)
            at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:187)
            at com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject.deserializeWithCustomClassLoader(CustomClassLoadedObject.java:65)
            at com.hazelcast.jet.impl.JobCoordinationService.dagToJson(JobCoordinationService.java:732)
            at com.hazelcast.jet.impl.JobCoordinationService.lambda$submitJob$2(JobCoordinationService.java:167)
            at com.hazelcast.jet.impl.JobCoordinationService.lambda$submitToCoordinatorThread$29(JobCoordinationService.java:878)
            at com.hazelcast.jet.impl.JobCoordinationService.lambda$submitToCoordinatorThread$30(JobCoordinationService.java:897)
            at com.hazelcast.util.executor.CompletableFutureTask.run(CompletableFutureTask.java:67)
            at com.hazelcast.util.executor.CachedExecutorServiceDelegate$Worker.run(CachedExecutorServiceDelegate.java:227)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
            at com.hazelcast.util.executor.HazelcastManagedThread.executeRun(HazelcastManagedThread.java:64)
            at com.hazelcast.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:80)
            at ------ submitted from ------.(Unknown Source)
            at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.resolve(InvocationFuture.java:126)
            at com.hazelcast.spi.impl.AbstractInvocationFuture$1.run(AbstractInvocationFuture.java:251)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
            at com.hazelcast.util.executor.HazelcastManagedThread.executeRun(HazelcastManagedThread.java:64)
            at com.hazelcast.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:80)

虽然我已经添加了 kafka-clients 依赖项,但仍然遇到此错误。这是 pom.xml:

<dependencies>

        <!-- https://mvnrepository.com/artifact/com.hazelcast.jet/hazelcast-jet-kafka -->
        <dependency>
            <groupId>com.hazelcast.jet</groupId>
            <artifactId>hazelcast-jet-kafka</artifactId>
            <version>3.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.hazelcast.jet/hazelcast-jet -->
        <dependency>
            <groupId>com.hazelcast.jet</groupId>
            <artifactId>hazelcast-jet</artifactId>
            <version>3.0</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.2.0</version>
        </dependency>
    </dependencies>

我也已将 kafka-clients.jar 添加到 Hazelcast-jet-3.0/lib 文件夹中。想知道我是否还需要启动 Hazelcast IMDG?

请大家帮忙指正,谢谢。

4

0 回答 0