我有一个“队列”(它不完全是 FIFO),一旦它的预分配内存用完,它就会由磁盘上的文件支持。构建的代码正在 ARM SBC 上执行。
我遇到的问题是,一段时间后,存储缓冲区文件的 SD 卡(或上面的特定文件?)损坏了,然后 Linux 将其重新安装为只读。如果我随后将 SD 卡插入我的 Windows 机器并在其上运行 chkdsk,则显示有错误。
我正在使用 fopen、fputs、fclose 并使用对 SED 的系统调用从文件中删除一行。我在代码中做错了什么吗?我是否应该用我自己的函数替换对 SED 的调用系统调用来重命名文件、读入、再次写出而不使用第一行?
我也尝试过另一张 SD 卡。
有关如何调查文件损坏原因的任何指示都将非常有用。- 谢谢。
/// Number of elements in the in-memory section of the queue.
#define DISK_QUEUE_ELEMENTS 20
#define DISK_QUEUE_SIZE (DISK_QUEUE_ELEMENTS + 1)
/// Directory where the files will be stored that will buffer any messages when the queue exceeds DISK_QUEUE_ELEMENTS.
#define DISK_QUEUE_BUFF_DIR "/mnt/sd/buff/"
/// File name pattern for indexed buffer files.
#define DISK_QUEUE_FILE_PATTERN DISK_QUEUE_BUFF_DIR "buff%d.txt"
#define DISK_QUEUE_MSGS_PER_FILE 1000
#define DISK_QUEUE_FILEPATH_LEN 32
#define DISK_QUEUE_SED_START "sed -i '1d' "
/// This is the transmision message queue. This holds the in-memory messages.
struct TsqMessage DiskQueue[DISK_QUEUE_SIZE];
int DiskQueueIn;
int DiskQueueOut;
/// Mutex used for ensuring the disk queue functions are thread safe.
pthread_mutex_t dqLock;
/// A global to ensure that other parts of the system know how many messages there are on the disk, not in RAM.
int g_dqFileMessageCount;
/// Returns the number of lines in a file.
int GetLinesInBuffFile(char * file) {
FILE* logfile = fopen(file, "r");
if (logfile == NULL) {
pthread_mutex_unlock(&dqLock);
return 0;
} else {
int ch, number_of_lines = 0;
do {
ch = fgetc(logfile);
if(ch == '\n') {
number_of_lines++;
}
} while (ch != EOF);
fclose(logfile);
return number_of_lines;
}
}
/// Returns the total number of messages that are buffered in files.
int GetLinesInBuffFiles() {
int i = 0;
char filename[DISK_QUEUE_FILEPATH_LEN];
int linesInCurrentFile = 0;
int totalLinesInFiles = 0;
do {
snprintf(filename, sizeof(char) * DISK_QUEUE_FILEPATH_LEN, DISK_QUEUE_FILE_PATTERN, i);
++i;
linesInCurrentFile = GetLinesInBuffFile(filename);
totalLinesInFiles += linesInCurrentFile;
} while (linesInCurrentFile != 0);
return totalLinesInFiles;
}
/// Initilaise the DiskQueue by setting the counters to 0 and getting the number of lines on the disk. It is reenterable.
void DiskQueueInit(void) {
pthread_mutex_lock(&dqLock);
static char firstTime = 1;
if (firstTime == 1) {
mkdir(DISK_QUEUE_BUFF_DIR, 0777);
DiskQueueIn = 0;
DiskQueueOut = 0;
g_dqFileMessageCount = GetLinesInBuffFiles();
printf("Number of lines in buff files = %d\n", g_dqFileMessageCount);
}
firstTime = 2;
pthread_mutex_unlock(&dqLock);
}
/// Add a TsqMessage sruct to the DiskQueue.
int DiskQueuePut(struct TsqMessage newMsg) {
int returnCode = 0;
// Lock the DiskQueue before we do anything with it.
pthread_mutex_lock(&dqLock);
if (DiskQueueIn == (( DiskQueueOut - 1 + DISK_QUEUE_SIZE) % DISK_QUEUE_SIZE)) {
// The in-memory DiskQueue is full so now we must store on the disk.
char messageString[DISK_LINE_SIZE];
//// TODO is this needed? memset(messageString, '\0', DISK_LINE_SIZE); // clear string
// Convert message struct to string
sprintf(messageString, "%d,%d,%f,%lld\n", newMsg.Source, newMsg.MessageId, newMsg.Value, newMsg.Timestamp);
// append string to file
char filename[DISK_QUEUE_FILEPATH_LEN];
snprintf(filename, sizeof(char) * DISK_QUEUE_FILEPATH_LEN, DISK_QUEUE_FILE_PATTERN, g_dqFileMessageCount / DISK_QUEUE_MSGS_PER_FILE);
FILE *fp = fopen(filename, "a+");
if (fp != NULL) {
// Write string to file.
if (fputs(messageString, fp) == EOF) {
// Error
printf("disk queue - fputs error\n");
pthread_mutex_unlock(&dqLock);
exit(1);
}
// Close the file.
if (fclose(fp) != EOF) {
// if ok then g_dqFileMessageCount++ (so we know how many messages are in the files)
g_dqFileMessageCount++;
returnCode = 0;
} else {
// else log failure and return -1
printf("Disk queue - error closing file (%s)\n", filename);
returnCode = -1;
}
} else {
printf("Disk queue - cannot open file (%s) - data lost.", filename);
}
pthread_mutex_unlock(&dqLock);
return returnCode;
} // else if in-memory queue not full then add to in-memory queue
DiskQueue[DiskQueueIn] = newMsg;
DiskQueueIn = (DiskQueueIn + 1) % DISK_QUEUE_SIZE;
pthread_mutex_unlock(&dqLock);
return returnCode; // No errors
}
/// Pop the oldest TsqMessage struct of the in-memory DiskQueue if the disk is empty else pop it off of the disk file.
int DiskQueueGet(struct TsqMessage *old) {
// Lock the DiskQueue before we do anything with it.
pthread_mutex_lock(&dqLock);
// If g_dqFileMessageCount is > 0 then prioritise getting messages off of the disk.
if (g_dqFileMessageCount > 0) {
char filename[DISK_QUEUE_FILEPATH_LEN];
snprintf(filename, sizeof(char) * DISK_QUEUE_FILEPATH_LEN, DISK_QUEUE_FILE_PATTERN, (g_dqFileMessageCount-1) / DISK_QUEUE_MSGS_PER_FILE);
char * line = NULL;
size_t len = 0;
ssize_t read;
// Open file
FILE * fp = fopen(filename, "r");
//printf("reading from file: %s\n", filename);
if (fp == NULL) {
printf("ERROR reading from disk buffer, no such file: %s\n", filename);
pthread_mutex_unlock(&dqLock);
return -1;
}
// Read line from file
if ((read = getline(&line, &len, fp)) != -1) {
char* token;
char* toFree;
if (line != NULL) {
toFree = line;
int lineCounter = 0;
while ((token = strsep(&line, ",")) != NULL) {
if (lineCounter == 0) {
old->Source = atoi(token);
} else if (lineCounter == 1) {
old->MessageId = atoi(token);
} else if (lineCounter == 2 ) {
old->Value = atof(token);
} else if (lineCounter == 3) {
old->Timestamp = atoll(token);
} else {
fclose(fp);
pthread_mutex_unlock(&dqLock);
printf("ERROR reading from disk buffer, bad format data\n");
return -1;
}
lineCounter++;
}
free(toFree);
}
} else {
int errsv = errno;
fclose(fp);
free(line);
pthread_mutex_unlock(&dqLock);
printf("ERROR reading from disk buffer, messages that it thinks are on the disk:%d, errorcode:%d\n", g_dqFileMessageCount, errsv);
return -1;
}
fclose(fp);
free(line);
// Delete line from file that we just read.
char * sedCommand;
asprintf(&sedCommand, "%s %s", DISK_QUEUE_SED_START, filename);
int sedRet = system(sedCommand);
free(sedCommand);
if (sedRet != 0) {
printf(" --- --- SED ERROR %d \n", sedRet); // TODO handle if possible?
} else {
g_dqFileMessageCount--;
}
// We've set *old to be the read value from the file so we can safely unlock and return.
pthread_mutex_unlock(&dqLock);
return 0;
}
// Queue empty
if (DiskQueueIn == DiskQueueOut) {
pthread_mutex_unlock(&dqLock);
return -1;
}
// Return the next item from the queue.
*old = DiskQueue[DiskQueueOut];
DiskQueueOut = (DiskQueueOut + 1) % DISK_QUEUE_SIZE;
pthread_mutex_unlock(&dqLock);
// No errors
return 0;
}