6

目前,我正在使用 JSON 库在发送方(JeroMQ)处序列化数据,并在接收方(C、ZMQ)处反序列化。但是,在解析时,JSON 库开始消耗大量内存,操作系统会终止该进程。所以,我想按原样发送浮点数组,即不使用 JSON。

现有的发件人代码如下(syn0并且syn1Double数组)。如果syn0syn1每个大约 100 MB,则在解析接收到的数组时会终止该进程,即以下代码段的最后一行:

import org.zeromq.ZMQ
import com.codahale.jerkson
socket.connect("tcp://localhost:5556")

socket.send(json.JSONObject(Map("syn0"->json.JSONArray(List.fromArray(syn0Global)))).toString())
println("SYN0 Request sent”)
val reply_syn0 = socket.recv(0)
println("Response received after syn0: " + new String(reply_syn0))
logInfo("Sending Syn1 request … , size : " + syn1Global.length )

socket.send(json.JSONObject(Map("syn1"->json.JSONArray(List.fromArray(syn1Global)))).toString())
println("SYN1 Request sent")
val reply_syn1 = socket.recv(0)

socket.send(json.JSONObject(Map("foldComplete"->"Done")).toString())
println("foldComplete sent")
//  Get the reply.
val reply_foldComplete = socket.recv(0)
val processedSynValuesJson = new String(reply_foldComplete)
val processedSynValues_jerkson =   jerkson.Json.parse[Map[String,List[Double]]](processedSynValuesJson)

可以不使用 JSON 传输这些数组吗?

在这里,我在两个 C 程序之间传输一个浮点数组:

//client.c
int main (void)
{
printf ("Connecting to hello world server…\n");
void *context = zmq_ctx_new ();
void *requester = zmq_socket (context, ZMQ_REQ);
zmq_connect (requester, "tcp://localhost:5555");

int request_nbr;
float send_buffer[10];
float recv_buffer[10];

for(int i = 0; i < 10; i++)
    send_buffer[i] = i;

for (request_nbr = 0; request_nbr != 10; request_nbr++) {
    //char buffer [10];
    printf ("Sending Hello %d…\n", request_nbr);
    zmq_send (requester, send_buffer, 10*sizeof(float), 0);
    zmq_recv (requester, recv_buffer, 10*sizeof(float), 0);
    printf ("Received World %.3f\n", recv_buffer[5]);
}
zmq_close (requester);
zmq_ctx_destroy (context);
return 0;
}

//server.c

int main (void)
{
//  Socket to talk to clients
void *context = zmq_ctx_new ();
void *responder = zmq_socket (context, ZMQ_REP);
int rc = zmq_bind (responder, "tcp://*:5555");
assert (rc == 0);
float recv_buffer[10];
float send_buffer[10];
while (1) {
    //char buffer [10];
    zmq_recv (responder, recv_buffer, 10*sizeof(float), 0);
    printf ("Received Hello\n");
    for(int i = 0; i < 10; i++)
            send_buffer[i] = recv_buffer[i]+5;
    zmq_send (responder, send_buffer, 10*sizeof(float), 0);
}
return 0;
}

最后,我尝试使用 Scala 做类似的事情没有成功(下面是客户端代码):

def main(args: Array[String]) {
val context = ZMQ.context(1)
val socket = context.socket(ZMQ.REQ)

println("Connecting to hello world server…")
socket.connect ("tcp://localhost:5555")
val msg : Array[Float] = Array(1,2,3,4,5,6,7,8,9,10)
val bbuf = java.nio.ByteBuffer.allocate(4*msg.length)
bbuf.asFloatBuffer.put(java.nio.FloatBuffer.wrap(msg))


for (request_nbr <- 1 to 10)  {
    socket.sendByteBuffer(bbuf,0)

}
}
4

2 回答 2

4

SER/DES ? 尺寸 ?
不,与运输哲学相关的潜在约束很重要。

您已开始0.1 GB调整传输有效负载的大小,并报告了一个JSON-library 分配以导致您的 O/S 终止该进程。

接下来,在其他帖子中,您请求了0.762 GB传输有效负载的大小。

但是在ZeroMQ传输编排中还有一个比选择外部数据序列化器SER/DES策略更重要的问题。

没有人会禁止您尝试发送尽可能大的邮件BLOB,而JSON-decorated 字符串已经向您展示了这种方法的阴暗面,还有其他理由不继续这种方式。

ZeroMQ 毫无疑问是一个强大而强大的工具箱。仍然需要一些时间才能获得真正智能且高性能的代码部署所必需的洞察力,从而最大限度地利用这个强大的主力。

功能丰富的内部生态系统“幕后”的副作用之一是隐藏在消息传递概念中的不太为人所知的策略。

一个人可以发送任何大小合理的消息,但不能保证送达。它要么完全交付,要么根本没有任何东西,如上所述,没有任何保证。

哎哟?!

的,不保证。

基于这一核心零保证理念,在决定步骤和措施时应格外小心,如果您打算尝试搬迁,则更应谨慎Gigabyte BEASTs

从这个意义上说,它可能会得到实际SUT测试的定量支持,即小型消息可能会传输(如果您确实仍然需要移动 GB(请参阅上面的评论,在 OP 下)并且别无选择)数据被分割成更小的部分,具有容易出错的重新组装措施,这导致比尝试使用哑力并指示代码将大约 GB 的数据转储到任何资源上更快、更安全的端到端解决方案实际上是可用的(零拷贝原则ZeroMQ不能也不会在这些努力中拯救你)。

有关另一个隐藏陷阱的详细信息,与不完全零复制实现有关,请阅读 Martin SUSTRIK 的共同之父ZeroMQ关于零复制“直到内核边界”的评论(因此,至少两倍的内存空间分配意料之中……)。


解决方案:

重新设计架构以传播小型消息,如果不保持原始数据结构在远程进程中“镜像”,而不是尝试保持一次性千兆传输的可生存性。


最好的下一步?

虽然它不能用几个SLOC-s 解决您的问题,但最好的办法是,如果您认真地将您的智力投入到分布式处理中,那么最好阅读 Pieter HINTJEN 的可爱书“Code Connected, Vol.1”

是的,产生自己的洞察力需要一些时间,但这会在许多方面将您提升到另一个专业代码设计水平。值得时间。值得努力。

于 2016-03-29T11:15:14.350 回答
3

您需要以某种形式或方式序列化数据 - 最终您在内存中获取结构并指导另一侧如何重建该结构(使用两种不同语言的奖励积分,其中结构在内存中无论如何可能不同)。我建议您使用新的 JSON 库,因为这似乎是问题所在,但您可以使用更有效的协议。 Protocol Buffers在许多语言中都享有良好的支持,这可能是我要开始的地方。

于 2016-03-24T18:51:33.920 回答