0

我有一个实现 NATS 队列的小项目,这是代码:

import * as NATS from '../node_modules/nats'              // for typescript
var nats = require('nats');

var config = require('./config');
// Connection options for the NATS client. Servers and credentials come from
// ./config; maxReconnectAttempts of -1 places no limit on reconnect attempts.
const opt: NATS.ClientOpts = {
    noRandomize: false,
    reconnect: true,
    maxReconnectAttempts: -1,
    servers: config.nat.servers,
    user: config.nat.user,
    pass: config.nat.pass,
};
// Single client connection shared by the subscribers and requesters in this file.
const nc: NATS.Client = nats.connect(opt);

// Log any error event emitted by the NATS client.
nc.on('error', (err) => {
    console.error(`Error nats: ${err}`);
});

// Log when the NATS client emits its 'close' event.
nc.on('close', () => {
    console.error('CLOSED nats');
});

// Subscriber for subject 'serviceA' in queue group 'A': parses the JSON
// request, logs it, and publishes a JSON reply to the request's reply subject.
nc.subscribe('serviceA', { queue: 'A' }, (request, replyTo) => {
    console.debug('I exec serviceA via nats:');
    // Declared locally so the parsed request does not leak into an implicit global.
    const payload = JSON.parse(request);
    console.debug(payload);
    const ok = { res: 'i am response for service A' };
    nc.publish(replyTo, JSON.stringify(ok));
});

// Client side for service A: send one JSON request on subject 'serviceA' and
// wait up to 15000 ms for a single reply.
let cmd = '{"a": 5, "b": "i am sample"}';
nc.requestOne('serviceA', cmd, {}, 15000, (response) => {
    const timedOut = response.code && response.code === nats.REQ_TIMEOUT;
    if (!timedOut) {
        console.log(`A see this response: ${response}`);
        return;
    }
    // No reply arrived within the timeout window.
    console.error('server timeout');
    console.error(response.code);
});

// Subscriber for subject 'serviceB' in queue group 'B': parses the JSON
// request, logs it, and publishes a JSON reply to the request's reply subject.
nc.subscribe('serviceB', { queue: 'B' }, (request, replyTo) => {
    console.debug('I exec serviceB via nats:');
    // Declared locally so the parsed request does not leak into an implicit global.
    const payload = JSON.parse(request);
    console.debug(payload);
    const ok = { res: 'i am response for service B' };
    nc.publish(replyTo, JSON.stringify(ok));
});

// Client side for service B: send one JSON request on subject 'serviceB' and
// wait up to 15000 ms for a single reply.
// Named cmdB because a second `let cmd` in the same scope is a SyntaxError.
const cmdB = '{"c": 5, "d": "i am sample"}';
nc.requestOne('serviceB', cmdB, {}, 15000, (response) => {
    if (response.code && response.code === nats.REQ_TIMEOUT) {
        // No reply arrived within the timeout window.
        console.error('server timeout');
        console.error(response.code);
    } else {
        console.log(`B see this response: ${response}`);
    }
});

如您所见,有 2 个服务——队列 A 上的 serviceA 和队列 B 上的 serviceB——以及 2 个客户端:第一个调用服务 A,第二个调用服务 B。

NATS 实现了主题(Subject),即 'serviceA' 和 'serviceB'。

现在,我想尝试用 ØMQ 改写这个示例。我找到了一些使用 ZeroMQ 的类似示例,

但找不到任何关于主题(Subject)的示例。

或许可以用 ØMQ 的 ROUTER 来实现主题?

您能帮我将主题实现到 ZeroMQ 示例中吗?

4

1 回答 1

2

问:可以在 ZeroMQ 中使用 Subject 吗?
答:是的,可以:

长话短说——这样做根本不需要 zmq.ROUTER,只需使用 PUB/SUB 正式通信模式即可。

注意:ZeroMQ 的 Socket() 实例并不是您所熟悉的那种 TCP 套接字。

最好先阅读 [不到 5 秒了解 ZeroMQ 层次结构] 一节中介绍的主要概念差异。

发布方:

# PUB side: publishes messages whose leading characters ( "ServiceA", "ServiceB" )
# serve as the subject that SUB sockets filter on.
import time

import zmq

aCtx = zmq.Context()
aPub = aCtx.socket( zmq.PUB )                     # the pyzmq factory method is lower-case .socket()
aPub.bind( "tcp://123.456.789.012:3456" )         # placeholder address -- replace with a real local interface
aPub.setsockopt(    zmq.LINGER,   0 )             # discard unsent messages on close() instead of waiting
# aPub.setsockopt(  zmq.<whatever needed to fine-tune the instance>, <val> )
i = 0
while True:
      try:
          # send_string() encodes the text; a plain send() needs bytes on Python 3
          aPub.send_string( "ServiceA::[#{0:_>12d}] a Hello World Message.".format( i ) )
          aPub.send_string( "ServiceABCDEFGHIJKLMNOPQRSTUVWXYZ........" )
          if i % 2 == 0:                          # publish for ServiceB on every second pass only
              aPub.send_string( "ServiceB::[#{0:_>12d}] another message...".format( i // 2 ) )
          time.sleep( 1 ); i += 1

      except KeyboardInterrupt:
          print( "---< will exit >---" )
          break
print( "---< will terminate ZeroMQ resources >---" )
aPub.close()
aCtx.term()

订阅方:

# SUB side: connects to the publisher and prints every message that starts
# with one of the subscribed subjects.
import zmq

aCtx = zmq.Context()
aSub = aCtx.socket( zmq.SUB )                    # the pyzmq factory method is lower-case .socket()
aSub.connect( "tcp://123.456.789.012:3456" )     # placeholder address -- must match the PUB side's bind()
aSub.setsockopt( zmq.LINGER, 0 )
# setsockopt_string() encodes the text; zmq.SUBSCRIBE needs bytes on Python 3.
# ZeroMQ subscriptions match by message prefix.
aSub.setsockopt_string( zmq.SUBSCRIBE, "ServiceA" ) # Subject ( 'serviceA' and 'serviceB' )
aSub.setsockopt_string( zmq.SUBSCRIBE, "ServiceB" ) # Kindly see the comments below
#                                            # Kindly see API on subscription management details
#
#                                            # Yet another "dimension"
#                                            #     to join ingress
#                                            #     from multiple sources
#Sub.connect( "<transport-class>://<addr>:<port>" )
#              <transport-class :: { inproc | ipc | tcp | pgm | epgm | vmci }
#   .connect()-s the same local SUB-AccessPoint to another PUB-side-AccessPoint
#                to allow the PUB/SUB Scalable Formal Communication Archetype Pattern
#                join a message flow from different source PUB-sides
#Sub.setsockopt( zmq.SUBSCRIBE, "ServiceZ" )

while True:
      try:
          # recv_string() decodes the payload so the {0:s} format receives a str
          print( "<<< (((_{0:s}_)))".format( aSub.recv_string() ) )
      except KeyboardInterrupt:
          print( "---< will exit >---" )
          break

print( "---< will terminate ZeroMQ resources >---" )
aSub.close()
aCtx.term()
于 2017-11-29T17:33:31.753 回答