0

我有一个相对简单的 Haskell 守护进程,它blpop从 Redis 获取内容并将它们写入一个线程中的通道,然后从通道中读取它们并将它们打印到另一个线程中的标准输出。

在运行它时,它开始很好,然后过了一段时间,我得到以下信息stderr

hogstash: getAddrInfo: does not exist (nodename nor servname provided, or not known)

并在我的system.log(OSX)中查看以下内容:

hogstash[11281]: dnssd_clientstub deliver_request: socketpair failed 24 (Too many open files)

我从中推断出我正在以某种方式进行一些无界线程 DNS 查找,但我不明白如何进行,而且我对 Haskell 的熟悉程度不足以真正了解如何调试它。

主程序如下:

import Hogstash.Inputs.Redis
import Hogstash.Event
import Hogstash.Outputs.Stdout

import Control.Concurrent
import Control.Concurrent.BoundedChan
import Control.Monad

main = forever $ do
           channel <- newBoundedChan 10
           forkIO $ do
               connection <- tmpHaxx
               forever $ getEvent connection "logstash:beaver" channel
           forkIO $ forever $ stdout channel

stdout很简单:

module Hogstash.Outputs.Stdout where

import Control.Concurrent.BoundedChan as BC
import Hogstash.Event

stdout :: BoundedChan Event -> IO ()
stdout channel = do 
                     event <- readChan channel
                     putStrLn $ show event

并且getEvent是:

module Hogstash.Inputs.Redis where

import Database.Redis

import Hogstash.Event

import Control.Concurrent
import Control.Concurrent.BoundedChan as BC
import qualified Data.ByteString.Char8 as BSC


eventFromByteString :: BSC.ByteString -> Event
eventFromByteString _ = Event

listListen key = blpop [key] 0

tmpHaxx = connect defaultConnectInfo -- FIXME Remove this

getEvent :: Connection -> String -> BoundedChan Event -> IO ()

getEvent a b = getEvent' a (BSC.pack b)

getEvent' ci key channel = do
                                fnar <- pullEvent ci key
                                case fnar of
                                    Just e -> BC.writeChan channel e
                                    Nothing -> return ()

pullEvent :: Connection -> BSC.ByteString -> IO (Maybe Event)
pullEvent connection key = do
                                    event_data <- runRedis connection $ listListen key
                                    return (case event_data of
                                        Left a -> Nothing
                                        Right a -> extractEvent a)

extractEvent :: Maybe (a, BSC.ByteString) -> Maybe Event
extractEvent = fmap (eventFromByteString . snd)

谢谢

4

1 回答 1

2

非常感谢Kaini来自#haskellFreenode 的:

main正在forever愚蠢地尝试无限期地阻塞主线程。这当然意味着永远产生 Redis 和 stdout 线程,从而达到文件打开限制。

重写main

main = do
           channel <- newBoundedChan 10
           forkIO $ do
               connection <- tmpHaxx
               forever $ getEvent connection "logstash:beaver" channel
           forkIO $ forever $ stdout channel
           forever $ threadDelay 1000 -- Block forever

工作得更好!

于 2013-09-12T13:31:14.777 回答