10

我在多线程环境中使用 Jeromq,如下所示。下面是我的代码,其中的构造函数SocketManager首先连接到所有可用的套接字,然后我将它们放在方法中的liveSocketsByDatacentermap 中connectToZMQSockets。之后,我在同一个构造函数中启动一个后台线程,该线程每 30 秒运行一次,它调用updateLiveSockets方法来 ping 地图中已经存在的所有套接字,并根据这些套接字是否处于活动状态来liveSocketsByDatacenter更新地图。liveSocketsByDatacenter

多个读取器线程同时调用该方法以获取下一个实时可用的套接字,getNextSocket()然后我们使用该套接字在其上发送数据。所以我的问题是我们在多线程环境中正确使用 Jeromq 吗?因为我们刚刚在我们的生产环境中看到了一个异常,当我们试图将数据发送到那个活动套接字时,这个堆栈跟踪所以我不确定它是一个错误还是其他什么?

java.lang.ArrayIndexOutOfBoundsException: 256
at zmq.YQueue.push(YQueue.java:97)
at zmq.YPipe.write(YPipe.java:47)
at zmq.Pipe.write(Pipe.java:232)
at zmq.LB.send(LB.java:83)
at zmq.Push.xsend(Push.java:48)
at zmq.SocketBase.send(SocketBase.java:590)
at org.zeromq.ZMQ$Socket.send(ZMQ.java:1271)
at org.zeromq.ZFrame.send(ZFrame.java:131)
at org.zeromq.ZFrame.sendAndKeep(ZFrame.java:146)
at org.zeromq.ZMsg.send(ZMsg.java:191)
at org.zeromq.ZMsg.send(ZMsg.java:163)

下面是我的代码:

public class SocketManager {
    private static final Random random = new Random();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<>();
    private final ZContext ctx = new ZContext();

    private static class Holder {
        private static final SocketManager instance = new SocketManager();
    }

    public static SocketManager getInstance() {
        return Holder.instance;
    }

    private SocketManager() {
      connectToZMQSockets();
      scheduler.scheduleAtFixedRate(this::updateLiveSockets, 30, 30, TimeUnit.SECONDS);
    }

    // during startup, making a connection and populate once
    private void connectToZMQSockets() {
      Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
      for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
        List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
        liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets);
      }
    }

    private List<SocketHolder> connect(List<String> addresses, int socketType) {
        List<SocketHolder> socketList = new ArrayList<>();
        for (String address : addresses) {
          try {
            Socket client = ctx.createSocket(socketType);
            // Set random identity to make tracing easier
            String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
            client.setIdentity(identity.getBytes(ZMQ.CHARSET));
            client.setTCPKeepAlive(1);
            client.setSendTimeOut(7);
            client.setLinger(0);
            client.connect(address);

            SocketHolder zmq = new SocketHolder(client, ctx, address, true);
            socketList.add(zmq);
          } catch (Exception ex) {
            // log error
          }
        }
        return socketList;
    }

    // this method will be called by multiple threads concurrently to get the next live socket
    // is there any concurrency or thread safety issue or race condition here?
    public Optional<SocketHolder> getNextSocket() {
      for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
        Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
        if (liveSocket.isPresent()) {
          return liveSocket;
        }
      }
      return Optional.absent();
    }

    private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
      if (!CollectionUtils.isEmpty(listOfEndPoints)) {
        // The list of live sockets
        List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
        for (SocketHolder obj : listOfEndPoints) {
          if (obj.isLive()) {
            liveOnly.add(obj);
          }
        }
        if (!liveOnly.isEmpty()) {
          // The list is not empty so we shuffle it an return the first element
          return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
        }
      }
      return Optional.absent();
    }

    // runs every 30 seconds to ping all the socket to make sure whether they are alive or not
    private void updateLiveSockets() {
      Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;

      for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
        List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
        List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
        for (SocketHolder liveSocket : liveSockets) { // LINE A
          Socket socket = liveSocket.getSocket();
          String endpoint = liveSocket.getEndpoint();
          Map<byte[], byte[]> holder = populateMap();
          Message message = new Message(holder, Partition.COMMAND);

          // pinging to see whether a socket is live or not
          boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
          boolean isLive = (status) ? true : false;

          SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
          liveUpdatedSockets.add(zmq);
        }
        liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
      }
    }
}

