0

我是 Hazelcast Jet 的新手,所以想知道我是否做错了什么。

我正在本地测试这个。我在本地启动了 2 个 Hazelcast Jet 实例:

Jet.newJetInstance();

也就是把一个带有 public static void main 的单独类运行了两次。

然后我启动一个包含全部逻辑的新 Jet 实例来提交作业。我会打印已处理的记录数。但我只看到它在一个节点上打印,而不是均匀分布在各个节点上——按理说它应该在所有节点上运行。是我做错了什么,还是漏掉了某项设置?

这是我的流式处理过程的代码

    package com.geek.hazelcast;

import com.google.common.collect.Lists;
import com.hazelcast.core.IMap;
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.function.DistributedFunctions;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.jet.pipeline.WindowDefinition;
import org.apache.commons.lang.RandomStringUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 * Demo client that submits a sliding-window word-count job to a running
 * Hazelcast Jet cluster and prints the contents of the sink map once the job
 * has finished.
 */
public class JetDemo {

    /**
     * Connects to the cluster as a Jet client, builds the pipeline, runs it to
     * completion and dumps the {@code testmap} entries to standard output.
     *
     * @param args ignored
     * @throws Exception kept in the signature for backward compatibility
     */
    public static void main( String[] args ) throws Exception {
        JetInstance jet = Jet.newJetClient();

        IMap<String, Long> counts = jet.getMap("testmap");

        Pipeline p = Pipeline.create();

        // Source -> timestamp -> sliding window (size 60_000, slide 30_000;
        // presumably milliseconds, TODO confirm) -> count per distinct word -> IMap sink.
        p.drawFrom(Sources.streamFromProcessor("test", TestProcessor.streamWords()))
                .addTimestamps()
                .window(WindowDefinition.sliding(60_000, 30_000))
                .groupingKey(DistributedFunctions.wholeItem())
                .aggregate(AggregateOperations.counting())
                .drainTo(Sinks.map("testmap"));

        try {
            Job job = jet.newJob(p);
            // Blocks until the source processor reports completion.
            job.join();
            counts.entrySet()
                    .forEach(e -> System.out.println(e.getKey() + " " + e.getValue()));
            System.out.println(counts);
        } finally {
            // NOTE(review): this asks for a shutdown of the whole cluster through a
            // client-side proxy, which may not be supported from a client. If only
            // this client should be released, jet.shutdown() is probably what is
            // wanted - confirm against the Hazelcast version in use.
            jet.getCluster().shutdown();
        }

    }

    /**
     * Source processor that emits random words in batches until {@link #total}
     * items have been handed to the outbox.
     *
     * <p>Words are picked uniformly from a fixed pool of random 10-letter
     * strings generated once per processor instance.
     */
    public static class TestProcessor extends AbstractProcessor {

        /** Number of distinct random words in the pool. */
        private static final int WORD_COUNT = 200;

        /** Number of words produced per batch. */
        private static final int BATCH_SIZE = 1000;

        /** Target number of items to emit before the processor completes. */
        int total = 100000;

        /** Number of items that have been fully emitted so far. */
        int processed = 0;

        /** Batch currently being drained into the outbox, or {@code null} if none. */
        private Traverser<String> traverser;

        /** Pool of words to sample from. */
        List<String> randomWords;

        /** Reused across batches instead of allocating a new generator each time. */
        private final Random random = new Random();

        /** Creates the processor and fills the word pool. */
        public TestProcessor() {
            randomWords = Lists.newArrayListWithExpectedSize(WORD_COUNT);
            for (int i = 0; i < WORD_COUNT; i++) {
                randomWords.add(RandomStringUtils.randomAlphabetic(10));
            }
        }

        /**
         * Emits the next batch of words, resuming a partially emitted batch
         * when the outbox could not accept all of it on the previous call.
         *
         * @return {@code true} once at least {@link #total} items have been
         *         fully emitted, {@code false} if there is more work to do
         */
        @Override
        public boolean complete() {
            if (traverser == null) {
                // Only decide we are done when no batch is pending, so a
                // partially emitted final batch is never dropped.
                if (processed >= total) {
                    return true;
                }
                traverser = getWords();
            }

            if (!emitFromTraverser(traverser)) {
                // Outbox did not take everything; continue with the same traverser next call.
                return false;
            }

            // Count the batch only after every item of it reached the outbox.
            traverser = null;
            processed += BATCH_SIZE;
            System.out.println("processed " + processed);
            return processed >= total;
        }

        /** {@inheritDoc} */
        @Override
        public boolean isCooperative() {
            return true;
        }

        /**
         * Builds one batch of {@code BATCH_SIZE} words sampled uniformly from
         * the pool.
         *
         * @return a traverser over the freshly built batch
         */
        private Traverser<String> getWords() {
            List<String> batch = new ArrayList<>(BATCH_SIZE);
            for (int i = 0; i < BATCH_SIZE; i++) {
                batch.add(randomWords.get(random.nextInt(randomWords.size())));
            }
            return Traversers.traverseIterable(batch);
        }

        /**
         * Returns the meta-supplier used as the pipeline source.
         *
         * <p>{@code forceTotalParallelismOne} restricts the source to a single
         * processor instance in the whole cluster, so the "processed" output
         * appears on one member only, however many members are running. To
         * produce on every member, a non-restricted meta-supplier such as
         * {@code ProcessorMetaSupplier.of(...)} would be needed; note that each
         * processor instance would then emit {@link #total} items of its own.
         *
         * @return meta-supplier creating exactly one {@code TestProcessor}
         */
        public static ProcessorMetaSupplier streamWords() {
            return ProcessorMetaSupplier.forceTotalParallelismOne(ProcessorSupplier.of(() -> new TestProcessor()));
        }
    }


}

谢谢

4

0 回答 0