0

I'm trying my first run with shared memory and a named semaphore to synchronize access to it.

My program has 3 processes - a parent and two children - which all must use the same shared memory. In order to synchronize them, I'm using a named semaphore. The resource they share is a 3-integer array, where array[0] is the exit flag location, which the child processes read before they operate in order to determine whether the parent process wants to exit. array[1] and array[2] are used to communicate with the parent process: each child process places a message in its own array cell, and the parent process reads it and places an ACK message in response.

I'm trying to get a basic workflow for my code: create all necessary resources, make the parent sleep for 3 seconds, and then initiate the exit_procedure.

My problem is that, when getting to the exit_procedure, the main process blocks forever on the sem_wait() operation - apparently a deadlock. I'm trying to figure out the problem and can't seem to pinpoint it. I'm new to process synchronization - until this code I've only synchronized threads.

UPDATE: I've switched to using POSIX memory mapping, and I still have the same deadlock issue. The relevant methods in process_api.h can't get hold of the lock; they just block forever. I don't know what I am doing wrong. Can someone please assist?

My code:

main file:

int *shmem_p;       //!< Pointer to the shared memory mapping used across the different processes
int shmem_fd;       //!< Shared memory file descriptor (returned by shm_open())
sem_t *sem_p;       //!< Semaphore for synchronizing access to shared memory

volatile sig_atomic_t done;         //!< An atomic flag to signal this process' threads they are done
volatile sig_atomic_t signal_rcvd;  //!< Indication to exit procedure if a signal caused termination

/**
 * @brief Exit procedure to be called when program is done
 *
 * Blocks all signals, reports a thread termination if SIGTERM was recorded in
 * signal_rcvd, sets the done flag when no signal was recorded, and finally
 * releases the named semaphore and the shared memory object.
 */
static void exit_procedure(void)
{
    static const char thread_term_msg[] = "Error occured - thread terminated\n";

    block_all_signals();            /* Block all signals - we're already exiting */

    if(signal_rcvd == SIGTERM) {    /* SIGTERM is manually raised by us when a thread terminates, thus not handled in signal handler */
        /* sizeof - 1 emits the whole message, including the trailing newline */
        (void)write(STDERR_FILENO, thread_term_msg, sizeof(thread_term_msg) - 1);
    }

    if( !signal_rcvd ) {            /* We got here normally, or by thread termination - set done flag */
        done = true;
    }

    /* Free all relevant resources */
    if((sem_p != NULL) && (sem_p != SEM_FAILED)) {
        sem_close(sem_p);
    }
    /* Must be the name given to sem_open(); otherwise the semaphore (and its
       current value) outlives the program and is picked up by the next run */
    sem_unlink("/shsemaphore");

    if((shmem_p != NULL) && (shmem_p != MAP_FAILED)) {
        munmap(shmem_p, TOTAL_PROC_NUM*sizeof(int));
    }
    shm_unlink("/shmemory");

    sem_p = NULL;
    shmem_p = NULL;
}

/**
 * @brief Signal handler: reports the received signal and raises the exit flags
 *
 * Only write() is used for reporting, because fprintf(), fflush() and
 * strsignal() are not async-signal-safe. For signals other than SIGCHLD and
 * SIGALRM the signal number (not its name) is printed for the same reason.
 *
 * @param sig_num Number of the delivered signal
 */
