2

目标:在一次迭代中向所有订阅者发送大量消息。我有 40k-100k 条消息。我已经开始使用 PUB/SUB 套接字类型。

问题:订阅者收到的消息数量低于发布者发送的消息数量。如果我在发送每条消息后添加一个 Thread.Sleep(1),那么所有消息都会被传递,但是需要传递的消息数量很多,这意味着 40-100 秒的延迟。这是不可接受的。

下面的代码在 NetMQ (3.0.0) 中,它是 alpha 版本,但它只是作为示例,因为我已经使用 libzmq 3.2.4 (stable) 在 c 中实现了相同的代码。和simptoms是一样的。

发布者/服务器端:

using (var dbConn = new OracleConnection(ConfigurationManager.AppSettings["ConnString"]))
using (NetMQContext ctx = NetMQContext.Create())
{
  using (var publisher = ctx.CreatePublisherSocket())
  {
    publisher.Bind(ConfigurationManager.AppSettings["PubSocket"]);
    dbConn.Open();
    NetMQMessage m = new NetMQMessage();
    while (true)
    {
      var updateIds = new List<int>();
      var deletedIds = new List<int>();

      var changedRules = GetChangedItems(dbConn, ref updateIds);
      var deletedRules = GetDeletedItems(dbConn, ref deletedIds);

      foreach (var kvPair in changedRules)
      {
        var item= kvPair.Value;
        publisher.Send(ToCsvLine(item));
        //Thread.Sleep(1);
      }

      foreach (var kvPair in deletedRules)
      {
        var item = kvPair.Value;

        publisher.Send(ToCsvLine(item));
        //Thread.Sleep(1);
      }
      Thread.Sleep(1);
      publisher.Send("end");

      Console.WriteLine("Sent updated: {0}", updateIds.Count);
      Console.WriteLine("Sent deleted: {0}", deletedIds.Count);
      Thread.Sleep(6000);
    }
  }

订户/客户端:

using (NetMQContext ctx = NetMQContext.Create())
{
    using (var consumer = ctx.CreateSubscriberSocket())
    {
      consumer.Connect("tcp://192.168.1.122:6005");
      consumer.Subscribe("");

      int count = 0;
      while (true)
      {
        try
        {
          count++;
          string msg = consumer.ReceiveString();
          if (msg == "end")
          {
            Console.WriteLine("Count: {0}", count);
            count = 0;
          }
        }
        catch (Exception ex)
        {
          Console.WriteLine(ex.Message);
          Console.ReadLine();
        }
      }
    }
}
4

1 回答 1

5

ZMQ 默认将High Water Mark设置为 1000。它也不是 100% 准确的。我可以假设你可以在一个批次中达到或超过至少一半吗?如果是这样,那可能是你的问题。将您的 HWM 设置得更高,设置为 10,000 或 50,000(或者,出于测试目的,只需将其设置为 0 即可将其关闭),然后查看结果如何变化。

于 2014-06-02T17:55:22.230 回答