1

我正在尝试在 zmq 上实现“文件调度程序”(实际上是 jeromq,我宁愿避免使用 jni)。

我需要的是将传入文件负载平衡到处理器:

  • 每个文件仅由一个处理器处理
  • 文件可能很大,所以我需要管理文件传输

理想情况下,我想要类似https://github.com/zeromq/filemq但是

  • 具有推/拉行为而不是发布/订阅
  • 能够处理接收到的文件而不是将其写入磁盘

我的想法是混合使用 taskvent/tasksink 和 asyncsrv 示例。

客户端:

  • 一个 PULL 套接字被通知要处理的文件
  • 一个 DEALER 套接字逐块处理(异步)文件传输块

服务器端:

  • 一个 PUSH 套接字来分派传入的文件(名称)
  • 一个 ROUTER 套接字来处理文件请求
  • 一些 DEALER 工作人员管理客户端的文件传输并通过 inproc 代理连接到路由器

我的第一个问题是:这似乎是正确的方法吗?有没有更简单的?

我的第二个问题是:我当前的实现卡在发送实际文件数据上。

  • 客户端由服务器通知,并发出请求。
  • 服务器工作人员收到请求,并将响应写回 inproc 队列,但响应似乎永远不会离开服务器(在 wireshark 中看不到),客户端卡在 poller.poll 上等待响应。

这不是套接字已满并丢弃数据的问题,我从一次性发送的非常小的文件开始。

有什么见解吗?

谢谢!

===================

按照 raffian 的建议,我简化了我的代码,删除了推/拉额外的插座(现在你说它确实有意义)

我只剩下“不工作”的插座了!

这是我当前的代码。它有许多目前超出范围的缺陷(客户端 ID、下一个块等)

现在,我只是想让两个人大致按照这个顺序交谈

  • 服务器

    object FileDispatcher extends App
    {
      val context = ZMQ.context(1)
    
      // server is the frontend that pushes filenames to clients and receives requests
      val server = context.socket(ZMQ.ROUTER)
      server.bind("tcp://*:5565")
      // backend handles clients requests
      val backend = context.socket(ZMQ.DEALER)
      backend.bind("inproc://backend")
    
      // files to dispatch given in arguments
      args.toList.foreach { filepath =>
        println(s"publish $filepath")
        server.send("newfile".getBytes(), ZMQ.SNDMORE)
        server.send(filepath.getBytes(), 0)
      }
    
      // multithreaded server: router hands out requests to DEALER workers via a inproc queue
      val NB_WORKERS = 1
      val workers = List.fill(NB_WORKERS)(new Thread(new ServerWorker(context)))
      workers foreach (_.start)
      ZMQ.proxy(server, backend, null)
    }
    
    class ServerWorker(ctx: ZMQ.Context) extends Runnable 
    {
      override def run() 
      {
        val worker = ctx.socket(ZMQ.DEALER)
        worker.connect("inproc://backend")
    
        while (true) 
        {
          val zmsg = ZMsg.recvMsg(worker) 
          zmsg.pop // drop inner queue envelope (?)
          val cmd = zmsg.pop //cmd is used to continue/stop
          cmd.toString match {
            case "get" => 
              val file = zmsg.pop.toString
              println(s"clientReq: cmd: $cmd , file:$file")
              //1- brute force: ignore cmd and send full file in one go!
              worker.send("eof".getBytes, ZMQ.SNDMORE) //header indicates this is the last chunk
              val bytes = io.Source.fromFile(file).mkString("").getBytes //dirty read, for testing only!
              worker.send(bytes, 0)
              println(s"${bytes.size} bytes sent for $file: "+new String(bytes))
            case x => println("cmd "+x+" not implemented!")
          }
        }
      }
    }
    
  • 客户

    object FileHandler extends App
    {
      val context = ZMQ.context(1)
    
      // client is notified of new files then fetches file from server
      val client = context.socket(ZMQ.DEALER)
      client.connect("tcp://*:5565")
      val poller = new ZMQ.Poller(1) //"poll" responses
      poller.register(client, ZMQ.Poller.POLLIN)
    
      while (true) 
      {
        poller.poll
        val zmsg = ZMsg.recvMsg(client)
        val cmd = zmsg.pop
        val data = zmsg.pop
        // header is the command/action
        cmd.toString match {
          case "newfile" => startDownload(data.toString)// message content is the filename to fetch
          case "chunk" => gotChunk(data.toString, zmsg.pop.getData) //filename, chunk
          case "eof" => endDownload(data.toString, zmsg.pop.getData) //filename, last chunk
        }
      }
    
      def startDownload(filename: String) 
      {
        println("got notification: start download for "+filename)
        client.send("get".getBytes, ZMQ.SNDMORE) //command header
        client.send(filename.getBytes, 0) 
      }
    
      def gotChunk(filename: String, bytes: Array[Byte]) 
      {
        println("got chunk for "+filename+": "+new String(bytes)) //callback the user here
        client.send("next".getBytes, ZMQ.SNDMORE)
        client.send(filename.getBytes, 0) 
      }
    
      def endDownload(filename: String, bytes: Array[Byte]) 
      {
        println("got eof for "+filename+": "+new String(bytes)) //callback the user here
      }
    }
    
4

1 回答 1

3

在客户端上,您不需要PULLwith DEALER。DEALER 是PUSHPULL组合的,所以只使用 DEALER,你的代码会更简单。

服务器也是如此,除非您正在做一些特殊的事情,否则您不需要PUSHwith ROUTER,路由器是双向的。

服务器工作人员收到请求,并将响应写回 inproc 队列,但响应似乎永远不会离开服务器(在 wireshark 中看不到),客户端卡在 poller.poll 上等待响应。

代码问题

在服务器中,您args.toList.foreach在启动代理之前发送文件,这可能就是为什么没有任何东西离开服务器。先启动代理,再使用;此外,一旦您调用ZMQProxy(..),代码会无限期地阻塞,因此您需要一个单独的线程来发送文件路径。

客户端可能对轮询器有问题。轮询的典型模式是:

ZMQ.Poller items = new ZMQ.Poller (1);
items.register(receiver, ZMQ.Poller.POLLIN);
while (true) {

  items.poll(TIMEOUT);
  if (items.pollin(0)) {
    message = receiver.recv(0);

在上面的代码中,1)轮询直到超时,2)然后检查消息,如果可用,3)用receiver.recv(0). 但是在您的代码中,您轮询然后直接进入recv()而不检查。您需要在调用之前检查轮询器是否有该轮询套接字的消息recv(),否则,如果没有消息,接收器将挂起。

于 2013-12-02T22:20:16.880 回答