5

背景

在回答一个问题时,我构建并上传了一个 bounded-tchan(我不适合上传jnb 的版本)。如果名称不够,则有界 tchan (BTChan) 是具有最大容量的 STM 通道(如果通道处于容量状态,则写入块)。

最近,我收到了一个请求,要求添加一个像常规 TChan一样的 dup 功能。问题就这样开始了。

BTChan 的外观

下面是 BTChan 的简化(实际上是非功能性的)视图。

data BTChan a = BTChan
    { max :: Int
    , count :: TVar Int
    , channel :: TVar [(Int, a)]
    , nrDups  :: TVar Int
    }

每次您写入频道时,您都会nrDups在元组中包含重复次数 ( ) - 这是一个“单个元素计数器”,表示有多少读者获得了该元素。

每个阅读器都会减少它读取的元素的计数器,然后将它的读取指针移动到列表中的下一个元素。如果阅读器将计数器减至零,则 的值count会减少以正确反映通道上的可用容量。

要明确所需的语义:通道容量表示通道中排队的最大元素数。任何给定的元素都会排队,直到每个 dup 的读取器接收到该元素。任何元素都不应为 GCed dup 排队(这是主要问题)。

例如,让容量为 2 的通道 (c1, c2, c3) 有 3 个 dup,其中 2 个项目被写入通道,然后所有项目都从c1和中读出c2。通道仍然是满的(0 剩余容量),因为c3没有消耗它的副本。在任何时候,如果所有对的引用c3都被删除(c3GCed 也是如此),那么容量应该被释放(在这种情况下恢复为 2)。

这是问题所在: 假设我有以下代码

c <- newBTChan 1
_ <- dupBTChan c  -- This represents what would probably be a pathological bug or terminated reader
writeBTChan c "hello"
_ <- readBTChan c

导致 BTChan 看起来像:

BTChan 1 (TVar 0) (TVar []) (TVar 1)             -->   -- newBTChan
BTChan 1 (TVar 0) (TVar []) (TVar 2)             -->   -- dupBTChan
BTChan 1 (TVar 1) (TVar [(2, "hello")]) (TVar 2) -->   -- readBTChan c
BTChan 1 (TVar 1) (TVar [(1, "hello")]) (TVar 2)       -- OH NO!

注意最后的读取计数"hello"仍然是1?这意味着消息不会被视为消失(即使它会在实际实现中被 GC),并且我们count永远不会递减。因为通道处于容量状态(最多 1 个元素),所以写入器将始终阻塞。

我希望每次dupBTChan调用时都创建一个终结器。当收集到复制(或原始)通道时,该通道上剩余要读取的所有元素将减少每个元素的计数,nrDups变量也将减少。结果,未来的写入将具有正确count的(count不为 GCed 通道未读取的变量保留空间)。

解决方案 1 - 手动资源管理(我想避免的)

由于这个原因,JNB 的 bounded-tchan 实际上有手动资源管理。见cancelBTChan。我要让用户更难出错(并不是说手动管理在许多情况下不是正确的方法)。

解决方案 2 - 通过阻止 TVar 使用异常(GHC 无法按照我的意愿执行此操作)

编辑这个解决方案,解决方案 3 只是一个衍生产品,不起作用!由于错误 5055 (WONTFIX),GHC 编译器向两个阻塞线程发送异常,即使一个就足够了(理论上可以确定,但对于 GHC GC 不实用)。

如果获取 a 的所有方法BTChan都是 IO,我们可以forkIO使用一个线程来读取/重试给定的唯一的额外(虚拟)TVar 字段BTChan。当所有其他对 TVar 的引用都被删除时,新线程将捕获异常,因此它将知道何时递减nrDups单个元素计数器。这应该可行,但会强制我所有的用户使用 IO 来获取他们BTChan的 s:

data BTChan = BTChan { ... as before ..., dummyTV :: TVar () }

dupBTChan :: BTChan a -> IO (BTChan a)
dupBTChan c = do
       ... as before ...
       d <- newTVarIO ()
       let chan = BTChan ... d
       forkIO $ watchChan chan
       return chan

watchBTChan :: BTChan a -> IO ()
watchBTChan b = do
    catch (atomically (readTVar (dummyTV b) >> retry)) $ \e -> do
    case fromException e of
        BlockedIndefinitelyOnSTM -> atomically $ do -- the BTChan must have gotten collected
            ls <- readTVar (channel b)
            writeTVar (channel b) (map (\(a,b) -> (a-1,b)) ls)
            readTVar (nrDup b) >>= writeTVar (nrDup b) . (-1)
        _ -> watchBTChan b

编辑:是的,这是一个糟糕的 man 终结器,我没有任何特别的理由避免使用addFinalizer. 那将是相同的解决方案,仍然强制使用 IO afaict。

解决方案 3:比解决方案 2 更简洁的 API,但 GHC 仍然不支持

用户通过调用启动管理器线程initBTChanCollector,它将监视一组这些虚拟 TVar(来自解决方案 2)并进行所需的清理。unsafePerformIO基本上,它将 IO 推入另一个线程,该线程通过全局 ( ed)知道该做什么TVar。事情基本上像解决方案 2 一样工作,但 BTChan 的创建仍然可以是 STM。运行失败initBTChanCollector将导致进程运行时任务列表不断增长(空间泄漏)。

解决方案 4:永远不允许丢弃BTChans

这类似于忽略问题。如果用户从不丢弃重复项BTChan,那么问题就会消失。

解决方案 5 我看到了 ezyang 的回答(完全有效和赞赏),但真的很想保留当前的 ​​API 只是一个 'dup' 函数。

** 解决方案 6** 请告诉我有更好的选择。

编辑:我实现了解决方案 3(完全未经测试的 alpha 版本)并通过将全局本身设为 a 来处理潜在的空间泄漏BTChan- chan 的容量可能应该为 1,因此忘记运行会init很快出现,但这是一个很小的变化。这在 GHCi (7.0.3) 中有效,但这似乎是偶然的。GHC 向两个被阻塞的线程(读取 BTChan 和观察线程的有效线程)抛出异常,所以如果你在另一个线程丢弃它的引用时被阻止读取 BTChan,那么你就会死。

4

1 回答 1

5

这是另一种解决方案:要求对有界通道副本的所有访问都由一个函数括起来,该函数在退出时释放其资源(通过异常或正常情况下)。您可以使用带有 rank-2 runner 的 monad 来防止重复的通道泄漏。它仍然是手动的,但是类型系统使得做淘气的事情变得更加困难。

您真的不想依赖真正的 IO 终结器,因为 GHC 不保证何时可以运行终结器:据您所知,它可能会等到程序结束后再运行终结器,这意味着您陷入僵局直到那时。

于 2011-03-28T14:15:50.497 回答