-1

我的 Rust 代码用于RwLock在多个线程中处理数据。每个线程在使用锁时填充一个公共存储read(例如填充一个数据库,但我的情况有点不同)。最终,公共存储空间将被填满。我需要暂停所有处理,重新分配存储空间(例如从云中分配更多磁盘空间),然后继续。

// psudo-code
fn thread_worker(tasks) {
  let lock = rwlock.read().unwrap();
  for task in tasks {
    // please ignore out_of_space check race condition
    // it's here just to explain the question 
    if out_of_space {
      drop(lock);
      let write_lock = rwlock.write().unwrap();
      // get more storage
      drop(write_lock);
      lock = rwlock.read().unwrap();
    }
    // handle task WITHOUT getting a read lock on every pass
    // getting a lock is far costlier than actual task processing
  }
  drop(lock);
}

由于所有线程将在大约同一时间快速耗尽空间,因此它们都可以释放read锁,并获得一个write. 第一个获得write锁的线程将解决存储问题。但是现在我有一个可能的临时死锁情况——所有其他线程也在等待write锁,即使它们不再需要它。

所以这种情况有可能发生:给定 3 个线程都在等待write,第一个线程得到write,修复问题,释放write,并等待read。第二个进入write但很快跳过,因为问题已经修复并发布。第 1 个和第 2 个线程将进入read并继续处理,但第 3 个线程仍在等待write并将等待很长时间,直到前两个线程空间不足或完成所有工作。

鉴于所有线程正在等待write,我如何在第一个线程完成工作后,但在它释放write它已经获得的锁之前“中止”所有其他线程的等待?

我看到有一个poisoning功能,但它是为恐慌而设计的,并且在生产中重用它似乎是错误的并且很难正确完成。Rust 开发人员也在考虑删除它。

PS 每次循环迭代本质上是一个data[index] = value赋值,这里data是一个被许多线程共享的巨大的 memmap。所有线程都在index缓慢增长,因此最终所有线程都用完了 memmap 大小。发生这种情况时,memmap 被销毁,文件重新分配,并创建一个新的 memmap。因此,不可能在每次循环迭代时都获得读锁。

4

2 回答 2

1

首先请注意,根据您的目标平台,您的代码可能已经按原样运行。例如对于 Rust 线​​程依赖于 libpthread 的平台(例如 Linux),以及写锁优先于读锁的任何平台。

如果您想要一个跨平台的解决方案,您需要做的就是切换到parking-lot提供RwLock. 特别是,这意味着当有写入者等待获取锁时,即使锁已解锁,尝试获取锁的读取器也会阻塞

以下是 fair 的事件顺序RwLock

  • 最初所有线程都在运行并持有读锁。
  • 第一个用完空间的线程释放读锁并请求写锁。由于其他线程仍然持有读锁,所以第一个线程被阻塞。
  • 一个接一个,其他线程耗尽空间,释放读锁,请求写锁。
  • 一旦所有线程都释放了读锁,其中一个线程就获得了写锁。
  • 获得写锁的线程分配更多内存,释放写锁并请求读锁。由于等待写锁的其他线程优先,读请求阻塞。
  • 一个接一个地,其他线程获取写锁,注意到有可用内存,释放写锁并请求读锁。
  • 一旦所有线程都获得并释放了写锁,它们都将获得读锁并继续。

请注意,如果其他线程能够在释放读锁并请求写锁所需的时间内继续进行,则存在一个理论上的竞争条件,一旦分配了内存,可能会使其中一个线程阻塞,例如:

drop(lock);
// Another thread gets the write lock, allocates memory and releases the lock
// All the other threads acquire and release the write lock
// At least one other thread acquires the read lock
let write_lock = rwlock.write().unwrap();

考虑到单独分配内存所花费的时间,这种情况在现实生活中发生的可能性非常小,以至于可以忽略不计。

于 2021-11-24T08:06:24.833 回答
0

查看您的代码,您可以使用额外的互斥锁:

// pseudo-code
fn thread_worker(tasks) {
  for task in tasks {
    if out_of_space {
      drop(lock);
      {
        let mutex = mutex.lock();      
        if out_of_space { // potentially updated by another worker
          let write_lock = rwlock.write();
          // get more storage
          ...
          // drop(write_lock); is automatic here
        }
        // drop(mutex); is automatic here
      }
      lock = rwlock.read();
    }

    // copy memory for the task
    ...
  }
}

此处使用的模式称为双重检查锁定

这解决了您在重新分配后下一方不会永远等待 rwlock.write 的问题,因为它不会通过互斥锁关键部分内的 out_of_space 检查。

然而,这个解决方案仍然存在一个问题,即第一个失败的 worker 将等待所有其他 worker 遇到 out_of_space 条件,然后才能继续重新分配,因为它需要等待所有read() 锁被删除。

我建议重构此代码以将重新分配逻辑移出此方法。

如果可能,也尽量避免显式丢弃,以支持或RAII,这通常是一个很好的做法。

于 2021-11-21T20:47:46.873 回答