我在 Hazelcast Jet 使用 UDP 套接字作为数据源。
@Override
protected void init(Context context) throws Exception {
super.init(context);
jobFuture = context.jobFuture();
try {
getLogger().info("Connecting to socket " + hostAndPort());
final DatagramChannel acctchannel = DatagramChannel.open();
socket = acctchannel.socket();
socket.setSoTimeout(0);
socket.bind(new InetSocketAddress(host, port));
getLogger().info("Connected to socket " + hostAndPort());
} catch (IOException e) {
throw sneakyThrow(e);
}
}
@Override
public void close() {
if (socket != null) {
getLogger().info("Closing socket " + hostAndPort());
socket.close();
}
}
@Override
public boolean complete() {
try {
while (!jobFuture.isDone() && socket != null) {
final byte replyBytes[] = new byte[4096];
final DatagramPacket reply = new DatagramPacket(replyBytes, replyBytes.length);
socket.receive(reply);
emit(reply);
}
return true;
} catch (IOException e) {
throw sneakyThrow(e);
} finally {
close();
}
}
它仅在一个节点上正常工作。当我启动另一个节点(使用 hazelcase 多播连接)时,我的第一个节点尝试再次绑定 UDP 套接字。我还没有看到“关闭套接字”日志。
Hazelcast 日志在第一个节点:
七月 24, 2
017 6:54:24 PM com.hazelcast.internal.cluster.impl.MulticastJoiner
INFO: [192.168.26.225]:5701 [sorm-replicator] [0.4] [3.8.2]
Members [1] {
Member [192.168.26.225]:5701 - af116710-cac6-4d5e-9c30-2ab04861c310 this
}
Jul 24, 2017 6:54:24 PM com.hazelcast.core.LifecycleService
INFO: [192.168.26.225]:5701 [sorm-replicator] [0.4] [3.8.2] [192.168.26.225]:5701 is STARTED
Filling Map
Starting job
Jul 24, 2017 6:54:24 PM com.hazelcast.internal.partition.impl.PartitionStateManager
INFO: [192.168.26.225]:5701 [sorm-replicator] [0.4] [3.8.2] Initializing cluster partition table arrangement...
Jul 24, 2017 6:54:24 PM com.hazelcast.jet.impl.operation.ExecuteJobOperation
INFO: [192.168.26.225]:5701 [sorm-replicator] [0.4] [3.8.2] Start executing job 0: dag
.vertex("datagram-source").localParallelism(1)
.vertex("map")
.vertex("sink").localParallelism(1)
.edge(between("datagram-source", "map").distributed())
.edge(between("map", "sink"))
Jul 24, 2017 6:54:24 PM com.hazelcast.jet.impl.operation.InitOperation
INFO: [192.168.26.225]:5701 [sorm-replicator] [0.4] [3.8.2] Initializing execution plan for job 0 from [192.168.26.225]:5701.
Jul 24, 2017 6:54:24 PM com.hazelcast.jet.impl.operation.ExecuteOperation
INFO: [192.168.26.225]:5701 [sorm-replicator] [0.4] [3.8.2] Start execution of plan for job 0 from caller [192.168.26.225]:5701.
Jul 24, 2017 6:54:24 PM org.eltex.softwlc.sorm.replicator.processor.UdpSource.datagram-source#0
INFO: [192.168.26.225]:5701 [sorm-replicator] [0.4] [3.8.2] Connecting to socket 0.0.0.0:41813
Jul 24, 2017 6:54:24 PM org.eltex.softwlc.sorm.replicator.processor.UdpSource.datagram-source#0
INFO: [192.168.26.225]:5701 [sorm-replicator] [0.4] [3.8.2] Connected to socket 0.0.0.0:41813
Jul 24, 2017 6:54:26 PM com.hazelcast.nio.tcp.SocketAcceptorThread
INFO: [192.168.26.225]:5701 [sorm-replicator] [0.4] [3.8.2] Accepting socket connection from /192.168.26.206:48830
Jul 24, 2017 6:54:26 PM com.hazelcast.nio.tcp.TcpIpConnectionManager
INFO: [192.168.26.225]:5701 [sorm-replicator] [0.4] [3.8.2] Established socket connection between /192.168.26.225:5701 and /192.168.26.206:48830
Jul 24, 2017 6:54:32 PM com.hazelcast.internal.cluster.ClusterService
INFO: [192.168.26.225]:5701 [sorm-replicator] [0.4] [3.8.2]
Members [2] {
Member [192.168.26.225]:5701 - af116710-cac6-4d5e-9c30-2ab04861c310 this
Member [192.168.26.206]:5701 - d5f107bb-9cf9-491b-b902-9e964d7efa17
}
Jul 24, 2017 6:54:32 PM com.hazelcast.internal.partition.impl.MigrationManager
INFO: [192.168.26.225]:5701 [sorm-replicator] [0.4] [3.8.2] Re-partitioning cluster data... Migration queue size: 271
Jul 24, 2017 6:54:34 PM com.hazelcast.jet.impl.operation.InitOperation
INFO: [192.168.26.225]:5701 [sorm-replicator] [0.4] [3.8.2] Initializing execution plan for job 10000 from [192.168.26.206]:5701.
Jul 24, 2017 6:54:34 PM com.hazelcast.internal.partition.impl.MigrationThread
INFO: [192.168.26.225]:5701 [sorm-replicator] [0.4] [3.8.2] All migration tasks have been completed, queues are empty.
Jul 24, 2017 6:54:34 PM com.hazelcast.jet.impl.operation.ExecuteOperation
INFO: [192.168.26.225]:5701 [sorm-replicator] [0.4] [3.8.2] Start execution of plan for job 10000 from caller [192.168.26.206]:5701.
Jul 24, 2017 6:54:34 PM org.eltex.softwlc.sorm.replicator.processor.UdpSource.datagram-source#0
INFO: [192.168.26.225]:5701 [sorm-replicator] [0.4] [3.8.2] Connecting to socket 0.0.0.0:41813
Jul 24, 2017 6:54:34 PM com.hazelcast.jet.impl.execution.ExecutionService
WARNING: [192.168.26.225]:5701 [sorm-replicator] [0.4] [3.8.2] Exception in ProcessorTasklet{vertex=datagram-source, processor=org.eltex.softwlc.sorm.replicator.processor.UdpSource@931ff2f}
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind0(Native Method)
更改集群成员后关闭 udp 套接字的正确方法是什么?