1

我是 hazelcast-jet 的新手,我的用例是在检查 hazelcastIMDG 中的值后从 Kafka 源读取和过滤。

我什至在创建管道之前就正在获取并加载 IMDG 地图。见下文

 IMap<String, Policy> policyMap =jet.getHazelcastInstance().getMap(POLICY_MAP_NAME);
            Utility.populatePoliciesMap(policyMap);

在 buildPipeline 方法中将 policyMap 作为参数传递。

我创建了如下管道

StreamStage<TimestampedEntry<String, Long>> streamStage = pipeline.drawFrom(KafkaSources.kafka(brokerConsumerProperties(), projectionFn, getIngestTopic()))
                .addTimestamps()
                .flatMap(ingestData -> traverseArray(ingestData.getMapRequestParameterTree().toArray(new String[ingestData.getMapRequestParameterTree().size()])))
                .filter(hash -> policyMap.get(hash)!=null)
                .window(sliding(MINUTES.toMillis(1), SECONDS.toMillis(10)))
                .groupingKey(wholeItem())
                .aggregate(counting())
               .map((TimestampedEntry<String, Long> e) -> entry(e.getKey(), createBlacklistObjectEvent(Utility.fetchPolicy(e.getKey()), e.getTimestamp(), e.getValue())));
        timestampedEntryStreamStage.drainTo(Sinks.map(BL_MAP_NAME));

但有了这个我得到了例外

线程“main”java.lang.IllegalArgumentException 中的异常:“filterFn”必须在 com.hazelcast.jet.impl.pipeline 的 com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:301) 处可序列化。 ComputeStageImplBase.attachFilter(ComputeStageImplBase.java:129) 在 com.hazelcast.jet.impl.pipeline.StreamStageImpl.filter(StreamStageImpl.java:71) 在 com.visa.rls.handler.HazelcastJetIngetstResultHandler.buildPipeline(HazelcastJetIngetstResultHandler.java:120)在 com.visa.rls.handler.HazelcastJetIngetstResultHandler.run(HazelcastJetIngetstResultHandler.java:84) 在 com.visa.rls.handler.HazelcastJetIngetstResultHandler.main(HazelcastJetIngetstResultHandler.java:58) 引起:java.io.NotSerializableException: com.hazelcast .map.impl.proxy.MapProxyImpl 在 java.io.ObjectOutputStream。writeObject0(ObjectOutputStream.java:1184) 在 java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) 在 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) 在 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java :1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java .io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 在 com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:299) ... 5 更多writeObject0(ObjectOutputStream.java:1174) 在 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 在 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 在 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java :1432) 在 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 在 java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 在 com.hazelcast.jet.impl.util.Util.checkSerializable(Util. java:299) ... 还有 5 个writeObject0(ObjectOutputStream.java:1174) 在 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 在 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 在 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java :1432) 在 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 在 java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 在 com.hazelcast.jet.impl.util.Util.checkSerializable(Util. java:299) ... 还有 5 个writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:299) ... 还有 5 个writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:299) ... 还有 5 个

4

1 回答 1

0

policyMap在过滤器功能内部使用,但IMap不可序列化。它被 lambda 表达式捕获。您必须在每个远程成员上获取IMap实例,您可以使用它filterUsingContext来代替filter

.filterUsingContext(
    ContextFactory.withCreateFn(jetInstance -> jetInstance.getMap(POLICY_MAP_NAME)),
    (policyMap, hash) -> policyMap.get(hash) != null
)
于 2019-04-05T17:50:40.093 回答