static void signal_handler(int sig_num) {
    static const char child_msg[] = "Error occured - Child process terminated\n";
    static const char alarm_msg[] = "Successfully finished sim\n";
    static const char other_prefix[] = "Error - Signal ";
    static const char other_suffix[] = " has been raised";

    const int saved_errno = errno;  /* write() may overwrite errno of the interrupted code */

    switch(sig_num) {
    case SIGCHLD:
        (void)write(STDERR_FILENO, child_msg, sizeof(child_msg) - 1);
        break;

    case SIGALRM:
        (void)write(STDOUT_FILENO, alarm_msg, sizeof(alarm_msg) - 1);
        break;

    default: {
        /* Convert the signal number to decimal text by hand (right to left) */
        char digits[12];
        size_t pos = sizeof(digits);
        unsigned int value = (sig_num < 0) ? (0u - (unsigned int)sig_num) : (unsigned int)sig_num;

        do {
            digits[--pos] = (char)('0' + (value % 10u));
            value /= 10u;
        } while(value != 0u);

        if(sig_num < 0) {
            digits[--pos] = '-';
        }

        (void)write(STDERR_FILENO, other_prefix, sizeof(other_prefix) - 1);
        (void)write(STDERR_FILENO, &digits[pos], sizeof(digits) - pos);
        (void)write(STDERR_FILENO, other_suffix, sizeof(other_suffix) - 1);
        break;
    }
    }

    errno = saved_errno;

    done = true;
    signal_rcvd = true;
}

/**
 * @brief Creates all resources used by the program
 *
 * Creates and maps the "/shmemory" shared memory object, creates the
 * "/shsemaphore" named semaphore with an initial value of 1, starts the child
 * processes through processes_init() and installs the signal dispositions.
 *
 * @return SUCCESS when everything was set up, FAILURE otherwise. Failures of
 *         shm_open(), ftruncate() and mmap() are reported via error_and_exit().
 */
static status_t init_procedure(void)
{
    done = false;
    signal_rcvd = false;
    const size_t size = TOTAL_PROC_NUM*sizeof(int);
    int saved_errno;    /* errno of the failing call; cleanup calls may overwrite errno */

    /* Initialize shared memory to be used as an exit flag to be used by all processes */

    shmem_fd = shm_open("/shmemory", O_CREAT | O_TRUNC | O_RDWR, 0644);
    if(shmem_fd < 0) {
        error_and_exit("shm_open() failed, err = ", errno);
    }

    if(ftruncate(shmem_fd, (off_t)size)) {
        saved_errno = errno;
        close(shmem_fd);
        shm_unlink("/shmemory");
        error_and_exit("ftruncate() failed, err = ", saved_errno);
    }

    shmem_p = (int *)mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, shmem_fd, 0);
    if(shmem_p == MAP_FAILED) {
        saved_errno = errno;
        close(shmem_fd);
        shm_unlink("/shmemory");
        error_and_exit("mmap() failed, err = ", saved_errno);
    }

    close(shmem_fd);    /* No longer needed */
    memset(shmem_p, 0, size);

    /* Initialize a named semaphore for the processes' shared memory.
       A semaphore left over from an earlier run keeps its old value (possibly 0,
       which blocks every sem_wait() forever), and sem_open() ignores the initial
       value when the name already exists - so remove any stale one first and
       insist on creating a new one. */
    sem_unlink("/shsemaphore");
    sem_p = sem_open("/shsemaphore", O_CREAT | O_EXCL, 0644, 1);
    if(SEM_FAILED == sem_p) {
        error("sem_open() failed, err = ", errno);
        munmap(shmem_p, size);
        shm_unlink("/shmemory");
        sem_p = NULL;
        shmem_p = NULL;
        return FAILURE;     /* Never continue with an invalid semaphore */
    }

    /* Initialize memory access invokers processes */
    if(processes_init() != SUCCESS) {
        error("init_processes() failed\n", ERR);
        munmap(shmem_p, size);
        shm_unlink("/shmemory");
        sem_close(sem_p);
        sem_unlink("/shsemaphore");
        sem_p = NULL;
        shmem_p = NULL;
        return FAILURE;
    }

    /* Handle Signals - Ignore SIGINT, SIGQUIT, handle SIGCHLD & SIGALRM */

    struct sigaction sig_handler;
    memset(&sig_handler, 0, sizeof(sig_handler));   /* No field is left uninitialized */
    sig_handler.sa_flags = 0;

    if(sigfillset(&sig_handler.sa_mask)) {  /* Mask all other signals inside the handler */
        error("sigfillset() failed, err = ", errno);
        exit_procedure();
        return FAILURE;
    }

    sig_handler.sa_handler = signal_handler;
    if(sigaction(SIGCHLD, &sig_handler, NULL) || sigaction(SIGALRM, &sig_handler, NULL)) {  /* Set the signal handler for SIGCHLD & SIGALRM */
        error("sigaction() failed, err = ", errno);
        exit_procedure();
        return FAILURE;
    }

    sig_handler.sa_handler = SIG_IGN;
    if(sigaction(SIGINT, &sig_handler, NULL) || sigaction(SIGQUIT, &sig_handler, NULL)) {   /* Ignore SIGINT (ctrl+c) and SIGQUIT */
        error("sigaction() failed, err = ", errno);
        exit_procedure();
        return FAILURE;
    }

    return SUCCESS;
}

