我试图结合 fork 与 setjmp/longjmp，为多线程程序实现一个基于 fork 的检查点（checkpoint）方案。我原本希望这个方案能奏效，但不出所料，它并不能正常工作。代码如下，其中包含检查点/回滚的示例用法。
主要思想是自己为线程分配堆栈（见函数 pthread_create_with_stack），并且只在主线程中调用 fork。fork 出的子进程（即检查点）一开始处于挂起状态；当它被唤醒（回滚）时，子进程的主线程调用 pthread_create 重新创建各个线程，并让它们使用与原进程中对应线程相同的堆栈。每个线程例程在开始处执行 longjmp，以便跳转到进程作为检查点被 fork 时代码所处的同一位置。请注意，所有 setjmp 调用都在函数 my_pthread_barrier_wait 内完成，因此调用时没有任何线程持有锁。
我认为问题出在 setjmp/longjmp 上。getcontext/setcontext/makecontext 或其他机制能解决这个问题吗？setjmp/longjmp 究竟能否以这种方式使用？任何解决方案我都将不胜感激。
#include <errno.h>
#include <math.h>
#include <pthread.h>
#include <semaphore.h>
#include <setjmp.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
/* When defined, compiles in the checkpoint/rollback path (signal handler
 * setup, setjmp in the barrier wrapper, longjmp in BusyWork). */
#define PERFORM_JMP
/* Size of every per-thread array below and the barrier's participant count.
 * Slot 0 belongs to the main thread, which runs BusyWork directly. */
#define NUM_THREADS 4
/* Base address of the malloc'ed stack of each created thread. */
void *stackAddr[NUM_THREADS];
/* Handles of the created threads; slot 0 is never written. */
pthread_t thread[NUM_THREADS];
/* Per-thread jump context saved in my_pthread_barrier_wait. */
jmp_buf buf[NUM_THREADS];
/* Per-thread attributes carrying the custom stack; reused by checkpoint(). */
pthread_attr_t attr[NUM_THREADS];
/* Barrier shared by all NUM_THREADS workers. */
pthread_barrier_t bar;
/* Posted by sig_handler; a checkpoint child blocks on it until resumed. */
sem_t sem;
/* Value returned by the most recent checkpoint() call (0 before the first). */
pid_t cp_pid;
/* Set to 1 in a checkpoint child once it has been woken up. */
int rollbacked;
/* Counts thread 0's calls into the checkpoint logic. */
int iter;
/* Stable storage for the index passed to each thread routine. */
long thread_id[NUM_THREADS];
/* Forward declaration: checkpoint() re-creates threads running BusyWork. */
void *BusyWork(void *t);
/*
 * SIGUSR1 handler: wakes a suspended checkpoint process by posting `sem`.
 *
 * Only async-signal-safe calls are made here (write, sem_post). printf is
 * not async-signal-safe: if the signal interrupts another stdio call it can
 * deadlock or corrupt the stream state.
 *
 * signum: number of the delivered signal (unused).
 */
void sig_handler(int signum)
{
    static const char msg[] = "signal_handler posting sem!\n";
    /* The calls below may modify errno; the interrupted code must not notice. */
    int saved_errno = errno;
    ssize_t written;

    (void)signum;
    /* Best-effort diagnostic: nothing useful can be done if it fails. */
    written = write( STDOUT_FILENO, msg, sizeof msg - 1 );
    (void)written;
    sem_post( &sem );
    errno = saved_errno;
}
int pthread_create_with_stack( void *(*start_routine) (void *), int tid )
{
    const size_t STACKSIZE = 0xC00000; //12582912
    size_t i;
    pid_t pid;
    int rc;
    printf( "tid = %d\n", tid );
    pthread_attr_init( &attr[tid] );
    stackAddr[tid] = malloc(STACKSIZE);
    pthread_attr_setstack( &attr[tid], stackAddr[tid], STACKSIZE );
    thread_id[tid] = tid;
    rc = pthread_create( &thread[tid], &attr[tid], start_routine, (void*)&thread_id[tid] );
    if (rc) 
    {
        printf("ERROR; return code from pthread_create() is %d\n", rc);
        exit(-1);
    }
    return rc;
}
/*
 * Forks a suspended copy of the process to serve as a checkpoint.
 *
 * Parent: returns the child's pid right away.
 * Child:  blocks on `sem` until it is posted (see sig_handler), then sets
 *         `rollbacked`, re-creates threads 1..NUM_THREADS-1 running BusyWork
 *         with the saved attr[] entries, and returns 1.
 * Failure: if fork fails, reports it and returns 0, the value callers
 *         already treat as "no checkpoint". Returning -1 would be dangerous
 *         because the result is later passed to kill().
 *
 * NOTE(review): the child of a multithreaded process contains only the
 * forking thread; calling printf/pthread_create there relies on behaviour
 * POSIX does not guarantee -- confirm on the target platform.
 */
