0

最近我用libevent写了一个简单的例子,

  #include <stdio.h>
    #include <stdlib.h>
    #include <stddef.h>
    #include <sys/types.h>
    #include <signal.h>
    #include <unistd.h>
    #include <string.h>
    #include <event.h>
    #include <unistd.h>
    #include <fcntl.h>
    #include <errno.h>
    #include <sys/socket.h>
    #include <netdb.h>
    #include <netinet/tcp.h>

    #define bool char
    #define false 0
    #define true 1

enum conn_states
{
    conn_listening, /**< the socket which listens for connections */
    conn_new_cmd, /**< Prepare connection for next command */
    conn_waiting, /**< waiting for a readable socket */
    conn_read, /**< reading in a command line */
    conn_parse_cmd, /**< try to parse a command from the input buffer */
    conn_write, /**< writing out a simple response */
    conn_swallow, /**< swallowing unnecessary bytes w/o storing */
    conn_closing, /**< closing this connection */
};

/**
 * The structure representing a connection into memcached.
 * Used when processing request for worker threads
 */
typedef struct conn conn;
struct conn
{
    char *rbuf; /** buffer to read commands into */
    char *rcurr; /** but if we parsed some already, this is where we stopped */
    int rsize; /** total allocated size of rbuf */
    int rbytes; /** how much data have we read*/

    char *wbuf; /* buffer to write*/
    char *wcurr;
    int wsize; /* sizeof(buf)*/
    int wbytes;/* strlen(buf)*/

    int sfd;
    enum conn_states state;
    struct event event;
    short ev_flags;
    short which; /** which events were just triggered */

    /** which state to go into after finishing current write */
    enum conn_states write_and_go;
    void *write_and_free; /** free this memory after finishing writing */

    /* data for the swallow state */
    int sbytes; /* how many bytes to swallow */

    /*No care about UDP*/

    bool noreply; /* True if the reply should not be sent. */
    /* current stats command */
    struct
    {
        char *buffer;
        size_t size;
        size_t offset;
    } stats;

    conn *next; /* Used for generating a list of conn structures */
};

/** file scope variables **/
struct event_base *main_base;


//the master listen fd stores here
//also the first node of listen conn queue
conn *listen_conn = NULL;

//how to pass which to event_handler?
void event_handler(const int fd, const short which, void *arg)
{
    conn *c;

    fprintf(stderr, "event_handler going to work for %d\n", fd);

    /* wait for next event */
    return;
}


/*
 * Frees a connection.
 */
void conn_free(conn *c)
{
    if (c)
    {
        if (c->rbuf)
            free(c->rbuf);
        if (c->wbuf)
            free(c->wbuf);
        free(c);
    }
}


/**
 *  create a new conn, and set the conn's event
 */
conn *conn_new(const int sfd, enum conn_states init_state,
        const int event_flags, const int read_buffer_size,  struct event_base *base)
{
    conn *c = NULL;

    if (NULL == c) //c=NULL  at first time
    {
        if (!(c = (conn *) calloc(1, sizeof(conn))))
        {
            fprintf(stderr, "calloc failed");
            return NULL;
        }

        c->rbuf = c->wbuf = 0;
        c->rsize = read_buffer_size;
        c->wsize = 2048;
        c->rbuf = (char *) malloc((size_t) c->rsize);
        c->wbuf = (char *) malloc((size_t) c->wsize);

        if (c->rbuf == 0 || c->wbuf == 0)
        {
            conn_free(c);
            fprintf(stderr, "malloc failed");
            return NULL;
        }

    }

    c->sfd = sfd;
    c->state = init_state;
    c->rbytes = c->wbytes = 0;
    c->wcurr = c->wbuf;
    c->rcurr = c->rbuf;

    c->write_and_go = init_state;
    c->write_and_free = 0;

    c->noreply = false;

    event_set(&c->event, sfd, event_flags, event_handler, (void *) c);
    event_base_set(base, &c->event);
    c->ev_flags = event_flags;

    if (event_add(&c->event, 0) == -1)
    {
        conn_free(c);
        fprintf(stderr, "event_add failed");
        return NULL;
    }

    return c;
}