/**
 * @brief Program entry point
 *
 * Rejects any command-line argument, runs init_procedure(), sleeps, and then
 * runs exit_procedure().
 *
 * @param argc Argument count; must be exactly 1
 * @param argv Argument vector; argv[0] is used in the usage message
 * @return EXIT_SUCCESS after the exit procedure has run
 */
int main(int argc, char *argv[])
{
    /* The program takes no arguments at all */
    if(1 != argc) {
        fprintf(stderr, "usage: %s (no arguments allowed)\n", argv[0]);
        exit(EXIT_FAILURE);
    }

    /* Bring up every resource before doing anything else */
    if(init_procedure() != SUCCESS) {
        error_and_exit("init_procedure() failed\n", ERR);
    }

    /* Stay idle for a while, then tear everything down */
    sleep(5);
    exit_procedure();

    return EXIT_SUCCESS;
}

process handler:

#define WR_RATE (0.8)               //!< Writing probability when invoking memory access
#define WR_RATE_INT (WR_RATE*10)    //!< WR_RATE scaled by 10 (a floating-point expression), compared against rand_range(1, 10)
#define INTER_MEM_ACCS_T (100000)   //!< Waiting time between memory accesses; used as a tv_nsec value (nanoseconds)

static pid_t child_pids[CHILD_PROC_NUM];    //!< PIDs of the forked children, filled in by processes_init()

int process_cnt;                    //!< Determines the index of the process, for child processes
extern sem_t *sem_p;                //!< Semaphore used around the shared memory accesses; defined in another file

/**
 * @brief Checks whether this process' cell in the shared array holds MSG_ACK
 *
 * The cell mem_p[process_cnt] is read while holding the semaphore sem_p.
 * A sem_wait() interrupted by a signal (EINTR) is retried, so the critical
 * section is never entered without owning the semaphore.
 *
 * @param mem_p Pointer to the shared memory mapping
 * @param size  Size of the mapping, used to unmap it on failure
 * @return true if the cell holds MSG_ACK, false otherwise
 */
static bool is_ack_received(int *mem_p, off_t size)
{
    bool ack;
    int saved_errno;    /* errno of the failing call; cleanup calls may overwrite errno */

    /*********************************************************/
    /**                 Critical Section start              **/
    while(sem_wait(sem_p) != 0) {
        if(EINTR == errno) {
            continue;   /* Interrupted before the semaphore was taken - try again */
        }

        saved_errno = errno;
        munmap(mem_p, size);
        shm_unlink("/shmemory");
        error_and_Exit("sem_wait() failed, err = ", saved_errno);
        return false;   /* Only reached if error_and_Exit() returns; never spin on a hard error */
    }

    ack = (mem_p[process_cnt] == MSG_ACK);

    if(ack) {// TODO - Remove
        fprintf(stdout, "Process %d received ACK\n", process_cnt);
        fflush(stdout);
    }

    if(sem_post(sem_p) != 0) {  /* sem_post() is never interrupted, so every failure is fatal */
        saved_errno = errno;
        munmap(mem_p, size);
        shm_unlink("/shmemory");
        error_and_Exit("sem_post() failed, err = ", saved_errno);
    }
    /**                 Critical Section end                **/
    /*********************************************************/

    return ack;
}