pid_t checkpoint()
{
    pid_t pid;
    int t, rc;

    pid = fork();
    if ( pid == -1 )
    {
        perror("fork");
        return 0;
    }
    if ( pid > 0 )          // parent process
        return pid;

    // child process: sleep until a rollback is requested
    while ( sem_wait( &sem ) == -1 )
    {
        /* The wake-up signal itself can interrupt the wait; retry so the
         * post made by the handler is consumed. */
        if ( errno != EINTR )
        {
            perror("sem_wait");
            exit(-1);
        }
    }
    rollbacked = 1;
    printf( "case 0: rollbacked = 1, my pid is %d\n", getpid() );
    for( t = 1; t < NUM_THREADS; t++ ) 
    {
        printf( "checkpoint: creating thread %d again\n", t );
        rc = pthread_create( &thread[t], &attr[t], BusyWork, (void*)&thread_id[t] );
        if (rc) 
        {
            printf("ERROR; return code from pthread_create() is %d\n", rc);
            exit(-1);
        }
    }
    return 1;
}
/*
 * Hands execution over to the checkpoint process `pid` and terminates the
 * current process.
 *
 * Sends SIGUSR1 to `pid` and exits with status 0. If `pid` is not a valid
 * single-process id, or the signal cannot be delivered, the problem is
 * reported on stderr and the process exits with EXIT_FAILURE instead.
 * This function never returns.
 *
 * pid: process id of the suspended checkpoint; must be > 0.
 */
void restart_from_checkpoint( pid_t pid )
{
    printf( "Restart_from_checkpoint, sending signal to %d!\n", (int)pid );
    /* kill() gives pid 0 and negative pids a group/broadcast meaning. */
    if ( pid <= 0 )
    {
        fprintf( stderr, "restart_from_checkpoint: invalid checkpoint pid %d\n", (int)pid );
        exit( EXIT_FAILURE );
    }
    if ( kill( pid, SIGUSR1 ) == -1 )
    {
        perror( "kill" );
        exit( EXIT_FAILURE );
    }
    exit( 0 );
}
/*
 * Either refreshes the checkpoint or rolls back to it.
 *
 * - No usable checkpoint (cp_pid <= 0): take one.
 * - sig_diff != 0: roll back by waking the checkpoint process, unless this
 *   process is itself a resumed checkpoint (`rollbacked` set).
 * - sig_diff == 0: discard the old checkpoint process and take a new one.
 *
 * sig_diff: non-zero to request a rollback, zero to request a checkpoint.
 */
void take_checkpoint_or_rollback( int sig_diff )
{
    /* A non-positive pid must never reach kill(): 0 and negative values
     * address process groups or every process. */
    if ( cp_pid <= 0 )
    {
        cp_pid = checkpoint();
        return;
    }

    if ( sig_diff )
    {
        printf( "rollbacking\n" );
        if ( !rollbacked )
            restart_from_checkpoint( cp_pid );
        return;
    }

    if ( kill( cp_pid, SIGKILL ) == 0 )
    {
        /* Reap the killed child so it does not linger as a zombie. */
        while ( waitpid( cp_pid, NULL, 0 ) == -1 && errno == EINTR )
            ;
    }
    else
        perror( "kill" );
    cp_pid = checkpoint();
    printf( "%d: cp_pid = %d!\n", getpid(), cp_pid );
}
/*
 * Barrier wrapper: waits on `pbar` twice and, when PERFORM_JMP is defined,
 * runs the checkpoint logic and saves the caller's jump context between the
 * two waits.
 *
 * tid:  caller's thread index; selects buf[tid]. Thread 0 additionally
 *       drives checkpointing.
 * pbar: barrier to wait on.
 */
