4

我正在基于此页面中的示例开发多线程 RPC 服务器:http: //bderzhavets.blogspot.ca/2005/11/multithreaded-rpc-server-in-white-box.html

不幸的是,它并没有完全开箱即用,并且在跟踪错误一段时间后,我发现服务器无法解码参数(基于来自的返回码squareproc_2)。squareproc_2_svc在调用函数后,服务器端的执行似乎停止了serv_request。请参见case: SQUAREPROCsquare_svc.c 的以下代码

void *serv_request(void *data)
{
    struct thr_data *ptr_data = (struct thr_data *)data;
    {
        square_in argument;
        square_out result;
        bool_t retval;
        xdrproc_t _xdr_argument, _xdr_result;
        bool_t (*local)(char *, void *, struct svc_req *);
        struct svc_req *rqstp = ptr_data->rqstp;
        register SVCXPRT *transp = ptr_data->transp;
        switch (rqstp->rq_proc) {
            case NULLPROC:
                printf("NULLPROC called\n");
                (void) svc_sendreply (transp, (xdrproc_t) xdr_void, (char *)NULL);
                return;
            case SQUAREPROC:
                _xdr_argument = (xdrproc_t) xdr_square_in;
                _xdr_result = (xdrproc_t) xdr_square_out;
                printf("_xdr_result = %ld\n",_xdr_result);
                local = (bool_t (*) (char *, void *,  struct svc_req *))squareproc_2_svc;
                break;
            default:
                printf("default case executed");
                svcerr_noproc (transp);
                return;
        }
        memset ((void *)&argument, 0, sizeof (argument));
        if (!svc_getargs (transp, (xdrproc_t) _xdr_argument, (caddr_t) &argument)) {
            printf("svc_getargs failed");
            svcerr_decode (transp);
            return;
        }
        retval = (bool_t) (*local)((char *)&argument, (void *)&result, rqstp);
        printf("serv_request result: %d\n",retval);
        if (retval > 0 && !svc_sendreply(transp, (xdrproc_t) _xdr_result, (char *)&result))
        {
            printf("something happened...\n");
            svcerr_systemerr (transp);
        }
        if (!svc_freeargs (transp, (xdrproc_t) _xdr_argument, (caddr_t) &argument)) {
            fprintf (stderr, "%s", "unable to free arguments");
            exit (1);
        }
        if (!square_prog_2_freeresult (transp, _xdr_result, (caddr_t) &result))
            fprintf (stderr, "%s", "unable to free results");
        return;
    }
}

这是squareproc_2_svc来自文件 square_server.c 的实现:

bool_t squareproc_2_svc(square_in *inp,square_out *outp,struct svc_req *rqstp)
{
    printf("Thread id = '%ld' started, arg = %ld\n",pthread_self(),inp->arg1);
    sleep(5);
    outp->res1=inp->arg1*inp->arg1;
    printf("Thread id = '%ld' is done %ld \n",pthread_self(),outp->res1);
    return(TRUE);
}

客户端输出:

yak@AcerPC:~/RPC/multithread_example$ ./ClientSQUARE localhost 2
squareproc_2 called
xdr_square_in result: 1
function call failed; code: 11

服务器端输出:

yak@AcerPC:~/RPC/multithread_example$ sudo ./ServerSQUARE 
creating threads
SQUAREPROC called
xdr_square_in result: 0

如您所见,xdr_square_in 在服务器端返回 FALSE 结果。这是 square.x

struct square_in {
    long arg1;
};

struct square_out {
    long res1;
};

program SQUARE_PROG {
    version SQUARE_VERS {
        square_out SQUAREPROC(square_in) = 1;
    } = 2 ;
} = 0x31230000;

和 square_xdr.c

/*
 * Please do not edit this file.
 * It was generated using rpcgen.
 */

#include "square.h"

bool_t
xdr_square_in (XDR *xdrs, square_in *objp)
{
    register int32_t *buf;
    int retval;
    if (!xdr_long (xdrs, &objp->arg1)) retval = FALSE;
    else retval = TRUE;
    printf("xdr_square_in result: %d\n",retval);
    return retval;
}

bool_t
xdr_square_out (XDR *xdrs, square_out *objp)
{
    register int32_t *buf;
    int retval;
    if (!xdr_long (xdrs, &objp->res1)) retval = FALSE;
    else retval = TRUE;
    printf("xdr_square_out result: %d\n",retval);
    return retval;
}

我在 Ubuntu 14.04 LTS 中工作,使用 生成存根和 xdr 代码rpcgen -a -M,并使用gcc.

该错误似乎仅在使用 TCP 作为传输方法时发生。我可以使用 UDP 作为传输来获得结果,但是当来自多个客户端的请求同时到达时,一些调用会失败。我希望能够支持多达 15 个客户。当我尝试使用 UDP 和 10 个客户端时,10 个调用中有 2 个失败,返回代码与squareproc_2.

