0

我正在处理一个 syslog 日志文件,每一行作为一个单独的 syslog 条目,并使用 Attoparsec 解析器解析该条目。所以我正在使用

fileToBS :: IO Handle -> C.Source (ResourceT IO) BS.ByteString
fileToBS handleMaker = source C.$= bsSplitterConduit
  where source = CB.sourceIOHandle handleMaker
        bsSplitterConduit = CB.lines

生成系统日志条目流。我在用

parseToLogData:: C.Conduit BS.ByteString (ResourceT IO) (Either CATT.ParseError (CATT.PositionRange, LogData))
parseToLogData = CATT.conduitParserEither syslogParser

将这些字节串转换为系统日志值。Syslog 值是从此解析器生成的(使用我自己的一些类型同义词):

syslogParser :: Parser (Priority, Maybe UTCTime, IPAddress, BS.ByteString)
syslogParser = do
  pri <- priority <?> "priority parse error"
  mbDate <- date <?> "date parse error"
  space
  srcAddr <- ip
  space
  msg <- ATT.takeByteString
  return LogData{pri = pri, timestamp = mbDate, source = srcAddr, message = "msg"}

priority :: Parser Priority
priority = do
  string "<"
  digitsString <- takeWhile1 digit
  string ">"
  return (RawPriority digitsString)

date :: Parser (Maybe UTCTime)
date = do
  rawDate <- ATT.take 15
  let stringDate = BS.unpack rawDate
  let parsedDate = parseTime defaultTimeLocale syslogDateFormat stringDate
  return parsedDate

ip :: Parser IPAddress
ip = do
  oct0 <- takeWhile1 digit
  period
  oct1 <- takeWhile1 digit
  period
  oct2 <- takeWhile1 digit
  period
  oct3 <- takeWhile1 digit
  return (oct0, oct1, oct2, oct3)
--ip = takeWhile1 (\x -> digit x || x == 46)

space = string " "
colon = string ":"
period = string "."

digit test = (test >= 48 && test <= 57)
octet = digit

问题是占用系统日志条目 ( msg <- ATT.takeByteString) 的所有其余部分的行。这个函数不能很好地处理流,因为如果使用可恢复的解析器(这是管道的 attoparsec 库使用的),它需要一个终止信号。

我试图产生空字节串来修复此行为,但它没有按预期工作(请参阅https://hackage.haskell.org/package/attoparsec-0.12.1.2/docs/Data-Attoparsec-ByteString.html上的增量输入)。它将整个 syslog 输入文件消耗为一个解析值。这是一个 80MB 的测试文件,因此在初始字段提取之后,它将所有后续 syslog 消息放入 syslog 值的 message 字段中。

这是我尝试发出“原子消息”行为的终结器管道。我不确定为什么它不起作用。

terminator :: C.Conduit BS.ByteString (ResourceT IO) BS.ByteString
terminator = C.awaitForever yieldAndAddTerminator
  where
    yieldAndAddTerminator bs = do
      C.yield bs
      C.yield terminator
    terminator = ""

如何将 UDP 消息视为管道世界中的原子数据?

可以在此处找到此代码库的副本:https ://github.com/tureus/safe-forwarder 。

4

1 回答 1

1

您可能希望将您parseToLogData的函数与阻止它使用新行(ASCII 代码 10)的函数融合在一起。使用管道组合器术语,例如:

takeWhileCE (/= 10) =$= parseToLogData
dropWhileCE (/= 10) >> dropCE 1 -- flush the rest of it

您可能还想研究line组合器功能。

于 2015-01-19T13:46:37.870 回答