/**
 * @brief Places a read or write request in this process' cell of the shared array
 *
 * Chooses MSG_WRITE when rand_range(1, 10) is at most WR_RATE_INT and MSG_READ
 * otherwise, then stores the message in mem_p[process_cnt] while holding the
 * semaphore sem_p. A sem_wait() interrupted by a signal (EINTR) is retried, so
 * the critical section is never entered without owning the semaphore.
 *
 * @param mem_p Pointer to the shared memory mapping
 * @param size  Size of the mapping, used to unmap it on failure
 */
static void invoke_memory_access(int *mem_p, off_t size)
{
    int saved_errno;    /* errno of the failing call; cleanup calls may overwrite errno */

    msg_e send_msg = MSG_READ;
    if(rand_range(1, 10) <= WR_RATE_INT) {  /* Write Memory */
        send_msg = MSG_WRITE;
    }

    /*********************************************************/
    /**                 Critical Section start              **/
    while(sem_wait(sem_p) != 0) {
        if(EINTR == errno) {
            continue;   /* Interrupted before the semaphore was taken - try again */
        }

        saved_errno = errno;
        munmap(mem_p, size);
        shm_unlink("/shmemory");
        error_and_Exit("sem_wait() failed, err = ", saved_errno);
        return;         /* Only reached if error_and_Exit() returns; never spin on a hard error */
    }

    mem_p[process_cnt] = send_msg;
    fprintf(stdout, "Process %d sent MSG_%d in mem_address: %p\n", process_cnt, send_msg, &mem_p[process_cnt]); // TODO - Remove
    fflush(stdout);

    if(sem_post(sem_p) != 0) {  /* sem_post() is never interrupted, so every failure is fatal */
        saved_errno = errno;
        munmap(mem_p, size);
        shm_unlink("/shmemory");
        error_and_Exit("sem_post() failed, err = ", saved_errno);
    }
    /**                 Critical Section end                **/
    /*********************************************************/
}

/**
 * @brief Body of a child process
 *
 * Opens and maps the "/shmemory" object, then keeps placing requests in the
 * shared array until cell 0 holds MSG_EXIT. After the first request, a new one
 * is sent only once the previous request has been acknowledged. The function
 * ends the process with _Exit() and does not return.
 */
static void main_loop(void)
{
    int shmem_fd = shm_open("/shmemory", O_RDWR, 0);
    if(shmem_fd < 0) {
        error_and_Exit("shm_open() failed, err = ", errno);
    }

    /* The mapping length is taken from the object itself, so it must be valid */
    struct stat mem_stat;
    if(fstat(shmem_fd, &mem_stat) != 0) {
        const int fstat_errno = errno;  /* close() may overwrite errno */
        close(shmem_fd);
        error_and_Exit("fstat() failed, err = ", fstat_errno);
    }

    int *child_memory_p = (int *)mmap(NULL, mem_stat.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, shmem_fd, 0);
    if(child_memory_p == MAP_FAILED) {
        const int mmap_errno = errno;   /* cleanup calls may overwrite errno */
        close(shmem_fd);
        shm_unlink("/shmemory");
        error_and_Exit("mmap() failed, err = ", mmap_errno);
    }

    close(shmem_fd);    /* No longer needed */

    bool first_run = true;
    bool ack = false;
    const struct timespec ns_wait = {.tv_sec = 0, .tv_nsec = INTER_MEM_ACCS_T};

    while(child_memory_p[0] != MSG_EXIT) {
        if( !first_run ) {                  /* Not the first run, check for ack */
            ack = is_ack_received(child_memory_p, mem_stat.st_size);
        }

        nanosleep(&ns_wait, NULL);

        if( !first_run && !ack ) {  /* No ack received for latest call, nothing to be done */
            continue;
        }

        invoke_memory_access(child_memory_p, mem_stat.st_size);

        if(first_run) {             /* First run is over.. */
            first_run = false;
        }
    }

    fprintf(stdout, "PROCCESS %d EXIT!\n", process_cnt); // TODO Remove this
    fflush(stdout);

    munmap(child_memory_p, mem_stat.st_size);
    shm_unlink("/shmemory");

    child_memory_p = NULL;

    _Exit(EXIT_SUCCESS);
}

