我的应用程序与 ZeroMQ 交换消息,我将使用序列化解决方案 (Apache Avro) 在应用程序之间交换消息。.
然而,我面临着意想不到的困难。有人审查我的源代码吗?
订阅者在 "avro_record_get(person, "isincode", &isincode);" 处生成核心转储
#include "zhelpers.h"
#include <avro.h>
avro_schema_t person_schema;
const char PERSON_SCHEMA[] =
"{\"type\":\"record\",\
\"name\":\"A0\",\
\"fields\":[\
{\"name\": \"datatype\", \"type\": \"int\"},\
{\"name\": \"market\", \"type\": \"int\"},\
{\"name\": \"infotype\", \"type\": \"int\"},\
{\"name\": \"isincode\", \"type\": \"string\"},\
{\"name\": \"lastprice\", \"type\": \"int\"}]}";
int main(int argc, char *argv []) {
// Socket to talk to server
init_schema();
zmq_msg_t msg;
avro_datum_t person = avro_record(person_schema);
void *context = zmq_ctx_new();
void *publisher = zmq_socket(context, ZMQ_PUB);
int rc = zmq_bind(publisher, "tcp://*:5563");
assert(rc == 0);
for (;;) {
avro_datum_t datatype = avro_int32(1);
avro_datum_t market = avro_int32(2);
avro_datum_t infotype = avro_int32(1);
avro_datum_t isincode = avro_string("KR7005930003");
avro_datum_t lastprice = avro_int64(1500000);
avro_record_set(person, "datatype", datatype);
avro_record_set(person, "market", market);
avro_record_set(person, "infotype", infotype);
avro_record_set(person, "isincode", isincode);
avro_record_set(person, "lastprice", lastprice);
int rc3 = zmq_msg_init_size(&msg, sizeof (msg));
memcpy(zmq_msg_data(&msg), person, sizeof (msg));
int ret3 = zmq_msg_send(&msg, publisher, 0);
zmq_msg_close(&msg);
printf("%d\n", ret3);
avro_datum_t person2, isincode2;
char *p;
avro_record_get(person, "isincode", &isincode2);
avro_string_get(isincode, &p);
fprintf(stdout, "%12s | ", p);
}
zmq_close(publisher);
zmq_ctx_destroy(context);
return 0;
}
**[Subscriber]**
#include "zhelpers.h"
#include <avro.h>
avro_schema_t person_schema;
const char PERSON_SCHEMA[] =
"{\"type\":\"record\",\
\"name\":\"A0\",\
\"fields\":[\
{\"name\": \"datatype\", \"type\": \"int\"},\
{\"name\": \"market\", \"type\": \"int\"},\
{\"name\": \"infotype\", \"type\": \"int\"},\
{\"name\": \"isincode\", \"type\": \"string\"},\
{\"name\": \"lastprice\", \"type\": \"int\"}]}";
/* Parse schema into a schema data structure */
void init_schema(void) {
if (avro_schema_from_json_literal(PERSON_SCHEMA, &person_schema)) {
fprintf(stderr, "Unable to parse person schema\n");
exit(EXIT_FAILURE);
}
}
int main(int argc, char *argv []) {
init_schema();
avro_datum_t person = avro_record(person_schema);
init_schema();
void *context = zmq_ctx_new();
void *subscriber = zmq_socket(context, ZMQ_SUB);
int rc = zmq_connect(subscriber, "tcp://localhost:5563");
assert(rc == 0);
rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, 0, 0);
assert(rc == 0);
while (1) {
// avro_datum_t person, isincode;
avro_datum_t isincode;
char *p;
zmq_msg_t reply;
zmq_msg_init(&reply);
printf("zmq_msg_init complete \n");
int recv_size = zmq_msg_recv(&reply, subscriber, 0);
printf("zmq_msg_recv %d\n", recv_size);
int length = zmq_msg_size(&reply);
printf("zmq_msg_size %d\n", length);
memcpy(person, zmq_msg_data(&reply), length);
// zmq_msg_close(&reply);
avro_record_get(person, "isincode", &isincode);
avro_string_get(isincode, &p);
fprintf(stdout, "%12s | ", p);
}
zmq_close(subscriber);
zmq_ctx_destroy(context);
return 0;
}