1

我是异步 I/O 的新手。我需要让它在 Linux 系统上的一些 C 和 Fortran 程序中工作。我设法编写了一个从两个文件异步读取的小 C 测试代码(包括在下面)。代码编译并运行。不过,我想知道的是,我是真正获得异步 I/O,还是 I/O 真的是串行的?我正在处理的 luster 文件系统有点过时,不清楚它是否真正支持异步 I/O,似乎没有人有明确的答案。所以我想知道是否有一些时间语句或任何类型的输出可以添加到代码中,以确定它是否以真正异步的方式运行。我打赌我需要比我正在处理的文件大得多的文件才能进行有意义的测试。不知道我还需要什么。

代码是:

#include <stdio.h>
#include <stdlib.h>
/*  for "open()" ... */
#include <fcntl.h>
/* for "bzero()" ... */
#include<strings.h>
/* for asynch I/O ... */
#include <aio.h>
/* for EINPROGRESS ... */
#include <errno.h>
/* for "usleep()" ... */
#include <unistd.h>

#define BUFSIZE 1024


int main() {

        int fd0, fd1, readstat0, readstat1;
        struct aiocb *ioobjs[2];

        ioobjs[0] = malloc(sizeof(struct aiocb));
        ioobjs[1] = malloc(sizeof(struct aiocb));

        fd0 = open("file.txt", O_RDONLY);
        if (fd0 < 0) perror("open");
        fd1 = open("otherfile.txt", O_RDONLY);
        if (fd1 < 0) perror("open");

        bzero((char *)ioobjs[0], sizeof(struct aiocb));
        bzero((char *)ioobjs[1], sizeof(struct aiocb));

        ioobjs[0]->aio_buf = malloc(BUFSIZE+1);
        if (!ioobjs[0]->aio_buf) perror("malloc 0");
        ioobjs[1]->aio_buf = malloc(BUFSIZE+1);
        if (!ioobjs[1]->aio_buf) perror("malloc 0");

        ioobjs[0]->aio_fildes = fd0;
        ioobjs[0]->aio_nbytes = BUFSIZE;
        ioobjs[0]->aio_offset = 0;
        /* Don't forget this!  With list I/O, there is no
         * particular function call to make.  You have to
         * tell what you want to do via this member of
         * your aiocb struct:
         */
        ioobjs[0]->aio_lio_opcode = LIO_READ;
        ioobjs[1]->aio_fildes = fd1;
        ioobjs[1]->aio_nbytes = BUFSIZE;
        ioobjs[1]->aio_offset = 0;
        ioobjs[1]->aio_lio_opcode = LIO_READ;

        readstat0 = aio_read(ioobjs[0]);
        if (readstat0 < 0) perror("reading 0");
        readstat1 = aio_read(ioobjs[1]);
        if (readstat1 < 0) perror("reading 1");

        lio_listio(LIO_NOWAIT, ioobjs, 2, NULL);


        /* don't completely understand.  gives system time to
         * "wrap things up".  without this, one of the outputs
         * below (maybe both) will have no output to give.
         */
        usleep(100);

        if ((readstat0 = aio_return( ioobjs[0] )) > 0) {
                printf(">>>\n");
                printf("%s\n", (char *)(ioobjs[0]->aio_buf));
                printf("<<<\n");
        } else {
                perror("return");
        }
        if ((readstat1 = aio_return( ioobjs[1] )) > 0) {
                printf(">>>\n");
                printf("%s\n", (char *)(ioobjs[1]->aio_buf));
                printf("<<<\n");
        } else {
                perror("return");
        }


}
4

1 回答 1

2

man aio,请注意,这aio_*完全是一个glibc[用户空间] 实现。

因此,如前所述,它有一些局限性。

从时间上看正在发生的事情的方法是拥有一个带有时间戳的事件日志。

天真的方法是只使用 [debug]printf调用。但是,对于精确时间测量,开销printf可能会破坏实际/实际时间。也就是说,我们不测量“被测系统”,而是“被测系统+时序/基准开销”。

一种方法是strace使用适当的时间戳选项运行您的程序。该strace日志包含有关使用的系统调用的信息。但是,由于aio是在用户空间中实现的,它可能无法深入到足够细的粒度。而且,strace它本身可能会产生开销。

另一种方法是创建跟踪/事件日志机制并检测您的代码。基本上,它实现了一个固定长度的“微量元素”环形队列。因此,跟踪数据存储在内存中,因此速度非常快。

可以帮助解决此问题的标准实用程序是dtrace. 我自己没有这样做,因为我更喜欢“自己动手”。有关我使用的一些实际代码,请参见下文。

然后,使用(例如)检测您的代码:

evtadd(TRACE_HELLO,"hello world");

