0

I wrote a program to create a number of files using pthread. for example, if I pass "./ioload 1 10", this program should create 10 files (namely output0, output1, ..., output9) that takes in total 1MB of disk space. However, when I ran the program, it was not able to create all 10 files after execution. I am wondering that what could cause this issue?

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

int numOfThreads;
int sizeOfDisk;
int blockSize = 4096;


void *write(void *name)
{
     FILE * fd;
     char fname[100];
     strcpy(fname,"output");
     strcat(fname, (char *)name);
     int fileSize = sizeOfDisk * 1024 * 1024 / blockSize / numOfThreads; 
     char block[fileSize];
     memset(block, 0, sizeof(*block));

     int i;
     fd = fopen(fname, "w");

     if (NULL == fd)
     {
        fprintf(stderr, "ERROR: FAILED TO OPEN FILE %s\n", fname);
        return NULL;
     }

     for (i = 0; i < fileSize; i++)
        fwrite(block, sizeof(blockSize), 4096, fd);

     fclose(fd);
}

int main(int argc, char * argv[])
{
    if (argc != 3)
    {
        printf("Usage: %s sizeOfDiskInMB #OfThreads\n", argv[0]);
        exit(1);
    }

    sizeOfDisk = atoi(argv[1]);
    numOfThreads = atoi(argv[2]);

    pthread_t ths[numOfThreads];
    int i;

    for (i = 0; i < numOfThreads; i++)
    {
        char ibuffer [100];
        snprintf(ibuffer, sizeof(ibuffer), "%d", i);

        if (pthread_create(&ths[i], NULL, write, (void *)&ibuffer)) 
        {
            fprintf(stderr, "ERROR: FAILD TO CREATE THREAD %d", i);
            exit(1);
        }
    }

    for (i = 0; i < numOfThreads; i++)
    {
        pthread_join(ths[i], 0);
    }

}
4

2 回答 2

3

每个线程都传递一个指向ibuffer. 很有可能ibuffer主线程循环的每次迭代都具有相同的地址,这意味着所有线程都试图从同一内存中读取。

无法保证每个新线程都会被调度,并且会在主线程开始其循环的下一次迭代之前创建其文件,更新ibuffer. 这可能会导致多个线程尝试创建相同的文件,或者一些线程尝试在稍后的snprintf调用中间读取它们的文件名。

要解决此问题,您需要让每个线程发出信号表明它已完成构造,或者在主线程中存储一个文件名数组,将指向不同实例的指针传递给每个线程。

