1

我试图通过打开读取流来读取一些文件,并通过 ZMQ 将文件块发送到另一个进程以使用它们。流按应有的方式工作,但是当我启动工作程序时,它看不到已发送的数据。

我尝试每 500 毫秒通过套接字发送数据,而不是在回调中,当我启动 worker 时,它会收集所有以前的数据块:

sender = zmq.socket('push')
setInterval(() ->
  console.log('sending work');
  sender.send('some work')
, 500)

receiver = zmq.socket("pull")
receiver.on "message", (msg) ->
  console.log('work is here: %s', msg.toString())

输出:

sending work
sending work
sending work
sending work
sending work
// here I start the worker
sending work
work is here: some work
work is here: some work
work is here: some work
work is here: some work
work is here: some work
work is here: some work
sending work
work is here: some work
sending work
work is here: some work
sending work
work is here: some work

因此,当工作人员启动时,它首先会提取所有以前的数据,然后每次有新数据进来时都会提取它。这在我这样做时不适用:

readStream = fs.createReadStream("./data/pg2701.txt", {'bufferSize': 100 * 1024})
readStream.on "data", (data) ->
  console.log('sending work');
  sender.send('some work'); // I'd send 'data' if it worked..

在这种情况下,工作人员根本不会提取任何数据。这些套接字是否应该创建队列?我在这里想念什么?

4

1 回答 1

0

是的,推送套接字在达到 HWM 之前一直处于阻塞状态,并且没有人可以发送。也许发件人还没有绑定,试试这样的:

sender.bind('address', function(err) {
  if (err) throw err;
  console.log('sender bound!');

  // the readStream code.
}

您的代码示例中也缺少 a connect,我敢打赌它在那里,但也许您忘记了。

于 2012-11-08T05:17:41.287 回答