以下是我如何从多个阅读器线程同时使用类getNextSocket()方法:SocketManager

// this method will be called from multiple threads
public boolean sendAsync(final long addr, final byte[] reco) {
  Optional<SocketHolder> liveSockets = SocketManager.getInstance().getNextSocket();
  return sendAsync(addr, reco, liveSockets.get().getSocket(), false);
}

public boolean sendAsync(final long addr, final byte[] reco, final Socket socket,
    final boolean messageA) {
  ZMsg msg = new ZMsg();
  msg.add(reco);
  boolean sent = msg.send(socket);
  msg.destroy();
  retryHolder.put(addr, reco);
  return sent;
}

  public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
    boolean sent = sendAsync(address, encodedRecords, socket, true);
    // if the record was sent successfully, then only sleep for timeout period
    if (sent) {
      try {
        TimeUnit.MILLISECONDS.sleep(500);
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
      }
    }
    // ...
    return sent;
  } 

我不认为这是正确的,我相信。似乎getNextSocket()可以返回一个0MQ socketto thread A。同时,计时器线程可以访问0MQ socket它以对其进行 ping 操作。在这种情况下thread A和定时器线程发生变异相同0MQ socket,这会导致问题。那么解决此问题的最佳和有效方法是什么?

注意: SocketHolder 是一个不可变的类

更新:

我只是注意到我的另一个盒子上发生了同样的问题,ArrayIndexOutOfBoundsException但这次是"YQueue"文件中的 71 行号。唯一一致的是256总是。所以肯定有一些与 256 相关的东西,我无法弄清楚这里的 256 是什么?

java.lang.ArrayIndexOutOfBoundsException: 256
    at zmq.YQueue.backPos(YQueue.java:71)
    at zmq.YPipe.write(YPipe.java:51)
    at zmq.Pipe.write(Pipe.java:232)
    at zmq.LB.send(LB.java:83)
    at zmq.Push.xsend(Push.java:48)
    at zmq.SocketBase.send(SocketBase.java:590)
    at org.zeromq.ZMQ$Socket.send(ZMQ.java:1271)
    at org.zeromq.ZFrame.send(ZFrame.java:131)
    at org.zeromq.ZFrame.sendAndKeep(ZFrame.java:146)
    at org.zeromq.ZMsg.send(ZMsg.java:191)
    at org.zeromq.ZMsg.send(ZMsg.java:163)
4

1 回答 1

2

事实 #0:ZeroMQ 不是线程安全的——根据定义

虽然 ZeroMQ 文档和 Pieter HINTJENS 的优秀书籍“Code Connected. Volume 1”并没有忘记尽可能提醒这一事实,但在线程之间返回甚至共享 ZeroMQ 套接字实例的想法不时出现。当然,类实例的方法可能会将这种几乎“隐藏”在其内部方法和属性中,但适当的设计努力应该防止任何此类副作用,没有例外,没有任何借口。

共享,如果有定量事实的合理支持,可能zmq.Context()Context().配置和性能偏好的相应组合。

那么解决此问题的最佳和有效方法是什么?

永远不要共享 ZeroMQ 套接字。从来没有,确实。即使最新的发展开始承诺在不久的将来会在这个方向发生一些变化。用共享污染任何高性能、低延迟的分布式系统设计是一个坏习惯。不共享是该领域的最佳设计原则。


是的,我可以看到我们不应该在线程之间共享套接字,但是在我的代码中
,您认为解决此问题的最佳方法是什么?

是的,解决这个问题的最好和最有效的方法是永远不要共享 ZeroMQ 套接字。

