1

我刚刚开始学习 Hazelcast Jet。我的来源是 UDP 数据报。我想在 Jet 的某些节点上并行处理它,并通过“域”将它们重新发送到其他地址。我想使用带有加载程序的 Hazelcast IMDG IMap 通过“源 ip”获取“域”。

DAG dag = new DAG();        
Vertex source = dag.newVertex("datagram-source",
                UdpSocketP.supplier("0.0.0.0", 41813));
        source.localParallelism(1);

        Vertex mapper = dag.newVertex("map",
                map(new DomainMapper(instance.getMap("mysqlNas"))));

        Vertex sink = dag.newVertex("sink",
                Sinks.writeFile("logs"));
        sink.localParallelism(1);

但是当我尝试在 DistributedFunction 使用 IMap 时,我得到了异常

Exception in thread "main" java.lang.IllegalArgumentException: "metaSupplier" must be serializable
    at com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:185)
    at com.hazelcast.jet.Vertex.<init>(Vertex.java:101)
    at com.hazelcast.jet.Vertex.<init>(Vertex.java:78)
    at com.hazelcast.jet.DAG.newVertex(DAG.java:79)
    at org.eltex.softwlc.sorm.replicator.JetServer.main(JetServer.java:46)
Caused by: java.io.NotSerializableException: com.hazelcast.jet.stream.impl.MapDecorator

域映射器代码:

package org.eltex.softwlc.sorm.replicator;

import com.hazelcast.core.IMap;
import com.hazelcast.jet.function.DistributedFunction;

import java.io.Serializable;
import java.net.DatagramPacket;

/**
 * Created by mickey on 21.07.17.
 */
public class DomainMapper implements DistributedFunction<DatagramPacket, IpData>, Serializable {

    private final IMap<String, NasValue> map;

    public DomainMapper(IMap<String, NasValue> map) {
        this.map = map;
    }

    @Override
    public IpData apply(DatagramPacket datagramPacket) {
        final IpData d = new IpData(datagramPacket, datagramPacket.getAddress().getHostAddress());
        System.out.println(d);

        final NasValue nasValue = map.get(datagramPacket.getAddress().getHostAddress());
        if (nasValue!=null) {
            d.setDomain(nasValue.getDomain());
        }

        return d;
    }
}

我的错误是什么?或者 Hazelcast Jet 对我的目的来说是错误的选择。

4

1 回答 1

2

问题是您正在尝试序列化整个IMap函数内部。一个直接的解决方法是编写一个自定义处理器,在其方法中访问 Hazelcast Jet 实例init()并从中查找其 IMap。由于init()代码是在目标成员上执行的,因此在所有反序列化之后,这将起作用。

但是,在更一般的层面上,您的目标似乎是“数据丰富”类型。我们希望在 Jet 中支持这一点的方式是通过“散列连接”操作,目前还不是一流的;但是有一个代码示例显示了该方法。您可以将整个IMap内容汇集到一个顶点,将其转换为一个平面HashMap并分发给所有丰富的处理器,或者您可以准备一个 HazelcastReplicatedMap将由丰富的处理器直接使用。

第一种方法意味着您处理 ; 的快照IMapReplicatedMap在第二个中,您可以在作业运行时继续更新。

最好去检查示例:HashMapEnrichmentReplicatedMapEnrichment

于 2017-07-24T09:40:17.593 回答