我使用 REQ/REP 0MQ ipc:// 套接字实现了一个 JSON-RPC 服务器,我遇到了奇怪的行为,我怀疑这是由于 ipc:// 底层 unix 套接字不是真正的套接字,而是一根管子。
从文档中,必须强制执行严格的 zmq_send()/zmq_recv() 交替,否则无序的 zmq_send() 将返回错误。
但是,我希望执行是针对每个客户端,而不是每个套接字。当然,对于 Unix 套接字,从多个客户端到服务器只有一条管道,因此服务器不会知道它在与谁通信。两个客户端可以同时 zmq_send() 并且服务器会将此视为交替违规。
顺序可以是:
- 客户端A:zmq_send()
- ClientB: zmq_send() : 它会阻塞直到另一个发送/接收完成吗?它会返回-1吗?(由于固有的低级问题,我怀疑它会使用 ipc:// ,但使用 TCP 可以区分两个客户端)
- 客户A:zmq_recv()
- 客户端B:zmq_recv()
那么 tcp:// 套接字呢?它会同时工作吗?我应该使用其他一些锁定机制来解决这个问题吗?
示例服务器:
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <czmq.h>
int main(void) 
{
    zctx_t *zctx   ;
    void *zsocket_rpc;
        printf ("rpcserver create context\n");
    zctx = zctx_new();
        printf ("rpcserver create socket\n");
    zsocket_rpc = zsocket_new (zctx, ZMQ_REP);
        if (!zsocket_rpc) {
                fprintf (stderr, "zsocket_rpc is NULL\n");
                exit(1);
        }
    zsocket_bind (zsocket_rpc, "ipc:///tmp/rpcserver");
        for(;;) {
                int rc;
                char *msg = zstr_recv(zsocket_rpc);
                printf ("rpcserver received %s\n", msg);
                printf ("rpcserver sleep\n");
                usleep(200000);
                printf ("rpcserver send %s\n", msg);
                rc = zstr_send(zsocket_rpc, msg);
                if (rc < 0) {
                        fprintf (stderr, "rpcserver zstr_send returned %d\n", rc);
                        continue;
                }
                free(msg);
        }
}
示例客户端(作为 ./rpcclient letter 启动):
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <czmq.h>
int main(int argc, char *argv[]) 
{
        char msg[] = "A:MESSAGE 999";
    zctx_t *zctx;
    void *zsocket_rpc;
        if (argc != 2) {
                fprintf (stderr, "Usage: rpcclient letter\n");
                exit(1);
        }
    zctx = zctx_new();
        printf ("rpcclient new socket\n");
    zsocket_rpc = zsocket_new (zctx, ZMQ_REQ);
        if (!zsocket_rpc) {
                fprintf (stderr, "zsocket_rpc is NULL\n");
                exit(1);
        }
        printf ("rpcclient connect\n");
    zsocket_connect (zsocket_rpc, "ipc:///tmp/rpcserver");
        for (int cnt = 0; cnt < 1000; cnt++) {
                int rc;
                sprintf (msg, "%c:MESSAGE %03d", argv[1][0], cnt);
                printf  ("rpcclient send %s\n", msg);
                rc = zstr_send(zsocket_rpc, msg);
                if (rc < 0) {
                        fprintf (stderr, "rpcclient zstr_send returned %d\n", rc);
                        continue;
                }
                printf ("rpcclient sleep...\n");
                usleep(200000);
                char *reply = zstr_recv(zsocket_rpc);
                printf  ("rpcclient recv %s\n", reply);
                free(reply);
        }
}