
I am using a UDP socket as a data source in Hazelcast Jet.

@Override
protected void init(Context context) throws Exception {
    super.init(context);
    jobFuture = context.jobFuture();

    try {
        getLogger().info("Connecting to socket " + hostAndPort());

        final DatagramChannel acctchannel = DatagramChannel.open();
        socket = acctchannel.socket();
        socket.setSoTimeout(0);
        socket.bind(new InetSocketAddress(host, port));

        getLogger().info("Connected to socket " + hostAndPort());
    } catch (IOException e) {
        throw sneakyThrow(e);
    }
}

@Override
public void close() {
    if (socket != null) {
        getLogger().info("Closing socket " + hostAndPort());
        socket.close();
    }
}

@Override
public boolean complete() {
    try {
        while (!jobFuture.isDone() && socket != null) {
            final byte replyBytes[] = new byte[4096];
            final DatagramPacket reply = new DatagramPacket(replyBytes, replyBytes.length);

            socket.receive(reply);
            emit(reply);
        }
        return true;
    } catch (IOException e) {
        throw sneakyThrow(e);
    } finally {
        close();
    }
}
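
One detail in the code above is worth noting: `setSoTimeout(0)` makes `receive` block indefinitely, so the `jobFuture.isDone()` check is only re-evaluated after a datagram arrives. Below is a minimal plain-JDK sketch of a receive loop that wakes up periodically to re-check a stop condition and always closes the socket on exit. The class name `PollingUdpReceiver`, the `stop` flag, and the timeout value are assumptions made for illustration; none of this is Jet API, and it uses a plain `DatagramSocket` rather than a `DatagramChannel`.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical helper, not part of Hazelcast Jet.
class PollingUdpReceiver {
    private final DatagramSocket socket;
    private final AtomicBoolean stop = new AtomicBoolean(false);

    PollingUdpReceiver(int port, int timeoutMillis) throws IOException {
        socket = new DatagramSocket(null);          // create unbound
        socket.setSoTimeout(timeoutMillis);         // non-zero: receive() gives up after this long
        socket.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), port));
    }

    int localPort() { return socket.getLocalPort(); }

    void requestStop() { stop.set(true); }

    boolean isClosed() { return socket.isClosed(); }

    // Receives datagrams until requestStop() is called, then closes the socket.
    void run(Consumer<DatagramPacket> sink) throws IOException {
        try {
            while (!stop.get()) {
                byte[] buffer = new byte[4096];
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                try {
                    socket.receive(packet);
                } catch (SocketTimeoutException e) {
                    continue;                       // nothing arrived; re-check the stop flag
                }
                sink.accept(packet);
            }
        } finally {
            socket.close();                         // releases the port
        }
    }
}
```

With a non-zero timeout the loop notices a stop request within roughly one timeout interval even when no traffic arrives, at the cost of one `SocketTimeoutException` per idle interval.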

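It works fine only as long as there is a single node. When I start another node (joined to the cluster via Hazelcast multicast), my first node tries to bind the UDP socket again. I never see the "Closing socket" log message.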

Hazelcast log on the first node:

Jul 24, 2017 6:54:24 PM com.hazelcast.internal.cluster.impl.MulticastJoiner
INFO: [192.168.26.225]:5701 [sorm-replicator] [0.4] [3.8.2] 

Members [1] {
    Member [192.168.26.225]:5701 - af116710-cac6-4d5e-9c30-2ab04861c310 this
}

Jul 24, 2017 6:54:24 PM com.hazelcast.core.LifecycleService
INFO: [192.168.26.225]:5701 [sorm-replicator] [0.4] [3.8.2] [192.168.26.225]:5701 is STARTED
Filling Map
Starting job
Jul 24, 2017 6:54:24 PM com.hazelcast.internal.partition.impl.PartitionStateManager
INFO: [192.168.26.225]:5701 [sorm-replicator] [0.4] [3.8.2] Initializing cluster partition table arrangement...
Jul 24, 2017 6:54:24 PM com.hazelcast.jet.impl.operation.ExecuteJobOperation
INFO: [192.168.26.225]:5701 [sorm-replicator] [0.4] [3.8.2] Start executing job 0: dag
    .vertex("datagram-source").localParallelism(1)
    .vertex("map")
    .vertex("sink").localParallelism(1)
    .edge(between("datagram-source", "map").distributed())
    .edge(between("map", "sink"))

Jul 24, 2017 6:54:24 PM com.hazelcast.jet.impl.operation.InitOperation
INFO: [192.168.26.225]:5701 [sorm-replicator] [0.4] [3.8.2] Initializing execution plan for job 0 from [192.168.26.225]:5701.
Jul 24, 2017 6:54:24 PM com.hazelcast.jet.impl.operation.ExecuteOperation
INFO: [192.168.26.225]:5701 [sorm-replicator] [0.4] [3.8.2] Start execution of plan for job 0 from caller [192.168.26.225]:5701.
Jul 24, 2017 6:54:24 PM org.eltex.softwlc.sorm.replicator.processor.UdpSource.datagram-source#0
INFO: [192.168.26.225]:5701 [sorm-replicator] [0.4] [3.8.2] Connecting to socket 0.0.0.0:41813
Jul 24, 2017 6:54:24 PM org.eltex.softwlc.sorm.replicator.processor.UdpSource.datagram-source#0
INFO: [192.168.26.225]:5701 [sorm-replicator] [0.4] [3.8.2] Connected to socket 0.0.0.0:41813
Jul 24, 2017 6:54:26 PM com.hazelcast.nio.tcp.SocketAcceptorThread
INFO: [192.168.26.225]:5701 [sorm-replicator] [0.4] [3.8.2] Accepting socket connection from /192.168.26.206:48830
Jul 24, 2017 6:54:26 PM com.hazelcast.nio.tcp.TcpIpConnectionManager
INFO: [192.168.26.225]:5701 [sorm-replicator] [0.4] [3.8.2] Established socket connection between /192.168.26.225:5701 and /192.168.26.206:48830
Jul 24, 2017 6:54:32 PM com.hazelcast.internal.cluster.ClusterService
INFO: [192.168.26.225]:5701 [sorm-replicator] [0.4] [3.8.2] 

Members [2] {
    Member [192.168.26.225]:5701 - af116710-cac6-4d5e-9c30-2ab04861c310 this
    Member [192.168.26.206]:5701 - d5f107bb-9cf9-491b-b902-9e964d7efa17
}

Jul 24, 2017 6:54:32 PM com.hazelcast.internal.partition.impl.MigrationManager
INFO: [192.168.26.225]:5701 [sorm-replicator] [0.4] [3.8.2] Re-partitioning cluster data... Migration queue size: 271
Jul 24, 2017 6:54:34 PM com.hazelcast.jet.impl.operation.InitOperation
INFO: [192.168.26.225]:5701 [sorm-replicator] [0.4] [3.8.2] Initializing execution plan for job 10000 from [192.168.26.206]:5701.
Jul 24, 2017 6:54:34 PM com.hazelcast.internal.partition.impl.MigrationThread
INFO: [192.168.26.225]:5701 [sorm-replicator] [0.4] [3.8.2] All migration tasks have been completed, queues are empty.
Jul 24, 2017 6:54:34 PM com.hazelcast.jet.impl.operation.ExecuteOperation
INFO: [192.168.26.225]:5701 [sorm-replicator] [0.4] [3.8.2] Start execution of plan for job 10000 from caller [192.168.26.206]:5701.
Jul 24, 2017 6:54:34 PM org.eltex.softwlc.sorm.replicator.processor.UdpSource.datagram-source#0
INFO: [192.168.26.225]:5701 [sorm-replicator] [0.4] [3.8.2] Connecting to socket 0.0.0.0:41813
Jul 24, 2017 6:54:34 PM com.hazelcast.jet.impl.execution.ExecutionService
WARNING: [192.168.26.225]:5701 [sorm-replicator] [0.4] [3.8.2] Exception in ProcessorTasklet{vertex=datagram-source, processor=org.eltex.softwlc.sorm.replicator.processor.UdpSource@931ff2f}
java.net.BindException: Address already in use
    at sun.nio.ch.Net.bind0(Native Method)
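
In the log above, the processor for job 0 never logs "Closing socket", so it still holds port 41813 when the processor for job 10000 is initialized on the same member and tries to bind it. The resulting `BindException` can be reproduced without Jet. The following is a small plain-JDK sketch; the class and method names (`DoubleBindDemo`, `secondBindFails`, `rebindAfterCloseSucceeds`) are hypothetical, and it binds to the loopback address on an ephemeral port rather than the port from the log.

```java
import java.io.IOException;
import java.net.BindException;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.DatagramChannel;

// Hypothetical demo class, not part of Hazelcast Jet.
class DoubleBindDemo {

    // Opens a DatagramChannel and binds its socket, as the init() method in the question does.
    static DatagramSocket bind(int port) throws IOException {
        DatagramChannel channel = DatagramChannel.open();
        DatagramSocket socket = channel.socket();
        try {
            socket.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), port));
        } catch (IOException e) {
            channel.close();                 // do not leak the channel on failure
            throw e;
        }
        return socket;
    }

    // Binding the same port twice while the first socket is open should fail.
    static boolean secondBindFails() throws IOException {
        DatagramSocket first = bind(0);      // port 0: let the OS pick a free port
        try {
            bind(first.getLocalPort()).close();
            return false;
        } catch (BindException e) {
            return true;                     // "Address already in use"
        } finally {
            first.close();
        }
    }

    // Once the first socket is closed, the same port can be bound again.
    static boolean rebindAfterCloseSucceeds() throws IOException {
        DatagramSocket first = bind(0);
        int port = first.getLocalPort();
        first.close();
        DatagramSocket second = bind(port);
        second.close();
        return true;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("second bind fails: " + secondBindFails());
        System.out.println("rebind after close succeeds: " + rebindAfterCloseSucceeds());
    }
}
```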

What is the correct way to close the UDP socket after the cluster membership changes?
