12

我的印象是flock(2)是线程安全的,我最近在代码中遇到了一个案例,其中多个线程能够在同一个文件上获得锁,这些锁都与使用获得独占锁同步c api 群。进程 25554 是具有 20 个线程的多线程应用程序,当死锁发生时,锁定同一文件的线程数会有所不同。多线程应用程序testEvent是文件的写入者,而推送是文件的读取者。不幸的是,lsof它不打印 LWP 值,所以我找不到哪些是持有锁的线程。当下面提到的情况发生时,进程和线程都被困在 flock 调用上,如pstackor所示strace调用 pid 25569 和 25554。有关如何在 RHEL 4.x 中克服此问题的任何建议。

我想更新的一件事是,flock 并不总是行为不端,当消息的 tx 速率超过 2 mbps 时,我才会遇到与 flock 的死锁问题,低于该 tx 速率的所有内容都是文件。我保持num_threads= 20, size_of_msg= 1000bytes 不变,只是将每秒发送的消息数量从 10 条消息变为 100 条消息,即 20 * 1000 * 100 = 2 mbps,当我将消息数量增加到 150 时,然后群发发生。

我只是想问一下您对flockfile c api的看法。

 sudo lsof filename.txt
    COMMAND       PID     USER     FD       TYPE     DEVICE     SIZE   NODE       NAME
    push         25569    root     11u       REG      253.4      1079   49266853   filename.txt
    testEvent    25554    root     27uW      REG      253.4      1079   49266853   filename.txt
    testEvent    25554    root     28uW      REG      253.4      1079   49266853   filename.txt
    testEvent    25554    root     29uW      REG      253.4      1079   49266853   filename.txt
    testEvent    25554    root     30uW      REG      253.4      1079   49266853   filename.txt

将调用write_data_lib_funclib 函数的多线程测试程序。

void* sendMessage(void *arg)  {

int* numOfMessagesPerSecond = (int*) arg;
std::cout <<" Executing p thread id " << pthread_self() << std::endl;
 while(!terminateTest) {
   Record *er1 = Record::create();
   er1.setDate("some data");

   for(int i = 0 ; i <=*numOfMessagesPerSecond ; i++){
     ec = _write_data_lib_func(*er1);
     if( ec != SUCCESS) {
       std::cout << "write was not successful" << std::endl;

     }

   }
   delete er1;
   sleep(1);
 }

 return NULL;

上面的方法会在测试的main函数中的pthreads中调用。

for (i=0; i<_numThreads ; ++i) {
  rc = pthread_create(&threads[i], NULL, sendMessage, (void *)&_num_msgs);
  assert(0 == rc);

}

这是作者/阅读器源,由于专有原因,我不想只是剪切和粘贴,作者源将在一个进程中访问多个线程

int write_data_lib_func(Record * rec) {      
if(fd == -1 ) {  
    fd = open(fn,O_RDWR| O_CREAT | O_APPEND, 0666);
} 
if ( fd >= 0 ) {
   /* some code */ 

   if( flock(fd, LOCK_EX) < 0 ) {
     print "some error message";
   }
   else { 
    if( maxfilesize) {
      off_t len = lseek ( fd,0,SEEK_END);
      ...
      ... 
      ftruncate( fd,0);
      ...
      lseek(fd,0,SEEK_SET); 
   } /* end of max spool size */ 
   if( writev(fd,rec) < 0 ) {
     print "some error message" ; 
   }

   if(flock(fd,LOCK_UN) < 0 ) {
   print some error message; 
   } 

在读者方面是一个没有线程的守护进程。

int readData() {
    while(true) {
      if( fd == -1 ) {
         fd= open (filename,O_RDWR);
      }
      if( flock (fd, LOCK_EX) < 0 ) { 
        print "some error message"; 
        break; 
      } 
      if( n = read(fd,readBuf,readBufSize)) < 0 ) { 
        print "some error message" ;
        break;
      }  
      if( off < n ) { 
        if ( off <= 0 && n > 0 ) { 
          corrupt_file = true; 
        } 
        if ( lseek(fd, off-n, SEEK_CUR) < 0 ) { 
          print "some error message"; 
        } 
        if( corrupt_spool ) {  
          if (ftruncate(fd,0) < 0 ) { 
             print "some error message";
             break;
           }  
        }
      }
      if( flock(fd, LOCK_UN) < 0 ) 
       print some error message ;
      }  
   }     
}
4

2 回答 2

10

flock(2)被记录为“如果另一个进程持有不兼容的锁则阻塞”并且“flock() 创建的锁与打开的文件表条目相关联”,因此应该预期flock-ed 锁由多个线程同一进程的不交互。(flock文档没有提到线程)。

因此,解决方案对您来说应该很简单:将一个pthread_mutex_t与每个flock-able 文件描述符相关联,并保护flock对该互斥体的调用。如果您想要读写锁定,也可以使用pthread_rwlock_t 。

于 2012-02-27T09:26:52.973 回答
3

来自flock(2) 的Linux 手册页:

由flock() 创建的锁与打开的文件表条目相关联。这意味着重复的文件描述符(例如,由 fork(2) 或 dup(2) 创建)引用同一个锁,并且可以使用这些描述符中的任何一个来修改或释放该锁。此外,通过对这些重复描述符中的任何一个执行显式 LOCK_UN 操作或在所有此类描述符已关闭时释放锁。

此外,flock 锁不会“堆叠”,因此如果您尝试获取您已经持有的锁,flock 调用是一个 noop,它会立即返回而不会阻塞并且不会以任何方式更改锁状态。

由于进程中的线程共享文件描述符,因此您可以从不同的线程多次聚集文件,并且它不会阻塞,因为锁已经被持有。

同样来自羊群(2)的注释:

对于分叉进程和 dup(2),flock() 和 fcntl(2) 锁具有不同的语义。在使用 fcntl(2) 实现flock() 的系统上,flock() 的语义将与本手册页中描述的不同。

于 2012-02-28T06:14:00.727 回答