4

1 回答 1

4

你有几个问题。

在xen页面中,当它在square_prog_2中做pthread_create时,它首先调用pthread_attr_setdetachstate,但在此之前它需要做pthread_attr_init 。此外, attr 似乎是静态/全局的——将它放在函数的堆栈框架中。

square_prog_2 有两个参数:rqstp 和 transp。这些被保存到一个 malloc 的结构 data_str [所以每个线程都有自己的副本]。但是,我想知道 rqstp 和 transp 值是什么(例如 printf("%p"))。它们需要不同,否则每个线程在尝试使用它们时会相互冲突[因此需要 pthread_mutex_lock]。malloc 不会克隆 rqstp/transp,因此如果它们相同,那就是问题所在,因为您可能有两个线程试图同时在同一个缓冲区上重复。

有一个返回码 11。除非有一些特殊的代码,这看起来很像线程上的 SIGSEGV。这将完全由 rqstp/transp 重叠来解释。

您可能需要重新架构它,因为我怀疑 XDR不是线程安全的——它也不应该是线程安全的。另外,我不认为 svc_* 是线程安全/感知的。

启动单线程。作为测试,让 square_prog_2 直接调用 serv_request(例如,不要执行 pthread_*)。我敢打赌,这适用于所有模式。

如果是这样,请抓住你的帽子 - 使用线程的示例代码已损坏 - 充满竞争条件并且会出现段错误等。如果你没有挂断使用线程(不需要像 x * 这样的轻型任务x),您可以按原样享受。

否则,解决方案会更复杂一些。主线程必须完成对套接字的所有访问和所有 XDR 解析/编码。它不能使用 svc_run——你必须自己动手。孩子只能做实际的工作(例如x * x),可能不会接触socket/req/transp等。

主线程:

while (1) {
    if (svc_getreq_poll()) {
        // parse XDR
        // create data/return struct for child thread
        // create thread
        // add struct to list of "in-flight" requests
    }

    forall struct in inflight {
        if (reqdone) {
            // take result from struct
            // encode into XDR
            // do send_reply
            // remove struct from list
        }
    }
}

对于子结构,它看起来像:

struct child_struct {
    int num;
    int num_squared;
};

而孩子的线程函数变成了一个单行:ptr->num_squared = ptr->num * ptr->num

更新: Linux 或 FreeBSD 下似乎不支持多线程 RPC 服务器

这是一个文档: https ://www.redhat.com/archives/redhat-list/2004-June/msg00439.html 这是一个更简洁的示例。

从那: 记住Linux下不支持rpcgen的-A选项。SunOS RPC 提供的用于构建多线程 RPC 服务器的库调用在 Linux 下也不可用

这是 Linux rpcgen 手册页: http: //linux.die.net/man/1/rpcgen 没有提到 -M。IMO,这意味着 rpcgen 程序可以选择并生成存根,但没有底层支持,因此他们将其排除在文档之外。

这是 FreeBSD 手册页 [以及不支持的原因]: http://www.freebsd.org/cgi/man.cgi?query=rpcgen&sektion=1&manpath=FreeBSD+5.0-RELEASE 请参阅此文档中的 -M :

M——生成多线程安全存根,用于在 rpcgen 生成的代码和用户编写的代码之间传递参数和结果。此选项对于想要在其代码中使用线程的用户很有用。但是,rpc_svc_calls(3) 函数还不是 MT 安全的,这意味着 rpcgen 生成的服务器端代码将不是 MT 安全的。

另一种方法:

为什么要为 RPC/XDR 烦恼呢?对于您打算使用的大型阵列,开销是巨大的。大多数标准用途是用于没有太多数据的黄页之类的东西。

如今,大多数系统都是小端的。只需将本机缓冲区爆破到您直接打开的套接字即可。在服务器上,让一个守护进程进行监听,然后派生一个孩子,让孩子接受、读入数据、进行计算,然后发回回复。在最坏的情况下,孩子将需要进行字节序交换,但这很容易使用 bswap_32 在紧密循环中完成。

在每条消息的开头有一个简单的小控制结构,在任一方向作为数据有效负载的前缀:

struct msgcontrol {
    int what_i_am;
    int operation_to_perform;
    int payload_length;
    int payload[0];
};

特别注意:我以前在商业上做过这个(例如 MPI 和我自己的),你可能必须发出 setsockopt 调用以将内核套接字缓冲区的大小增加到足够大以维持大量数据

实际上,现在我想起来了,如果你不想自己动手,MPI 可能会很有趣。然而,用过它,我不是一个真正的粉丝。它有意想不到的问题,我们不得不将其删除,以便直接控制我们的套接字。

于 2015-10-19T00:25:44.890 回答