void *write(void *name)
{
    FILE * fd;
    char fname[100];
    snprintf(fname, sizeof(fname), "output%d", *(int*)name);
    ...


int main(int argc, char * argv[])
{
    ....
    pthread_t ths[numOfThreads];
    int fileNum[numOfThreads];
    int i;
    for (i = 0; i < numOfThreads; i++)
    {
        fileNum[i] = i;
        if (pthread_create(&ths[i], NULL, write, (void *)&fileNum[i])) 
        {
            fprintf(stderr, "ERROR: FAILD TO CREATE THREAD %d", i);
            exit(1);
        }
    }
    ....
于 2013-08-05T21:58:09.360 回答
0

感谢您提出这个问题,已经在 C# 中开发了线程,所以这是关于实现 C/Win32 线程的一个很好的评论。

首先需要注意的是,此代码是使用 Microsoft C 编译器为 Windows 机器编写的。对于 *nix 系统,您将需要做类似的事情,但调用和宏名称不同。但是,一般原理是相同的。

  1. 为每个线程创建一个唯一的存储区域。放置线程特定信息。进入这个区域。

  2. 启动一个线程,将这个区域传递给它,以及它的堆栈大小(在 MS 中,0 大小为线程提供与进程相同大小的堆栈。

  3. main()启动所有线程后,等待它们完成。使用信号量来测试是否完成。

对于 Windows,您需要使用线程安全库,但看起来 XP 及更高版本的任何系统都提供线程安全 DLL,例如msvcrt.lib. 此外,还有一个_begintheadex具有更多功能的版本,值得推荐。

最后,此代码还不能在实际应用程序中使用。它仍然需要做很多事情(比如 using _beginthreadex),但出于讨论的目的,这是一个好的开始。

// Compiled on Eclipse/Microsoft C Compiler
// compile with: /MT /D "_X86_" /c
// processor: x86
#include <Windows.h>
#include <process.h>    /* _beginthread, _endthread */
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

struct ThreadParameterBlock
{
    int  threadNr;     //
    char fname[13];    // outputxx.bin\0
    int  numBlocks;    // expected number of blocks for this file
    int  blockSize;    // Number of bytes per block
    int  num_writes;   // actual number of writes
    char fill_char;    // for each byte
    clock_t  time_done;
    int  thread_done;  // 1 is running, 0 is stopped

};

void A_Writing_Thread(struct ThreadParameterBlock *TPB)
{
     // each thread has its own stack
     FILE *fd;

     int i;

     char *block;

     block = (char *) malloc(TPB->blockSize);
     if (block != NULL)
     {
       memset(block, TPB->fill_char, TPB->blockSize);

       fd = fopen(TPB->fname, "w");

       if (NULL == fd)
       {
          fprintf(stderr, "ERROR: FAILED TO OPEN FILE %s\n", TPB->fname);
       } else {

           for (i = 0; i < TPB->numBlocks; i++)
           {
              // to help demo parallelism of threads,
              // some threads run slow
              if (i%10 == 0)
              {
                  if (TPB->threadNr == 1)     {Sleep(50L);}
                  if ((TPB->threadNr%2) == 0) {Sleep(100L);}
                  if ((TPB->threadNr%3) == 0) {Sleep(200L);}
                  // glad I'm NOT thread 6!
              }
              fwrite(block, sizeof(TPB->blockSize), 1, fd);
              TPB->num_writes++;
           }

       }

       if (fd != NULL) {fclose(fd);}

       free(block);
     }

     printf("\nThread %d done.", TPB->threadNr);

     TPB->time_done   = clock();
     // only exit is thru this code...
     // thread done
     TPB->thread_done = 1;

     _endthread();
}

main()
{

    int numOfThreads = 0;
    int sizeOfDisk = 0;

    int nr_blocks;

    int thread_done_count;

    int blockSize = 4096;

    char ch = 'A';

    int i;

    // allow up to 10 threads
    struct ThreadParameterBlock *thread_array[10];
    struct ThreadParameterBlock *TPB;

    printf("Enter: Size of Disk(MB) white_space  number_of_threads press enter :");
    fflush(stdout);

    scanf("%d %d", &sizeOfDisk, &numOfThreads);

    if (sizeOfDisk < 0)  {sizeOfDisk = 1;}
    if (sizeOfDisk > 10) {sizeOfDisk = 10;}

    if (numOfThreads < 0)  {numOfThreads = 1;}
    if (numOfThreads > 10) {numOfThreads = 10;}

    nr_blocks = (sizeOfDisk * 1024 * 1024) / (numOfThreads * blockSize);
    printf("\ncreating %d threads to allocate %d blocks of 4096 bytes, per thread/file.\n",
                  numOfThreads, nr_blocks);
    fflush(stdout);

    for (i = 0; i < numOfThreads; i++)
    {
        thread_array[i] = TPB = (struct ThreadParameterBlock *)
                                    malloc(sizeof(struct ThreadParameterBlock));

        TPB->threadNr    = i + 1;
        TPB->time_done   = 0;
        TPB->thread_done = 0;
        TPB->numBlocks   = nr_blocks;
        TPB->blockSize   = blockSize;
        TPB->num_writes  = 0;
        TPB->fill_char   = ch++;
        sprintf(TPB->fname, "output%02d.bin", i+1);

        printf("\nstarting thread %d", i+1);
        fflush(stdout);

        /* On first loops, launch character threads. */
        _beginthread(A_Writing_Thread, 0, thread_array[i]);

    }

    // essentially a join...
    while (1)
    {
       /* Wait one second between polls of threads */
       Sleep( 250L );

       thread_done_count = 0;
       for (i = 0; i < numOfThreads; i++)
       {
           thread_done_count += thread_array[i]->thread_done;
       }

       if (thread_done_count >= numOfThreads) {break;}

    }

    printf("\nAll threads are done.");

    printf("\n\nThread Summary");
    for (i = 0; i < numOfThreads; i++)
    {
        printf("\nThread %2d completed %4d writes(%dkB) at %5d to <%s>.",         thread_array[i]->threadNr,
                thread_array[i]->num_writes,
                (thread_array[i]->num_writes*thread_array[i]->blockSize) / 1024,
                thread_array[i]->time_done,
                thread_array[i]->fname);

        // free parameter block for thread i
        free(thread_array[i]);
    }

} // end main;

希望这可以帮助。请询问是否有更多问题。

于 2013-08-05T23:34:36.920 回答