目前,我正在使用 JSON 库在发送方(JeroMQ)处序列化数据,并在接收方(C、ZMQ)处反序列化。但是,在解析时,JSON 库开始消耗大量内存,操作系统会终止该进程。所以,我想按原样发送浮点数组,即不使用 JSON。
现有的发件人代码如下(syn0
并且syn1
是Double
数组)。如果syn0
和syn1
每个大约 100 MB,则在解析接收到的数组时会终止该进程,即以下代码段的最后一行:
import org.zeromq.ZMQ
import com.codahale.jerkson
socket.connect("tcp://localhost:5556")
socket.send(json.JSONObject(Map("syn0"->json.JSONArray(List.fromArray(syn0Global)))).toString())
println("SYN0 Request sent”)
val reply_syn0 = socket.recv(0)
println("Response received after syn0: " + new String(reply_syn0))
logInfo("Sending Syn1 request … , size : " + syn1Global.length )
socket.send(json.JSONObject(Map("syn1"->json.JSONArray(List.fromArray(syn1Global)))).toString())
println("SYN1 Request sent")
val reply_syn1 = socket.recv(0)
socket.send(json.JSONObject(Map("foldComplete"->"Done")).toString())
println("foldComplete sent")
// Get the reply.
val reply_foldComplete = socket.recv(0)
val processedSynValuesJson = new String(reply_foldComplete)
val processedSynValues_jerkson = jerkson.Json.parse[Map[String,List[Double]]](processedSynValuesJson)
可以不使用 JSON 传输这些数组吗?
在这里,我在两个 C 程序之间传输一个浮点数组:
//client.c
int main (void)
{
printf ("Connecting to hello world server…\n");
void *context = zmq_ctx_new ();
void *requester = zmq_socket (context, ZMQ_REQ);
zmq_connect (requester, "tcp://localhost:5555");
int request_nbr;
float send_buffer[10];
float recv_buffer[10];
for(int i = 0; i < 10; i++)
send_buffer[i] = i;
for (request_nbr = 0; request_nbr != 10; request_nbr++) {
//char buffer [10];
printf ("Sending Hello %d…\n", request_nbr);
zmq_send (requester, send_buffer, 10*sizeof(float), 0);
zmq_recv (requester, recv_buffer, 10*sizeof(float), 0);
printf ("Received World %.3f\n", recv_buffer[5]);
}
zmq_close (requester);
zmq_ctx_destroy (context);
return 0;
}
//server.c
int main (void)
{
// Socket to talk to clients
void *context = zmq_ctx_new ();
void *responder = zmq_socket (context, ZMQ_REP);
int rc = zmq_bind (responder, "tcp://*:5555");
assert (rc == 0);
float recv_buffer[10];
float send_buffer[10];
while (1) {
//char buffer [10];
zmq_recv (responder, recv_buffer, 10*sizeof(float), 0);
printf ("Received Hello\n");
for(int i = 0; i < 10; i++)
send_buffer[i] = recv_buffer[i]+5;
zmq_send (responder, send_buffer, 10*sizeof(float), 0);
}
return 0;
}
最后,我尝试使用 Scala 做类似的事情没有成功(下面是客户端代码):
def main(args: Array[String]) {
val context = ZMQ.context(1)
val socket = context.socket(ZMQ.REQ)
println("Connecting to hello world server…")
socket.connect ("tcp://localhost:5555")
val msg : Array[Float] = Array(1,2,3,4,5,6,7,8,9,10)
val bbuf = java.nio.ByteBuffer.allocate(4*msg.length)
bbuf.asFloatBuffer.put(java.nio.FloatBuffer.wrap(msg))
for (request_nbr <- 1 to 10) {
socket.sendByteBuffer(bbuf,0)
}
}