我正在尝试在 zmq 上实现“文件调度程序”(实际上是 jeromq,我宁愿避免使用 jni)。
我需要的是将传入文件负载平衡到处理器:
- 每个文件仅由一个处理器处理
- 文件可能很大,所以我需要管理文件传输
理想情况下,我想要类似https://github.com/zeromq/filemq但是
- 具有推/拉行为而不是发布/订阅
- 能够处理接收到的文件而不是将其写入磁盘
我的想法是混合使用 taskvent/tasksink 和 asyncsrv 示例。
客户端:
- 一个 PULL 套接字被通知要处理的文件
- 一个 DEALER 套接字逐块处理(异步)文件传输块
服务器端:
- 一个 PUSH 套接字来分派传入的文件(名称)
- 一个 ROUTER 套接字来处理文件请求
- 一些 DEALER 工作人员管理客户端的文件传输并通过 inproc 代理连接到路由器
我的第一个问题是:这似乎是正确的方法吗?有没有更简单的?
我的第二个问题是:我当前的实现卡在发送实际文件数据上。
- 客户端由服务器通知,并发出请求。
- 服务器工作人员收到请求,并将响应写回 inproc 队列,但响应似乎永远不会离开服务器(在 wireshark 中看不到),客户端卡在 poller.poll 上等待响应。
这不是套接字已满并丢弃数据的问题,我从一次性发送的非常小的文件开始。
有什么见解吗?
谢谢!
===================
按照 raffian 的建议,我简化了我的代码,删除了推/拉额外的插座(现在你说它确实有意义)
我只剩下“不工作”的插座了!
这是我当前的代码。它有许多目前超出范围的缺陷(客户端 ID、下一个块等)
现在,我只是想让两个人大致按照这个顺序交谈
服务器
object FileDispatcher extends App { val context = ZMQ.context(1) // server is the frontend that pushes filenames to clients and receives requests val server = context.socket(ZMQ.ROUTER) server.bind("tcp://*:5565") // backend handles clients requests val backend = context.socket(ZMQ.DEALER) backend.bind("inproc://backend") // files to dispatch given in arguments args.toList.foreach { filepath => println(s"publish $filepath") server.send("newfile".getBytes(), ZMQ.SNDMORE) server.send(filepath.getBytes(), 0) } // multithreaded server: router hands out requests to DEALER workers via a inproc queue val NB_WORKERS = 1 val workers = List.fill(NB_WORKERS)(new Thread(new ServerWorker(context))) workers foreach (_.start) ZMQ.proxy(server, backend, null) } class ServerWorker(ctx: ZMQ.Context) extends Runnable { override def run() { val worker = ctx.socket(ZMQ.DEALER) worker.connect("inproc://backend") while (true) { val zmsg = ZMsg.recvMsg(worker) zmsg.pop // drop inner queue envelope (?) val cmd = zmsg.pop //cmd is used to continue/stop cmd.toString match { case "get" => val file = zmsg.pop.toString println(s"clientReq: cmd: $cmd , file:$file") //1- brute force: ignore cmd and send full file in one go! worker.send("eof".getBytes, ZMQ.SNDMORE) //header indicates this is the last chunk val bytes = io.Source.fromFile(file).mkString("").getBytes //dirty read, for testing only! worker.send(bytes, 0) println(s"${bytes.size} bytes sent for $file: "+new String(bytes)) case x => println("cmd "+x+" not implemented!") } } } }
客户
object FileHandler extends App { val context = ZMQ.context(1) // client is notified of new files then fetches file from server val client = context.socket(ZMQ.DEALER) client.connect("tcp://*:5565") val poller = new ZMQ.Poller(1) //"poll" responses poller.register(client, ZMQ.Poller.POLLIN) while (true) { poller.poll val zmsg = ZMsg.recvMsg(client) val cmd = zmsg.pop val data = zmsg.pop // header is the command/action cmd.toString match { case "newfile" => startDownload(data.toString)// message content is the filename to fetch case "chunk" => gotChunk(data.toString, zmsg.pop.getData) //filename, chunk case "eof" => endDownload(data.toString, zmsg.pop.getData) //filename, last chunk } } def startDownload(filename: String) { println("got notification: start download for "+filename) client.send("get".getBytes, ZMQ.SNDMORE) //command header client.send(filename.getBytes, 0) } def gotChunk(filename: String, bytes: Array[Byte]) { println("got chunk for "+filename+": "+new String(bytes)) //callback the user here client.send("next".getBytes, ZMQ.SNDMORE) client.send(filename.getBytes, 0) } def endDownload(filename: String, bytes: Array[Byte]) { println("got eof for "+filename+": "+new String(bytes)) //callback the user here } }