我无法使用 C 通过 ZeroMQ 套接字接收在 protobuf 中序列化的消息。我已经序列化了客户端输入的消息,并使用 zhelpers.h 中定义的 s_send() 函数将此缓冲区发送到服务器。服务器代码是与 zeromq 包捆绑的相同测试代码作为示例。
这是我的客户端:
#include "amessage.pb-c.h"
#include "zhelpers.h"
int main (void)
{
AMessage msg = AMESSAGE__INIT; // AMessage
void *buf; // Buffer to store serialized data
unsigned len;
printf ("Connecting to server...\n");
void *context = zmq_ctx_new ();
void *requester = zmq_socket (context, ZMQ_REQ);
char buffer[256] = "";
printf("[client] :");
scanf("%s", buffer );
msg.csmsg = buffer;
len = amessage__get_packed_size(&msg);
buf = malloc(len);
printf("[client]: pack msg len : %d\n ", len);
printf("Sent msg : %d\n", buf);
amessage__pack(&msg,buf);
s_send(requester, buf);
zmq_close (requester);
zmq_ctx_destroy (context);
return 0;
}
和服务器端:
#include "zhelpers.h"
#include <pthread.h>
#include <stdlib.h>
#include "amessage.pb-c.h"
#define MAX_MSG_SIZE 256
static size_t read_buffer (unsigned max_length, unsigned char *out)
{
size_t cur_len = 0, nread;
uint8_t c;
while ((nread=fread(out + cur_len, 1, max_length - cur_len, stdin)) != 0)
{
cur_len += nread;
if (cur_len == max_length)
{
fprintf(stderr, "[server]: max message length exceeded\n");
exit(1);
}
}
return cur_len;
}
static void * worker_routine (void *context)
{
AMessage *msg;
uint8_t buf[MAX_MSG_SIZE];
char buffer[256];
// Socket to talk to dispatcher
void *receiver = zmq_socket (context, ZMQ_REP);
zmq_connect (receiver, "inproc://workers");
while (1) {
uint8_t *string = s_recv (receiver);
if(string == 0)
printf("[server]: Error: In receiving msg.\n");
else
{
size_t msg_len = read_buffer (MAX_MSG_SIZE, string);
printf("[server]: client msg len is: %d.\n", msg_len);
msg = amessage__unpack(NULL, msg_len, string);
if (msg == NULL)
{
fprintf(stderr, "[server]: error unpacking incoming message\n");
exit(1);
}
printf ("[client]: %s \n", msg->csmsg);
}
amessage__free_unpacked(msg, NULL);
free (string);
// Do some 'work'
sleep (1);
}
zmq_close (receiver);
return NULL;
}
int main (void)
{
void *context = zmq_ctx_new ();
void *clients = zmq_socket (context, ZMQ_ROUTER);
zmq_bind (clients, "tcp://*:5555");
void *workers = zmq_socket (context, ZMQ_DEALER);
zmq_bind (workers, "inproc://workers");
// Launch pool of worker threads
int thread_nbr;
for (thread_nbr = 0; thread_nbr < 5; thread_nbr++) {
pthread_t worker;
pthread_create (&worker, NULL, worker_routine, context);
}
// Connect work threads to client threads via a queue proxy
zmq_proxy (clients, workers, NULL);
zmq_close (clients);
zmq_close (workers);
zmq_ctx_destroy (context);
return 0;
}
知道我做错了什么吗?