void my_pthread_barrier_wait( int tid, pthread_barrier_t *pbar )
{
    pthread_barrier_wait( pbar );
#ifdef PERFORM_JMP   
    /* Only thread 0 checkpoints or rolls back, and only in a process that
     * was not itself resumed from a checkpoint. */
    if ( tid == 0 )
    {
        if ( !rollbacked )
        {
            /* The 4th call requests a rollback; every other call requests a
             * (new) checkpoint. */
            take_checkpoint_or_rollback( ++iter == 4 );
        }
    }
    /* Save the context that BusyWork longjmp()s to after a rollback. Both
     * outcomes (direct return and return via longjmp) continue identically.
     * NOTE(review): nothing orders thread 0's fork above against the other
     * threads' setjmp here, so the forked image may hold buf[] entries from
     * an earlier call, or none at all -- confirm. */
    if ( setjmp( buf[tid] ) != 0 ) {}
    else {}
    printf( "%d: %d is waiting at the second barrier!\n", getpid(), tid );
#endif
    pthread_barrier_wait( pbar );
}
/*
 * Worker routine, run by the created threads and called directly by main
 * for slot 0.
 *
 * Performs 10 accumulation steps, synchronising with the other workers via
 * my_pthread_barrier_wait after each step. With PERFORM_JMP defined and
 * `rollbacked` set, it instead jumps straight to the context saved in
 * buf[tid] and does not continue past that point.
 *
 * t: points to a long holding this worker's index.
 *
 * Returns NULL, so pthread_join receives a well-defined status.
 */
void *BusyWork(void *t)
{
   /* volatile: these locals live across a setjmp/longjmp boundary. */
   volatile int i;
   volatile long tid = *((long*)t);
   volatile double result = 0.0;
   printf( "thread %ld in BusyWork!\n", tid );
#ifdef PERFORM_JMP   
   if ( rollbacked )
   {
    printf( "hmm, thread %ld is now doing a longjmp, goodluck!\n", tid );
    /* NOTE(review): the context was saved by a different thread in the
     * checkpointed image, and this call has already reused part of the same
     * stack; C only defines longjmp into a still-active setjmp invocation
     * -- confirm this is workable. */
    longjmp( buf[tid], 1 );
   }
#endif
   printf("Thread %ld starting...\n",tid);
   for ( i = 0; i < 10; i++)
   {
      result += (tid+1) * i;
      printf( "%d: tid %ld: result = %g\n", getpid(), tid, result );
      my_pthread_barrier_wait((int)tid, &bar);
   }
   printf("Thread %ld done. Result = %g\n", tid, result);
   return NULL;
}
/*
 * Program entry point: sets up the barrier (and, with PERFORM_JMP, the
 * SIGUSR1 handler and semaphore), starts workers 1..NUM_THREADS-1 on custom
 * stacks, runs worker 0 on the main thread, then joins the other workers.
 *
 * Any setup or join failure is reported and ends the process with exit(-1).
 * Command-line arguments are ignored.
 */
int main (int argc, char *argv[])
{
   int rc;
   long t;
   void *status;

   (void)argc;
   (void)argv;

   /* All NUM_THREADS workers, including the main thread, use this barrier. */
   rc = pthread_barrier_init(&bar, NULL, NUM_THREADS);
   if (rc)
   {
      printf("ERROR; return code from pthread_barrier_init() is %d\n", rc);
      exit(-1);
   }
#ifdef PERFORM_JMP   
   if (signal(SIGUSR1, sig_handler) == SIG_ERR)
   {
      perror("signal");
      exit(-1);
   }
   if (sem_init( &sem, 0, 0 ) == -1)
   {
      perror("sem_init");
      exit(-1);
   }
#endif
   for( t = 1; t < NUM_THREADS; t++ ) 
   {
      printf( "Main: creating thread %ld\n", t );
      rc = pthread_create_with_stack( BusyWork, t );
      if (rc) 
      {
         printf("ERROR; return code from pthread_create() is %d\n", rc);
         exit(-1);
      }
   }
   /* The main thread acts as worker 0. */
   thread_id[0] = 0;
   BusyWork( &thread_id[0] );
   /* Wait for the other workers. */
   for(t=1; t<NUM_THREADS; t++) 
   {
      rc = pthread_join(thread[t], &status);
      if (rc) 
      {
         printf("ERROR; return code from pthread_join() is %d\n", rc);
         exit(-1);
      }
      /* The two literals are concatenated, so the space must be explicit. */
      printf("Main: completed join with thread %ld having a status "
            "of %ld\n",t,(long)status);
    }
    printf("Main: program completed. Exiting.\n");
    pthread_exit(NULL);
}