1

我的应用程序与 ZeroMQ 交换消息,我将使用序列化解决方案 (Apache Avro) 在应用程序之间交换消息。.

然而,我面临着意想不到的困难。有人审查我的源代码吗?

订阅者在 "avro_record_get(person, "isincode", &isincode);" 处生成核心转储

#include "zhelpers.h"
#include <avro.h>

avro_schema_t person_schema;

const char PERSON_SCHEMA[] =
        "{\"type\":\"record\",\
  \"name\":\"A0\",\
  \"fields\":[\
     {\"name\": \"datatype\", \"type\": \"int\"},\
     {\"name\": \"market\", \"type\": \"int\"},\
     {\"name\": \"infotype\", \"type\": \"int\"},\
     {\"name\": \"isincode\", \"type\": \"string\"},\
     {\"name\": \"lastprice\", \"type\": \"int\"}]}";


int main(int argc, char *argv []) {
    //  Socket to talk to server


    init_schema();
    zmq_msg_t msg;

    avro_datum_t person = avro_record(person_schema);

    void *context = zmq_ctx_new();
    void *publisher = zmq_socket(context, ZMQ_PUB);
    int rc = zmq_bind(publisher, "tcp://*:5563");
    assert(rc == 0);

    for (;;) {

        avro_datum_t datatype = avro_int32(1);
        avro_datum_t market = avro_int32(2);
        avro_datum_t infotype = avro_int32(1);
        avro_datum_t isincode = avro_string("KR7005930003");
        avro_datum_t lastprice = avro_int64(1500000);

        avro_record_set(person, "datatype", datatype);
        avro_record_set(person, "market", market);
        avro_record_set(person, "infotype", infotype);
        avro_record_set(person, "isincode", isincode);
        avro_record_set(person, "lastprice", lastprice);

        int rc3 = zmq_msg_init_size(&msg, sizeof (msg));
        memcpy(zmq_msg_data(&msg), person, sizeof (msg));

        int ret3 = zmq_msg_send(&msg, publisher, 0);
        zmq_msg_close(&msg);

        printf("%d\n", ret3);

        avro_datum_t person2,   isincode2;
        char *p;

        avro_record_get(person, "isincode", &isincode2);
        avro_string_get(isincode, &p);
        fprintf(stdout, "%12s | ", p);

    }
    zmq_close(publisher);
    zmq_ctx_destroy(context);
    return 0;
}


**[Subscriber]**


#include "zhelpers.h"
#include <avro.h>


avro_schema_t person_schema;

const char PERSON_SCHEMA[] =
        "{\"type\":\"record\",\
  \"name\":\"A0\",\
  \"fields\":[\
     {\"name\": \"datatype\", \"type\": \"int\"},\
     {\"name\": \"market\", \"type\": \"int\"},\
     {\"name\": \"infotype\", \"type\": \"int\"},\
     {\"name\": \"isincode\", \"type\": \"string\"},\
     {\"name\": \"lastprice\", \"type\": \"int\"}]}";

/* Parse schema into a schema data structure */
void init_schema(void) {
    if (avro_schema_from_json_literal(PERSON_SCHEMA, &person_schema)) {
        fprintf(stderr, "Unable to parse person schema\n");
        exit(EXIT_FAILURE);
    }
}

int main(int argc, char *argv []) {

    init_schema();
    avro_datum_t person = avro_record(person_schema);

    init_schema();
    void *context = zmq_ctx_new();
    void *subscriber = zmq_socket(context, ZMQ_SUB);
    int rc = zmq_connect(subscriber, "tcp://localhost:5563");
    assert(rc == 0);

    rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, 0, 0);
    assert(rc == 0);

    while (1) {

        //  avro_datum_t person, isincode;
        avro_datum_t isincode;
        char *p;

        zmq_msg_t reply;

        zmq_msg_init(&reply);


        printf("zmq_msg_init complete \n");

        int recv_size = zmq_msg_recv(&reply, subscriber, 0);

        printf("zmq_msg_recv  %d\n", recv_size);

        int length = zmq_msg_size(&reply);

        printf("zmq_msg_size %d\n", length);

        memcpy(person, zmq_msg_data(&reply), length);
        // zmq_msg_close(&reply);

        avro_record_get(person, "isincode", &isincode);
        avro_string_get(isincode, &p);
        fprintf(stdout, "%12s | ", p);

    }

    zmq_close(subscriber);
    zmq_ctx_destroy(context);
    return 0;
}
4

0 回答 0