3

我正在关注本教程并查看源代码中的测试用例。我的代码SimplePool.hs在源代码中使用并创建了以下文件:(片段)

sampleTask :: (TimeInterval, String) -> Process String
sampleTask (t, s) = sleep t >> return s 

$(remotable ['sampleTask])

jobTest :: MVar (AsyncResult (Either String String)) -> Process ()
jobTest result = do
  pid <- startTestPool 1 -- start the pool of workers here only one worker
  job <- return $ ($(mkClosure 'sampleTask) (seconds 2, "foobar"))
  -- callAsync put job into pool
  p <- callAsync pid job 
  a <- wait p
  setResult result a
  where
    setResult :: MVar a -> a -> Process ()
    setResult mvar x = liftIO $ putMVar mvar x

startTestPool :: Int -> Process ProcessId
startTestPool s = spawnLocal $ do
  _ <- runPool s
  return ()

runPool :: Int -> Process (Either (InitResult (Pool String)) TerminateReason)
runPool s =
  -- setting a to String
  let s' = poolServer :: ProcessDefinition (Pool String)
  in simplePool s s'

myRemoteTable :: RemoteTable
myRemoteTable = Control.Distributed.Process.Platform.__remoteTable initRemoteTable

main :: IO ()
main = do
  Right (transport, _) <- createTransportExposeInternals
                                    "127.0.0.1" "9901" defaultTCPParameters
  localNode       <- newLocalNode transport myRemoteTable
  result          <- newEmptyMVar
  pid             <- forkProcess localNode $ jobTest result
  ans             <- takeMVar result
  putStrLn $ show pid
  putStrLn $ show ans

运行后出现此错误:

AsyncFailed (DiedException "exit-from=pid://127.0.0.1:9901:0:6")

如果我错了,请纠正我,我认为作业没有正确运行,一定是从属进程有问题。p <- callAsync pid job 我认为这行代码是将任务传递给从属进程执行的地方。我查看了以找到callAsync. 关键行callAsyncUsingsendTo sid (CallMessage msg (Pid wpid))函数将任务传递给 poolServer 的位置。

acceptTask行中的 SimplePool.hsasyncHandle <- async proc是我认为他们生成一个新进程来执行任务的地方。所以我认为也许异步进程没有完成运行导致调用者过早终止?或者可能是该过程没有正确产生?关于调试这个的最佳方法的任何想法?另外,有人可以指出我正确的方向,以找出如何使 poolSever 跨越不同的节点/不同的计算机(使用 Control.Distributed.Process.Platform.Async.AsyncChan?)?

4

2 回答 2

2

我稍微修改了您的代码,并且此代码段包含导入,因此可以编译。确保您使用的是最新的SimplePool 模块,因为您的代码使用simplePool的是我找不到的,并且您的使用runPool不明确。

{-# LANGUAGE TemplateHaskell #-}

import Control.Concurrent.MVar
import Control.Exception (SomeException)
import Control.Distributed.Process hiding (call)
import Control.Distributed.Process.Closure
import Control.Distributed.Process.Node
import Control.Distributed.Process.Platform hiding (__remoteTable)
import Control.Distributed.Process.Platform.Async
import Control.Distributed.Process.Platform.ManagedProcess
import Control.Distributed.Process.Platform.Test
import Control.Distributed.Process.Platform.Time
import Control.Distributed.Process.Platform.Timer
import Control.Distributed.Process.Serializable()

import Network.Transport
import Network.Transport.TCP

import Data.Binary
import Data.Typeable (Typeable)

import SimplePool hiding (runPool)
import qualified SimplePool (runPool)

sampleTask :: (TimeInterval, String) -> Process String
sampleTask (t, s) = sleep t >> return s

$(remotable ['sampleTask])

jobTest :: MVar (AsyncResult (Either String String)) -> Process ()
jobTest result = do
  pid <- startTestPool 1 -- start the pool of workers here only one worker
  let job = $(mkClosure 'sampleTask) (seconds 2, "foobar")
  -- callAsync put job into pool
  p <- callAsync pid job
  a <- wait p
  setResult result a
  where
    setResult :: MVar a -> a -> Process ()
    setResult mvar x = liftIO $ putMVar mvar x

startTestPool :: Int -> Process ProcessId
startTestPool s = spawnLocal $ do
  _ <- runPool s
  return ()

runPool :: Int -> Process (Either (InitResult (Pool String)) TerminateReason)
runPool = SimplePool.runPool

myRemoteTable :: RemoteTable
myRemoteTable = Main.__remoteTable initRemoteTable

main :: IO ()
main = do

  Right (transport, _) <- createTransportExposeInternals
                                "127.0.0.1" "9901" defaultTCPParameters
  localNode       <- newLocalNode transport myRemoteTable
  result          <- newEmptyMVar
  pid             <- forkProcess localNode $ jobTest result
  ans             <- takeMVar result
  print pid >> print ans

运行此可编译代码:

$ ./Example 
pid://127.0.0.1:9901:0:3
AsyncDone (Right "foobar")
于 2013-05-03T16:50:26.343 回答
0

请注意,分布式进程平台测试套件中的 SimplePool 示例模块已升级为库的成熟组件。它在最新(开发)分支上的新位置是https://github.com/haskell-distributed/distributed-process-platform/blob/development/src/Control/Distributed/Process/Platform/Task/Queue/BlockingQueue.hs .

某些名称/类型已更改,因此您可能需要更新代码才能继续使用它。

于 2013-12-31T13:43:03.093 回答