static int new_socket(struct addrinfo *ai)
{
    int sfd;
    int flags;

    if ((sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1)
    {
        return -1;
    }

    if ((flags = fcntl(sfd, F_GETFL, 0)) < 0
            || fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0)
    {
        fprintf(stderr, "setting O_NONBLOCK");
        close(sfd);
        return -1;
    }
    return sfd;
}


int server_socket(int port)
{
    int sfd;
    struct linger ling =
    { 0, 0 };
    struct addrinfo *ai;
    struct addrinfo *next;
    struct addrinfo hints =
    { .ai_flags = AI_PASSIVE, .ai_family = AF_UNSPEC };
    char port_buf[NI_MAXSERV];
    int error;
    int success = 0;
    int flags = 1;

    hints.ai_socktype = SOCK_STREAM;

    if (port == -1)
    {
        port = 0;
    }
    snprintf(port_buf, sizeof(port_buf), "%d", port);
    error = getaddrinfo(NULL, port_buf, &hints, &ai);
    if (error != 0)
    {
        fprintf(stderr, "getaddrinfo(): %s", gai_strerror(error));
        return -1;
    }

    for (next = ai; next; next = next->ai_next)
    {
        conn *listen_conn_add;
        if ((sfd = new_socket(next)) == -1)
        {
            /* getaddrinfo can return "junk" addresses,
             * we make sure at least one works before erroring.
             */
            if (errno == EMFILE)
            {
                /* ...unless we're out of fds */
                fprintf(stderr, "server_socket failed");
                exit(-1);
            }
            continue;
        }

        /*will not care about ipv6 here*/

        setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *) &flags, sizeof(flags));
        error = setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *) &flags,
                sizeof(flags));
        if (error != 0)
            fprintf(stderr, "setsockopt");

        error = setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *) &ling,
                sizeof(ling));
        if (error != 0)
            fprintf(stderr, "setsockopt");

        /** TCP_NODELAY is for a specific purpose; to disable the Nagle buffering
            algorithm. It should only be set for applications that send frequent
            small bursts of information without getting an immediate response,
            where timely delivery of data is required  (the canonical example is
            mouse movements)*/

        error = setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *) &flags,
                sizeof(flags));
        if (error != 0)
            fprintf(stderr, "setsockopt");

        if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1)
        {
            if (errno != EADDRINUSE)
            {
                fprintf(stderr, "bind failed");
                close(sfd);
                freeaddrinfo(ai);
                return -1;
            }
            close(sfd);
            continue;
        }
        else
        {
            success++;
            if (next->ai_addr->sa_family == AF_INET)
            {
                union
                {
                    struct sockaddr_in in;
                    struct sockaddr_in6 in6;
                } my_sockaddr;
                socklen_t len = sizeof(my_sockaddr);
                if (getsockname(sfd, (struct sockaddr*) &my_sockaddr, &len) == 0)
                {
                    if (next->ai_addr->sa_family == AF_INET)
                    {
                        fprintf(stderr, "Server listenning at port : %u",
                                ntohs(my_sockaddr.in.sin_port));
                    }
                }
            }
        }

        if (!(listen_conn_add = conn_new(sfd, conn_listening,
                EV_READ | EV_PERSIST, 1, main_base)))
        {
            fprintf(stderr, "failed to create listening connection");
            exit(-1);
        }
        listen_conn_add->next = listen_conn;
        listen_conn = listen_conn_add; //the way to add a node
        fprintf(stderr, "<%d listen tcp conn added to queue", sfd);
    }

    freeaddrinfo(ai);

    return success;
}

int main()
{

    /* initialize main thread libevent instance */
    main_base = event_init();

    errno = 0;
    if (server_socket(22222) == 0)
    {
        fprintf(stderr, "failed to listen on TCP port %d", 22222);
        return -1;
    }

    if (event_base_loop(main_base, 0) != 0)
    {
        fprintf(stderr, "failed to event_base_loop");
        return -1;
    }

    return 0;
}

但是当它运行时,cpu几乎是99%。

这段代码来自 memcached,当时 memcached 工作得很好!

请帮忙!

但是当它运行时,cpu几乎是99%。

这段代码来自 memcached,当时 memcached 工作得很好!

请帮忙!

但是当它运行时,cpu几乎是99%。

这段代码来自 memcached,当时 memcached 工作得很好!

请帮忙!

但是当它运行时,cpu几乎是99%。

这段代码来自 memcached,当时 memcached 工作得很好!

请帮忙!

但是当它运行时,cpu几乎是99%。

这段代码来自 memcached,当时 memcached 工作得很好!

请帮忙!

但是当它运行时,cpu几乎是99%。

这段代码来自 memcached,当时 memcached 工作得很好!

请帮忙!

但是当它运行时,cpu几乎是99%。

这段代码来自 memcached,当时 memcached 工作得很好!

请帮忙!

但是当它运行时,cpu几乎是99%。

这段代码来自 memcached,当时 memcached 工作得很好!

请帮忙!

但是当它运行时,cpu几乎是99%。

这段代码来自 memcached,当时 memcached 工作得很好!

请帮忙!

但是当它运行时,cpu几乎是99%。

这段代码来自 memcached,当时 memcached 工作得很好!

请帮忙!

但是当它运行时,cpu几乎是99%。

这段代码来自 memcached,当时 memcached 工作得很好!

请帮忙!

谢谢

4

1 回答 1

0

我在检查时看到的一件事是“event_handler”中的 fprintf,这意味着每次您收到一些数据时,这都会花费您。fprintf 和 sprintf 是出了名的慢,并且可能会受到 libevent 与内核一起工作的方式的影响。无论如何..它们会占用大量cpu,如果您经常收到事件....

我在您的代码中没有看到的另一件事是您接受新连接的位置。Libevent 有两种方法可以做到这一点。

listener = evconnlistener_new_bind(base, accept_conn_cb, NULL,
        LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE, -1,
        (struct sockaddr*)&sin, sizeof(sin));

event_set(&ev_accept, listen_fd, EV_READ|EV_PERSIST, on_accept, NULL);

这似乎有点奇怪,或杂乱无章。你如何接受新的连接?

于 2013-05-06T07:20:56.590 回答