1

我正在使用 pthread + ACE 编写一个假客户端。

这个客户端有3个线程,每个线程都可以通过ACE无休止地发送和接收消息。然而,这些线程总是被函数 send() 或 recv() 停止。也就是说,如果发送或接收出现问题,线程将退出,不幸的是,我不知道错误是什么,我也无法捕捉到它。代码是:

struct thread_data {
    int  thread_id;
    string ip;
    uint32_t port;
    uint32_t timeout;
};
std::vector<struct thread_data> m_thread;

void * test_fun1(void * threadid)
{
    struct thread_data * tmp_thread_data = (struct thread_data *)threadid;
    long tmp_threadid = (long)tmp_thread_data->thread_id;
    string tmp_ip = tmp_thread_data->ip;
    uint32_t tmp_port = tmp_thread_data->port;
    uint32_t tmp_timeout = tmp_thread_data->timeout;

    ACE_INET_Addr addr(tmp_port, tmp_ip.c_str());
    ACE_Time_Value timeout(0, tmp_timeout * 1000);
    ACE_SOCK_Connector connector;
    ACE_SOCK_Stream peer;

    // connect
    if(connector.connect(peer, addr, &timeout) != 0)
            pthread_exit((void *) threadid);

    // send msg
    while (1)
    {
            ssize_t tmp_ret1 = peer.send("hello world", 12);
            if (tmp_ret1 <= 0)
                    continue;

            char tmp_buf[1024] = '\0';
            ssize_t tmp_ret2 = peer.recv(tmp_buf, 1024, &timeout);
            if (tmp_ret2 <= 0)
                     continue;
            else
                     fprintf(stderr, "recv:%s\n", tmp_buf);
    }

    // close
    peer.close();
    pthread_exit((void *) threadid);
}

int main(int argc, char *argv[]) 
{
    std::vector<pthread_t> threads;
    pthread_attr_t attr;
    int rc;
    int i = 0;
    void * status;

    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    // thread create
    int tmp_num = 3;
    for(i = 0; i < tmp_num; i++)
    {
            pthread_t tmp_thread_handler;
            struct thread_data tmp_thread_info;
            tmp_thread_info.thread_id = i;
            tmp_thread_info.ip = "127.0.0.1";
            tmp_thread_info.port = 8001;
            tmp_thread_info.timeout = 100;
            rc = pthread_create(&tmp_thread_handler, NULL, test_fun1, (void *)&tmp_thread_info);
            if (rc != 0)
                    return -1;

            threads.push_back(tmp_thread_handler);
            m_thread.push_back(tmp_thread_info);
    }

    // thread start
    pthread_attr_destroy(&attr);
    for(i = 0; i < tmp_num; i++) 
    {

            rc = pthread_join(threads[i], &status);
            if (rc != 0) 
                    return -1;
    }

    pthread_exit(NULL);
    return 0; 
}

如果我想无休止地发送和接收消息怎么办?任何帮助,将不胜感激!

4

1 回答 1

1

您在此代码中有一个重要的作用域寿命问题。您正在向每次循环迭代创建/销毁的线程发送数据块。撇开边线:

// thread create
int tmp_num = 3;
for(i = 0; i < tmp_num; i++)
{
    pthread_t tmp_thread_handler;
    struct thread_data tmp_thread_info;
    // --- snip ----
    rc = pthread_create(&tmp_thread_handler, NULL, test_fun1, (void *)&tmp_thread_info);
    // --- snip ----
}

结果块是否被复制到底层信息向量中是不相关的。一旦循环摆动,对象就会被破坏,并且std::string成员变量也会随之被破坏ip。它可能驻留在同一内存空间(结构)中,但字符串的重新分配几乎肯定不会。换句话说,您正在调用undefined behavior

对此的一种可能解决方案是对结构使用智能指针,将其get()成员传递给线程,并将所述相同的智能指针推送到您的信息向量中(针对智能指针进行了重组)。我个人更喜欢一种更简单的方法,即在线程启动之前分配两个向量,从而修复它们的“内部”以供按地址使用。从那里您可以发送结构地址,因为向量为您保存它:

// thread create
size_t tmp_num = 3;
m_thread.resize(tmp_num);
threads.resize(tmp_num);
for(size_t i = 0; i < tmp_num; i++)
{
    m_thread[i].thread_id = i;
    m_thread[i].ip = "127.0.0.1";
    m_thread[i].port = 8001;
    m_thread[i].timeout = 100;

    rc = pthread_create(&threads[i], NULL, test_fun1, &m_thread[i]);
    if (rc != 0)
        return -1;
}

关于其他问题,在您的线程过程中,这不会编译:

char tmp_buf[1024] = '\0';

但这与您所拥有的未定义行为相比是很小的。

于 2013-09-25T04:42:29.933 回答