这意味着永远不要返回任何对象,其属性是 ZeroMQ 套接字(您主动构建并从类方法中大量返回.connect(){...}。在您的情况下,似乎保留了所有类方法private,这可能会融合问题允许“其他线程”接触类私有套接字实例,但同样的原则也必须在所有属性级别上得到认可,以便有效。最后,这个“融合”得到了
public static SocketManager getInstance()捷径
并被提供任何外部询问者以直接访问共享 ZeroMQ 套接字的类私有实例。

如果某些文档在几乎每一章中都明确警告不要分享东西,那么一个人宁愿不分享这些东西。

因此,重新设计方法,使其SocketManager获得更多功能,因为它是类方法,它将执行嵌入的必备功能,从而明确防止任何外部世界线程接触不可共享的实例,如 ZeroMQ 出版物中所述。

接下来是资源清单:您的代码似乎每 30 秒重新检查一次所有感兴趣的数据中心中的世界状态。这实际上每分钟创建两次新的 List 对象。虽然您可能会推测性地让 Garbage Collector 整理所有抖动,但不会从任何地方进一步引用,但这对于嵌入在您之前重新检查运行的 List-s 中的 ZeroMQ 相关对象来说不是一个好主意。ZeroMQ 对象仍然从内部引用Zcontext()- ZeroMQ Context()-core-factory 实例化的 I/O-thread(s),它也可以被视为 ZeroMQ 套接字库存资源管理器。因此,所有new-created 的套接字实例不仅从 -side 获得外部句柄java,而且从内部获得内部句柄(Z)Context(). 到目前为止,一切都很好。但是,在代码中的任何地方都没有看到任何方法,该方法将取消委托对象实例中的任何和所有 ZeroMQ 套接字,这些套接字已从 -side 解除关联java,但仍从 -side 引用(Z)Context()。分配资源的明确资源退役是一种公平的设计方面的做法,对于有限或受其他限制的资源来说更是如此。{ "cheap" | 的方法可能会有所不同。“昂贵的”}-这种资源管理处理的维护成本(ZeroMQ 套接字实例作为一些轻量级的“消耗品/一次性”处理非常昂贵......但这是另一回事)。

因此,还要添加一组适当的资源重用/资源拆除方法,这将使new-created 套接字的总量回到您的控制责任范围内(您的代码负责(Z)Context()-domain内有多少套接字处理程序-of-resources-control 可能会被创建并且必须保持被管理——无论是否有意)。

有人可能会反对自动检测和(可能被很好地延迟)垃圾收集可能会带来一些“承诺”,但是,您的代码仍然负责适当的资源管理,即使 LMAX 人也永远不会获得如此勇敢的表现,如果他们依赖来自标准 gc 的“承诺”。你的问题比 LMAX 顶级性能要解决的问题要糟糕得多。您的代码(到目前为止已发布)对 ZeroMQ 相关资源没有任何.close()作用.term()。在一个消费不受控制(分布式需求)的生态系统中,这是完全不可能的做法。您必须保护您的船免于超载超出您知道它可以安全处理和动态卸载每个箱子的限制,这些箱子在“对面海岸”没有接收者。

那是船长(你的代码设计师)的责任。

没有明确告诉负责最低级别( ZeroMQ -floor )的库存管理的水手Context()一些箱子要卸载,问题仍然是你的。标准gc的命令链不会“自动”执行此操作,无论“承诺”看起来如何,它都不会。因此,对您的 ZeroMQ 资源管理要明确,根据要采取的这些步骤来评估返回代码,并适当地处理在您的代码显式控制下执行这些资源管理操作所引发的任何和所有异常。

较低的(如果不是最低可实现的)资源利用率-信封和更高的(如果不是最高可实现的)性能是正确完成这项工作的奖励。LMAX 家伙是一个很好的例子,在这方面做得非常好,超出了标准的 java“承诺”,因此人们可以向最好的中的最好学习。


声明的调用签名与使用的调用签名似乎不匹配:
虽然我在这一点上可能是错的,因为我的大部分设计工作都不是在多态调用接口中,签名中似乎存在不匹配,发布为:

private List<SocketHolder> connect( Datacenters  dc,                     // 1-st
                                    List<String> addresses,              // 2-nd
                                    int          socketType              // 3-rd
                                    ) {
        ... /* implementation */
}

以及
实际的方法调用,仅通过以下方式在方法
内部调用connectToZMQSockets()

        List<SocketHolder> addedColoSockets = connect( entry.getValue(), // 1-st
                                                       ZMQ.PUSH          // 2-nd
                                                       );
于 2017-11-03T13:29:33.530 回答