背景
在回答一个问题时,我构建并上传了一个 bounded-tchan(我不适合上传jnb 的版本)。如果名称不够,则有界 tchan (BTChan) 是具有最大容量的 STM 通道(如果通道处于容量状态,则写入块)。
最近,我收到了一个请求,要求添加一个像常规 TChan一样的 dup 功能。问题就这样开始了。
BTChan 的外观
下面是 BTChan 的简化(实际上是非功能性的)视图。
data BTChan a = BTChan
{ max :: Int
, count :: TVar Int
, channel :: TVar [(Int, a)]
, nrDups :: TVar Int
}
每次您写入频道时,您都会nrDups
在元组中包含重复次数 ( ) - 这是一个“单个元素计数器”,表示有多少读者获得了该元素。
每个阅读器都会减少它读取的元素的计数器,然后将它的读取指针移动到列表中的下一个元素。如果阅读器将计数器减至零,则 的值count
会减少以正确反映通道上的可用容量。
要明确所需的语义:通道容量表示通道中排队的最大元素数。任何给定的元素都会排队,直到每个 dup 的读取器接收到该元素。任何元素都不应为 GCed dup 排队(这是主要问题)。
例如,让容量为 2 的通道 (c1, c2, c3) 有 3 个 dup,其中 2 个项目被写入通道,然后所有项目都从c1
和中读出c2
。通道仍然是满的(0 剩余容量),因为c3
没有消耗它的副本。在任何时候,如果所有对的引用c3
都被删除(c3
GCed 也是如此),那么容量应该被释放(在这种情况下恢复为 2)。
这是问题所在: 假设我有以下代码
c <- newBTChan 1
_ <- dupBTChan c -- This represents what would probably be a pathological bug or terminated reader
writeBTChan c "hello"
_ <- readBTChan c
导致 BTChan 看起来像:
BTChan 1 (TVar 0) (TVar []) (TVar 1) --> -- newBTChan
BTChan 1 (TVar 0) (TVar []) (TVar 2) --> -- dupBTChan
BTChan 1 (TVar 1) (TVar [(2, "hello")]) (TVar 2) --> -- readBTChan c
BTChan 1 (TVar 1) (TVar [(1, "hello")]) (TVar 2) -- OH NO!
注意最后的读取计数"hello"
仍然是1
?这意味着消息不会被视为消失(即使它会在实际实现中被 GC),并且我们count
永远不会递减。因为通道处于容量状态(最多 1 个元素),所以写入器将始终阻塞。
我希望每次dupBTChan
调用时都创建一个终结器。当收集到复制(或原始)通道时,该通道上剩余要读取的所有元素将减少每个元素的计数,nrDups
变量也将减少。结果,未来的写入将具有正确count
的(count
不为 GCed 通道未读取的变量保留空间)。
解决方案 1 - 手动资源管理(我想避免的)
由于这个原因,JNB 的 bounded-tchan 实际上有手动资源管理。见cancelBTChan
。我要让用户更难出错(并不是说手动管理在许多情况下不是正确的方法)。
解决方案 2 - 通过阻止 TVar 使用异常(GHC 无法按照我的意愿执行此操作)
编辑这个解决方案,解决方案 3 只是一个衍生产品,不起作用!由于错误 5055 (WONTFIX),GHC 编译器向两个阻塞线程发送异常,即使一个就足够了(理论上可以确定,但对于 GHC GC 不实用)。
如果获取 a 的所有方法BTChan
都是 IO,我们可以forkIO
使用一个线程来读取/重试给定的唯一的额外(虚拟)TVar 字段BTChan
。当所有其他对 TVar 的引用都被删除时,新线程将捕获异常,因此它将知道何时递减nrDups
单个元素计数器。这应该可行,但会强制我所有的用户使用 IO 来获取他们BTChan
的 s:
data BTChan = BTChan { ... as before ..., dummyTV :: TVar () }
dupBTChan :: BTChan a -> IO (BTChan a)
dupBTChan c = do
... as before ...
d <- newTVarIO ()
let chan = BTChan ... d
forkIO $ watchChan chan
return chan
watchBTChan :: BTChan a -> IO ()
watchBTChan b = do
catch (atomically (readTVar (dummyTV b) >> retry)) $ \e -> do
case fromException e of
BlockedIndefinitelyOnSTM -> atomically $ do -- the BTChan must have gotten collected
ls <- readTVar (channel b)
writeTVar (channel b) (map (\(a,b) -> (a-1,b)) ls)
readTVar (nrDup b) >>= writeTVar (nrDup b) . (-1)
_ -> watchBTChan b
编辑:是的,这是一个糟糕的 man 终结器,我没有任何特别的理由避免使用addFinalizer
. 那将是相同的解决方案,仍然强制使用 IO afaict。
解决方案 3:比解决方案 2 更简洁的 API,但 GHC 仍然不支持
用户通过调用启动管理器线程initBTChanCollector
,它将监视一组这些虚拟 TVar(来自解决方案 2)并进行所需的清理。unsafePerformIO
基本上,它将 IO 推入另一个线程,该线程通过全局 ( ed)知道该做什么TVar
。事情基本上像解决方案 2 一样工作,但 BTChan 的创建仍然可以是 STM。运行失败initBTChanCollector
将导致进程运行时任务列表不断增长(空间泄漏)。
解决方案 4:永远不允许丢弃BTChan
s
这类似于忽略问题。如果用户从不丢弃重复项BTChan
,那么问题就会消失。
解决方案 5 我看到了 ezyang 的回答(完全有效和赞赏),但真的很想保留当前的 API 只是一个 'dup' 函数。
** 解决方案 6** 请告诉我有更好的选择。
编辑:我实现了解决方案 3(完全未经测试的 alpha 版本)并通过将全局本身设为 a 来处理潜在的空间泄漏BTChan
- chan 的容量可能应该为 1,因此忘记运行会init
很快出现,但这是一个很小的变化。这在 GHCi (7.0.3) 中有效,但这似乎是偶然的。GHC 向两个被阻塞的线程(读取 BTChan 和观察线程的有效线程)抛出异常,所以如果你在另一个线程丢弃它的引用时被阻止读取 BTChan,那么你就会死。