/**
 * @brief Forks CHILD_PROC_NUM child processes
 *
 * In the parent, each child's PID is stored in child_pids and process_cnt is
 * advanced, so every child is forked with its own process_cnt value (starting
 * at 1). Each child blocks all signals and enters main_loop().
 *
 * @return SUCCESS once all children were forked, FAILURE as soon as fork() fails
 */
status_t processes_init()
{
    process_cnt = 1;    /* Value seen by the first child; grows by one per forked child */

    for(int slot = 0; slot < CHILD_PROC_NUM; slot++) {
        const pid_t fork_res = fork();

        if(ERR == fork_res) {
            error("fork() failed, err = ", errno);
            return FAILURE;
        }

        if(0 == fork_res) {             /* Child process */
            block_all_signals();        /* Only main process responsible for indicate exit to its child*/
            main_loop();
            continue;
        }

        /* Parent process - remember the child and move on to the next index */
        child_pids[slot] = fork_res;
        process_cnt += 1;
    }

    return SUCCESS;
}

/**
 * @brief Tells the child processes to exit and waits for all of them
 *
 * Writes MSG_EXIT into cell 0 of the shared array and then waits for every PID
 * stored in child_pids. A waitpid() interrupted by a signal (EINTR) is retried,
 * so an interruption does not leave a child unreaped.
 *
 * @param mem_p Address of the pointer to the shared memory mapping
 */
void processes_deinit(int **mem_p)
{
    (*mem_p)[0] = MSG_EXIT;
    fprintf(stdout, "EXIT wrriten to address %p\n", *mem_p);
    fflush(stdout);     /* Keep this line ahead of the unbuffered write() below */

    /* Wait for all child processes to terminate */
    int i;
    write(STDOUT_FILENO, "Waiting for children to exit\n", 29); // TODO Remove this
    for(i = 0; i < CHILD_PROC_NUM; ++i) {
        pid_t wait_res;

        do {
            wait_res = waitpid(child_pids[i], NULL, 0);
        } while((ERR == wait_res) && (EINTR == errno));

        if((ERR == wait_res) && (ECHILD != errno)) {
            error("waitpid() failed, err = ", errno);
        }
    }

    fprintf(stdout, "PROCCESS DEINIT DONE!\n"); // TODO Remove this
    fflush(stdout);
}

Can someone please explain what I'm doing wrong?

I've tried:

  1. Passing the sem_t pointer from the main process as *sem_t **semaphore_p* to the processes_init method, and having every child use the real pointer to the semaphore (even if the child copies the pointer through the COW mechanism, it will still use the actual address)

  2. Declaring the sem_t pointer as extern in the process handler

  3. Opening in each child process (in the main_loop method) a "copy" of the named semaphore using sem_open("/shsemaphore", O_RDWR)

None of these worked. I'm going crazy over here, guys - please help me :(

4

2 回答 2

2

在打开信号量后的函数init_procedure()中,有一个函数调用

sem_unlink(shared_sem_name);

它总是被执行,在信号量创建后立即销毁。您希望在错误处理块中使用这一行。

于 2016-06-29T14:35:32.817 回答
1

找到的解决方案

在主文件中创建命名信号量时,权限设置为 0644,这赋予进程组只读权限。

更改为以下内容后:

sem_open(semaphore_name, O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP, 1)

问题似乎解决了!显然,如果 sem_wait 在没有对信号量具有读\写权限的情况下被调用(这发生在子进程中 - 他们只使用具有 READ 权限的信号量)行为是未定义的

于 2016-07-02T16:44:42.183 回答