1

我有用 C 编写的共享内存通信。

以下结构位于分配段的最开始。

typedef struct {
    int closedCount;
    int shmId;
    int semId;
    int size;
    volatile bool closed;
    volatile unsigned int readCount;
    volatile unsigned int writeCount;
    volatile bool readBlocked;
    volatile bool writeBlocked;
} T_IpcData;

我使用 2 个信号量 - 1 个用于读取器,1 个用于写入器。

Writer 将数据放入共享内存,更新 writeCount 并通知 reader 信号量。如果 writeCount - readCount == size,则 writer 等待 writer 信号量。

Reader 尝试使用 memcpy 从共享内存中读取 writeCount - readCount 个字节,更新 readCount 并通知 writer 信号量。如果 readCount == writeCount,阅读器等待阅读器信号量。

问题:有时(很少)我的应用程序出现错误,这是由读取的数据与写入的数据不同引起的。

我在单 CPU(8 核)和双 CPU(16 核)机器上运行测试。操作系统 - Ubuntu。问题很少在两者上重现。

问题:memcpy 提供了哪些关于“更改可见性”的保证?我怎样才能确保读者正确地看到作者线程中所做的所有修改?

谢谢!

更新(代码)

template <class T>
    jlong Java_org_gridgain_grid_util_ipc_shmem_GridIpcSharedMemoryUtils_ReadShMem(
        JNIEnv *env, jclass, jlong shMemPtr, T dest, jlong dOffset, jlong len, jlong timeout) {
    T_IpcData *ipcData = (T_IpcData*) (((char *) shMemPtr) - BUF_OFFSET);

    int unreadCnt = GetUnreadCount(ipcData);

    while (unreadCnt == 0) {
        if (unreadCnt == 0 && ipcData->closed) {
            return -1;
        }

        // signal the other party, if it's blocked
        if (ipcData->writeBlocked) {
            if (__DEBUG) {
                cerr << "Before write semaphore notification 1 [semId=" << ipcData->semId << "]\n" << flush;
            }

            SemNotify(env, ipcData->semId, SEM_WRITE, ipcData);
        }

        if (__DEBUG) {
            cerr << "Before read semaphore wait [semId=" << ipcData->semId << "]\n" << flush;
        }

        ipcData->readBlocked = 1;
        SemWait(env, ipcData->semId, SEM_READ, timeout, ipcData);
        ipcData->readBlocked = 0;

        unreadCnt = GetUnreadCount(ipcData);
    }

    int bytesRead = 0;

    while (unreadCnt > 0 && bytesRead < len) {
        int pos = ipcData->readCount % ipcData->size;
        int len0 =  (ipcData->size - pos < unreadCnt)? ipcData->size - pos: unreadCnt;

        if (len0 > len - bytesRead) {
            len0 = len - bytesRead;
        }

        RW::FromShMem(env, dest, dOffset + bytesRead , len0, (void*) (shMemPtr + pos));
        ipcData->readCount += len0;
        bytesRead += len0;

        if (__DEBUG) {
            cerr << "Before write semaphore notification 2 [semId=" << ipcData->semId << "]\n" << flush;
        }

        SemNotify(env, ipcData->semId, SEM_WRITE, ipcData);
        unreadCnt = GetUnreadCount(ipcData);
    }

    return bytesRead;
}

template <class T>
jlong Java_org_gridgain_grid_util_ipc_shmem_GridIpcSharedMemoryUtils_WriteShMem(
    JNIEnv *env, jclass clsName, jlong shMemPtr, T src, jlong sOffset, jlong len, jlong timeout) {
    T_IpcData *ipcData = (T_IpcData*) (((char *) shMemPtr) - BUF_OFFSET);

    int bytesWritten = 0;

    while(bytesWritten < len) {
        // Wait for reader.
        int unreadCnt = GetUnreadCount(ipcData);
        int pos = ipcData->writeCount % ipcData->size;

        while (unreadCnt == ipcData->size) {
            if (ipcData->closed) {
                env->ThrowNew(env->FindClass(GRID_EXCEPTION), "Shared memory segment has been closed.");
                return -1;
            }

            // signal the other party, if it's blocked
            if (ipcData->readBlocked) {
                SemNotify(env, ipcData->semId, SEM_READ, ipcData);
            }

            ipcData->writeBlocked = 1;
            SemWait(env, ipcData->semId, SEM_WRITE, timeout, ipcData);
            ipcData->writeBlocked = 0;

            unreadCnt = GetUnreadCount(ipcData);
        }

        int len0 = ipcData->size - ((pos >  unreadCnt)?  pos :  unreadCnt);

        if (len0 > len - bytesWritten) {
            len0 = len - bytesWritten;
        }

        if (ipcData->closed) {
            env->ThrowNew(env->FindClass(GRID_EXCEPTION), "Shared memory segment has been closed");
            return -1;
        }

        RW::ToShMem(env, src, sOffset + bytesWritten, len0, (void*) (shMemPtr + pos));
        ipcData->writeCount += len0;
        bytesWritten += len0;
        SemNotify(env, ipcData->semId, SEM_READ, ipcData);
    }

    return GetUnreadCount(ipcData);
}

SemWait() 调用 semop() 和 SemNotify() 调用 semctl()。

4

0 回答 0