从哪里来TRACE_*enum您可以根据需要定义。


无论如何,这是我过去使用过的一些事件跟踪代码。可以扩展事件结构以添加您希望在事件点存储的任何额外数据。

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <sys/syscall.h>
#include <pthread.h>

typedef long long tsc_t;
typedef unsigned long long u32;
typedef unsigned long long u64;

// trace buffer element
typedef struct {
    tsc_t evt_tsc;
    pid_t evt_tid;
    const char *evt_name;
    u64 evt_xid;
} evtelem_t;

// trace buffer indexes
typedef struct {
    u32 ring_enq;
    u32 ring_deq;
} evtring_t;

// trace buffer control
typedef struct {
    evtring_t que_ring;
    u32 que_max;
    evtelem_t *que_base;
    pthread_mutex_t que_lock;
} evtctl_t;

evtctl_t evtctl;

// advance queue index [wrap if necessary]
#define EVTINC(_qidx) \
    do { \
        _qidx += 1; \
        if (_qidx >= evtctl.que_max) \
            _qidx = 0; \
    } while (0)

// tscget -- get timestamp
tsc_t
tscget(void)
{
    struct timespec ts;
    static tsc_t tsczero = 0;
    tsc_t tsc;

    clock_gettime(CLOCK_MONOTONIC,&ts);

    tsc = ts.tv_sec;
    tsc *= 1000000000;
    tsc += ts.tv_nsec;

    if (tsczero == 0)
        tsczero = tsc;

    tsc -= tsczero;

    return tsc;
}

// tscsec -- convert timestamp to fractional seconds
double
tscsec(tsc_t tsc)
{
    double sec;

    sec = tsc;
    sec /= 1e9;

    return sec;
}

// evtptr -- point to trace element
evtelem_t *
evtptr(u32 idx)
{

    return &evtctl.que_base[idx];
}

// evtinit -- initialize trace
void
evtinit(u32 qmax)
{

    evtctl.que_base = calloc(qmax,sizeof(evtelem_t));
    evtctl.que_max = qmax;

    evtctl.que_ring.ring_deq = 0;
    evtctl.que_ring.ring_enq = 0;

    pthread_mutex_init(&evtctl.que_lock,NULL);
}

// evtnew -- locate new event slot
evtelem_t *
evtnew(void)
{
    evtring_t *nring;
    evtelem_t *evt;

    pthread_mutex_lock(&evtctl.que_lock);

    nring = &evtctl.que_ring;

    evt = evtptr(nring->ring_enq);

    // advance enqueue pointer [wrap if necessary]
    EVTINC(nring->ring_enq);

    // if queue full advance dequeue pointer to maintain space
    if (nring->ring_enq == nring->ring_deq)
        EVTINC(nring->ring_deq);

    pthread_mutex_unlock(&evtctl.que_lock);

    return evt;
}

// evtadd -- add trace element
evtelem_t *
evtadd(u64 xid,const char *name)
{
    tsc_t tsc;
    evtelem_t *evt;

    tsc = tscget();

    evt = evtnew();
    evt->evt_tsc = tsc;
    evt->evt_xid = xid;
    evt->evt_name = name;
    evt->evt_tid = syscall(SYS_gettid);

    return evt;
}

// _evtdump -- dump queue element
void
_evtdump(evtelem_t *evt,tsc_t *tscptr,FILE *xfinfo)
{
    tsc_t tscprev;
    tsc_t tscnow;
    double elap;
    double delta;

    // get timestamp for this entry
    tscnow = evt->evt_tsc;

    tscprev = *tscptr;
    if (tscprev == 0)
        tscprev = tscnow;

    // get time delta from previous entry
    tscprev = tscnow - tscprev;
    delta = tscsec(tscprev);

    // get elapsed time from start
    elap = tscsec(tscnow);

    fprintf(xfinfo,"%.9f/%.9f %8d %s [%llu]\n",
        elap,delta,evt->evt_tid,evt->evt_name,evt->evt_xid);

    *tscptr = evt->evt_tsc;
}

// evtdump -- dump the queue
void
evtdump(const char *file)
{
    FILE *evtxf;
    evtring_t ring;
    evtelem_t *evt;
    tsc_t tscprev;

    evtxf = fopen(file,"w");
    if (evtxf == NULL) {
        perror(file);
        exit(1);
    }

    ring = evtctl.que_ring;

    // initialize previous timestamp
    tscprev = 0;

    while (ring.ring_enq != ring.ring_deq) {
        evt = evtptr(ring.ring_deq);
        EVTINC(ring.ring_deq);
        _evtdump(evt,&tscprev,evtxf);
    }

    fclose(evtxf);
}
于 2020-12-01T22:18:47.973 回答