我有用 C 编写的共享内存通信。
以下结构位于分配段的最开始。
typedef struct {
int closedCount;
int shmId;
int semId;
int size;
volatile bool closed;
volatile unsigned int readCount;
volatile unsigned int writeCount;
volatile bool readBlocked;
volatile bool writeBlocked;
} T_IpcData;
我使用 2 个信号量 - 1 个用于读取器,1 个用于写入器。
Writer 将数据放入共享内存,更新 writeCount 并通知 reader 信号量。如果 writeCount - readCount == size,则 writer 等待 writer 信号量。
Reader 尝试使用 memcpy 从共享内存中读取 writeCount - readCount 个字节,更新 readCount 并通知 writer 信号量。如果 readCount == writeCount,阅读器等待阅读器信号量。
问题:有时(很少)我的应用程序出现错误,这是由读取的数据与写入的数据不同引起的。
我在单 CPU(8 核)和双 CPU(16 核)机器上运行测试。操作系统 - Ubuntu。问题很少在两者上重现。
问题:memcpy 提供了哪些关于“更改可见性”的保证?我怎样才能确保读者正确地看到作者线程中所做的所有修改?
谢谢!
更新(代码)
template <class T>
jlong Java_org_gridgain_grid_util_ipc_shmem_GridIpcSharedMemoryUtils_ReadShMem(
JNIEnv *env, jclass, jlong shMemPtr, T dest, jlong dOffset, jlong len, jlong timeout) {
T_IpcData *ipcData = (T_IpcData*) (((char *) shMemPtr) - BUF_OFFSET);
int unreadCnt = GetUnreadCount(ipcData);
while (unreadCnt == 0) {
if (unreadCnt == 0 && ipcData->closed) {
return -1;
}
// signal the other party, if it's blocked
if (ipcData->writeBlocked) {
if (__DEBUG) {
cerr << "Before write semaphore notification 1 [semId=" << ipcData->semId << "]\n" << flush;
}
SemNotify(env, ipcData->semId, SEM_WRITE, ipcData);
}
if (__DEBUG) {
cerr << "Before read semaphore wait [semId=" << ipcData->semId << "]\n" << flush;
}
ipcData->readBlocked = 1;
SemWait(env, ipcData->semId, SEM_READ, timeout, ipcData);
ipcData->readBlocked = 0;
unreadCnt = GetUnreadCount(ipcData);
}
int bytesRead = 0;
while (unreadCnt > 0 && bytesRead < len) {
int pos = ipcData->readCount % ipcData->size;
int len0 = (ipcData->size - pos < unreadCnt)? ipcData->size - pos: unreadCnt;
if (len0 > len - bytesRead) {
len0 = len - bytesRead;
}
RW::FromShMem(env, dest, dOffset + bytesRead , len0, (void*) (shMemPtr + pos));
ipcData->readCount += len0;
bytesRead += len0;
if (__DEBUG) {
cerr << "Before write semaphore notification 2 [semId=" << ipcData->semId << "]\n" << flush;
}
SemNotify(env, ipcData->semId, SEM_WRITE, ipcData);
unreadCnt = GetUnreadCount(ipcData);
}
return bytesRead;
}
template <class T>
jlong Java_org_gridgain_grid_util_ipc_shmem_GridIpcSharedMemoryUtils_WriteShMem(
JNIEnv *env, jclass clsName, jlong shMemPtr, T src, jlong sOffset, jlong len, jlong timeout) {
T_IpcData *ipcData = (T_IpcData*) (((char *) shMemPtr) - BUF_OFFSET);
int bytesWritten = 0;
while(bytesWritten < len) {
// Wait for reader.
int unreadCnt = GetUnreadCount(ipcData);
int pos = ipcData->writeCount % ipcData->size;
while (unreadCnt == ipcData->size) {
if (ipcData->closed) {
env->ThrowNew(env->FindClass(GRID_EXCEPTION), "Shared memory segment has been closed.");
return -1;
}
// signal the other party, if it's blocked
if (ipcData->readBlocked) {
SemNotify(env, ipcData->semId, SEM_READ, ipcData);
}
ipcData->writeBlocked = 1;
SemWait(env, ipcData->semId, SEM_WRITE, timeout, ipcData);
ipcData->writeBlocked = 0;
unreadCnt = GetUnreadCount(ipcData);
}
int len0 = ipcData->size - ((pos > unreadCnt)? pos : unreadCnt);
if (len0 > len - bytesWritten) {
len0 = len - bytesWritten;
}
if (ipcData->closed) {
env->ThrowNew(env->FindClass(GRID_EXCEPTION), "Shared memory segment has been closed");
return -1;
}
RW::ToShMem(env, src, sOffset + bytesWritten, len0, (void*) (shMemPtr + pos));
ipcData->writeCount += len0;
bytesWritten += len0;
SemNotify(env, ipcData->semId, SEM_READ, ipcData);
}
return GetUnreadCount(ipcData);
}
SemWait() 调用 semop() 和 SemNotify() 调用 semctl()。