在教堂团队解决并重新确认之前,请仅使用有效负载测试任何教堂ZMQ 模块服务,int
并可能避免使用PUB/SUB
原型(由于字符串匹配未决问题)。
正如@Nick最近在这里披露的那样,还有一种方法可以让ZMQ
服务满足 ZeroMQ API 合规性并完全打开通向异构分布式系统的交叉兼容之门:
为了发送字符串,Chapel 发送一条带有字符串大小的消息,然后发送另一条带有字节缓冲区的消息;接收工作类似。
这意味着您的一次呼叫<aSocket>.recv( string )
实际上是在后台进行两次背靠背呼叫zmq_recv()
。使用该REQ/REP
模式,这两个背靠背zmq_recv()
调用将 ZeroMQ 状态机置于无效状态,因此出现错误消息。
这绝对是 ChapelZMQ
模块的一个错误。
几个步骤可以让场景更加清晰:
在诊断根本原因之前,让我提出一些要采取的措施。ZeroMQ 是一个非常强大的框架,几乎找不到比REQ/REP
.
内部有限状态自动机(实际上是分布式FSA)都是阻塞的(设计使然,以强制在连接的对等方(不必只是前两个)之间传递类似钟摆的消息,以便 SEQ [A]- .send()
- .recv()
- .send()
- .recv()
-... 在一侧 [A] 与 [B]- .recv()
- .send()
- .recv()
-...的 SEQ 匹配双方进入等待状态,其中 [A] 和 [B] 都希望从通道的另一侧接收下一条消息。
这就是说,我的建议是首先进行一个最简单的测试 - 使用一对不受限制的单工通道(无论是 [A] PUSH
/[B] PULL
+ [B] PUSH
/[A] PULL
,还是更复杂的方案PUB/SUB
)。
不进入完全网格化、多代理基础设施的设置,而是一个简化版本(不需要和打算使用通道,但如果扩展模型方案ROUTER/DEALER
,可能会复制(反向) -s):PUSH/PULL
由于当前的教堂实施限制,尚需在隐含的限制上花费更多的努力:
在 Chapel 中,在 a 上发送或接收消息时Socket
使用多部分消息和Reflection
模块来序列化原始数据类型和用户定义的数据类型。目前,该ZMQ
模块序列化原始数字类型、字符串和由这些类型组成的记录。字符串被编码为长度(如int),后跟字符数组(以字节为单位)。
如果这些评论不仅仅是线级内部性并扩展到顶级 ZeroMQ 消息/信号层(参考管理订阅的详细信息,其中 ZeroMQ 主题-过滤器匹配基于与接收到的消息的左侧精确匹配等)。
python端享有更广泛的设计自由度:
#
# python
# #########
import time
import zmq; context = zmq.Context()
print( "INF: This Agent uses ZeroMQ v.{0:}".format( zmq.__version__ ) )
dataAB = context.socket( zmq.REQ )
dataAB.setsockopt( zmq.LINGER, 0 ) # ( a must in pre v4.0+ )
dataAB.connect( "tcp://localhost:5555" )
heartB = context.socket( zmq.SUB )
heartB.setsockopt( zmq.LINGER, 0 ) # ( a must in pre v4.0+ )
heartB.setsockopt( zmq.CONFLATE, 0 ) # ( ignore history, keep just last )
heartB.connect( "tcp://localhost:6666" )
heartB.setsockopt( zmq.SUBSCRIBE, "[chapel2python.HB]" )
heartB.setsockopt( zmq.SUBSCRIBE, "" ) # in case [Chapel] complicates serialisation
# -------------------------------------------------------------------
while ( True ):
pass; print( "INF: waiting for a [Chapel] HeartBeat-Message" )
hbIN = heartB.recv( zmq.NOBLOCK );
if len( hbIN ) > 0:
pass; print( "ACK: [Chapel] Heart-Beat-Message .recv()-ed" )
break
else:
time.sleep( 0.5 )
# -------------------------------------------------------------------
for request in range(10):
pass; print( "INF: Sending a request %s to [Chapel] ..." % request )
dataAB.send( str( "Yo" ) )
pass; print( "INF: a blocking .recv(), [Chapel] is to answer ..." )
message = dataAB.recv()
pass; print( "INF: [Chapel] said %s" % message )
# -------------------------------------------------------------------
dataAB.close()
heartB.close()
context.term()
# -------------------------------------------------------------------
try:/except:/finally:
应该为KeyboardInterrupt
无限while()
循环等的 -s 提供一些进一步的构造,但为了清楚起见,这里省略了这些构造。
在教堂方面,我们将尽最大努力跟上 API 的步伐,原样:
文档按原样无助于决定用户代码是否有控制选项,对.send()
/.recv()
方法的调用是否总是隐式阻塞,而您的代码假定它在阻塞模式下运行(对于任何分布式系统设计,我始终强烈反对这种做法,阻塞是一种糟糕的做法 -更多内容请参见此处)。
虽然 C 级调用zmq_send()
可能是阻塞调用(取决于套接字类型和标志参数),但希望语义阻塞调用Socket.send()
允许在任务层支持的 OS 线程上调度其他 Chapel 任务。在内部,ZMQ 模块使用非阻塞调用来传输数据,并在调用会阻塞时通过 chpl_task_yield() 让出给zmq_send()
任务层。zmq_recv()
资源
use ZMQ;
use Reflection;
var context: Context;
var dataBA = context.socket( ZMQ.REP ),
heartB = context.socket( ZMQ.PUB );
var WAITms = 0; // setup as explicit int
dataBA.setsockopt( ZMQ.LINGER, WAITms );// a must
heartB.setsockopt( ZMQ.LINGER, WAITms );// a preventive step
dataBA.bind( "tcp://*:5555" ); // may reverse .bind()/.connect()
writeln( "INF: This Agent uses ZeroMQ v.", ZMQ.version() );
// /\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/
config var MAX_LOOPS = 120; // --MAX_LOOPS = 10 set on cmdline
var i = 0;
while ( i < MAX_LOOPS ) {
// --------------------------------------- // .send HeartBeat
heartB.send( "[chapel2python.HB]" );
i += 1;
writeln( "INF: Sent HeartBeat # ", i );
// --------------------------------------- // .send HeartBeat
var msg = dataBA.recv( string ); // .recv() from python
// - - - - - - - - - - - - - - - - - - - - // - - - - -WILL-[BLOCK]!!!
// ( ref. src )
writeln( "INF: [Chapel] got: ",
getField( msg, 1 )
);
dataBA.send( "back from chapel" ); // .send() to python
}
writeln( "INF: MAX_LOOPS were exhausted,",
" will exit-{} & .close()",
" channels' sockets before",
" [Chapel] exits to system."
);
// /\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/
dataBA.close( WAITms ); // explicit graceful termination
heartB.close( WAITms ); // explicit graceful termination
context.deinit(); // explicit context termination
// as not yet sure
// on auto-termination
// warranties