0

我正在编写一个 C/MPI 程序,让多个进程从同一个数据文件中读取数据。使用 stdio 的标准函数(fopen、fread、fseek)时一切正常,问题是文件偏移量无法超过 4 GB。因此我改用 MPI-IO 函数来读取大文件,但这样一来内存就不能被正确释放了。

实际上,我每次读取一个缓冲区,对其进行处理,然后释放已分配的内存。每个进程各自的内存使用量都很正常,但全局内存使用量却不停地增长。只要把 MPI_File_read 换成 fread,就不会出现这个问题。

以下是我的代码:

double CPUtime(){ return ((double) clock())/CLOCKS_PER_SEC;}int main(int argc, char* argv []){

if(argc != 5) {
    printf("\t[Dictionary file] [Dictionary] [Input file] [Buffer size]\n");
    exit(0);
}

char* sInput    = malloc (sizeof(char)*maxLength);
char* sOutput   = malloc (sizeof(char)*maxLength);
char* compl     = malloc (sizeof(char)*maxLength);

char* sDictionaryFileName   =   argv[1];
char* sDictionaryName       =   argv[2];
char* filename              =   argv[3];
int Mbuffer                 =   atoi(argv[4]);



int maxBuffer = Mbuffer*1024*1024;
int over      = 10000;

int rank,numprocess;
long int offset;


char* buffer;
char* opbuffer;

double tstart=CPUtime();

MPI_Init( &argc, &argv );
MPI_Comm_rank( MPI_COMM_WORLD, &rank );

/* mpi version */
/* open the file*/
MPI_File fh;
int err;
err = MPI_File_open(MPI_COMM_WORLD, filename, MPI_MODE_RDONLY, MPI_INFO_NULL, &fh);
if (err != MPI_SUCCESS) {
    char errstr[MPI_MAX_ERROR_STRING];
    int  errlen;
    MPI_Error_string(err, errstr, &errlen);
    printf("Error at opening file %s (%s)\n",filename,errstr);
    MPI_Finalize();
    exit(1);
}

// get offsets and buffer size
MPI_Offset sfile;
MPI_File_get_size(fh,&sfile);
MPI_Status status;


/* C version */
/*FILE* fh;
long int sfile;
fh =fopen( filename,"rb");
if (fh==NULL) {
    printf("Error at opening file %s\n",filename);
    exit(1);
}
// get offsets and buffer size
fseek(fh, 0L, SEEK_END);
sfile = ftell(fh);
fseek(fh, 0L, SEEK_SET);*/


MPI_Comm_size( MPI_COMM_WORLD, &numprocess );

/* number of iterations */
long int data_size = (long int)(sfile/(numprocess));
int nbIter = data_size/maxBuffer;
if(nbIter<=1){
    nbIter = 1;
    maxBuffer = data_size;
}

/* offsets */
offset = data_size*(rank);
long int cursor = offset;
char* header;
if(rank==0){
    FILE* fh;
    fh =fopen( filename,"rb");
    if (fh==NULL) {
        printf("Error at opening file %s\n",filename);
        exit(1);
    }
    /* read the header and broadcast it */
    header = malloc(sizeof(char)*1000);
    fgets(header,1000,fh);
    fclose(fh);

    //broadcast header
    int sndHeader = strlen(header);
    //cursor+=sndHeader;
    int process_counter;
    for(process_counter=1;process_counter<numprocess;process_counter++){
        int ierr = MPI_Send(&sndHeader, 1, MPI_INT, process_counter, 42,MPI_COMM_WORLD);
        if (ierr != MPI_SUCCESS) {
            int errclass,resultlen;
            char err_buffer[MPI_MAX_ERROR_STRING];
            MPI_Error_class(ierr,&errclass);
            if (errclass== MPI_ERR_RANK) {
                fprintf(stderr,"Invalid rank used in MPI send call\n");
                MPI_Error_string(ierr,err_buffer,&resultlen);
                fprintf(stderr,err_buffer);
                MPI_Finalize();
            }
        }
        MPI_Send(header, sndHeader, MPI_CHAR, process_counter, 43, MPI_COMM_WORLD);
    }
}
else{
    /* receive the header */
    int sizeofHeader;
    MPI_Status s ;
    MPI_Recv(&sizeofHeader,1,MPI_INT,0,42,MPI_COMM_WORLD,&s);
    header = malloc (sizeof(char)*sizeofHeader+1);
    MPI_Recv(header,sizeofHeader,MPI_CHAR,0,43,MPI_COMM_WORLD,&s);
}


/* Synchronization barrier */
MPI_Barrier(MPI_COMM_WORLD);


int count;

opbuffer = malloc(sizeof(char)*maxBuffer);

/* C version */
//fseek(fh,cursor,SEEK_SET);

for(count=0;count<nbIter;count++){

    if(count==0 && rank==numprocess-1){ //init ring
        //send the token to p0
        int token=1;
        MPI_Send(&token,sizeof(int),MPI_INT,0,55,MPI_COMM_WORLD);
    }

    //recv
    int token;
    int sender;
    if(rank==0)
        sender = numprocess-1;
    else
        sender=rank-1;

    MPI_Status s;
    MPI_Recv(&token,sizeof(int),MPI_INT,sender,55,MPI_COMM_WORLD,&s);
    fflush(stdout);printf("P%d got the token at %G\n",rank,CPUtime());
    //read
    double start=CPUtime();
    /*double readtime;
    double sread=CPUtime();//read time*/

    //read
    if(token==1){
        /* MPI version */
        int err=MPI_File_read_at(fh, cursor,opbuffer,  sizeof(char)*maxBuffer, MPI_CHAR, &status);
        if(err!=MPI_SUCCESS){
            /*char errstr[MPI_MAX_ERROR_STRING];
            int  errlen;
            MPI_Error_string(err, errstr, &errlen);
            printf("Error reading file %s (%s)\n",filename,errstr);*/
            MPI_Finalize();
            exit(0);
        }

        /* C version of read */
        /*int k=fread(opbuffer,sizeof(char),maxBuffer,fh);
        if(k==0)
            perror("fread");*/

        cursor+=maxBuffer;
        buffer=opbuffer;

    }
    else{
        printf("Error token!\n");
        token=1;
    }
    //printf("P%d readtime=%G\n",rank,CPUtime()-sread);
    //Isend
    int next = (rank+1)%numprocess;
    MPI_Send(&token,sizeof(int),MPI_INT,next,55,MPI_COMM_WORLD);

    /* start processing*/ 
    /* end processing */


}
free(opbuffer);
int er=MPI_File_close(&fh);
if(er!=MPI_SUCCESS){
    printf("Error closing file\n");
    MPI_Finalize();
    exit(1);
}
MPI_Finalize();

printf("Global time : %G\n",CPUtime()-tstart);
return 0;
}

如果有人知道问题出在哪里,我将不胜感激。谢谢。

4

1 回答 1

1

这可能是因为你从未调用 MPI_File_close,这会导致文件中间操作所占用的资源泄漏。另外请注意,如果你真的想写出干净的代码,在 if(err!=MPI_SUCCESS) 的错误分支中也应该关闭文件。

于 2012-07-11T16:52:05.030 回答