3

现在我正在开发一种以太网数据包处理库。基本思想是数据包有两种不同的来源:网络接口和 pcap 转储文件。数据包应按流分组,应过滤流,应以一种方式处理 UDP 流,以另一种方式处理 TCP,等等。我开发了没有管道的版本,但我发现现在有太多重复的代码,什么时候我正在尝试抽象然后我正在发明类似于管道的东西。所以我尝试切换到管道然后卡住了。

所以图片是这样的:

                                   [UDP processing]
[pcap source]   |                 /                \
                |---[flow map]-->*                  *->[dump to many files]
                |                 \                /  
[iface source]  |                  [TCP processing]

第一个问题是流程图。它应该累积流量,并且当流量中的数据包超过某个阈值时 - 将其传递给处理。

第二个问题是我想为 UDP 和 TCP 处理使用不同的管道,所以应该以某种方式拆分管道。

另一个问题是所有这些东西都应该是多线程的,所以生产者和消费者应该在不同的线程中。

那么,就管道而言,这张图片中的内容应该是什么?

来源就是来源,很清楚。但是流程图应该是什么?一个接收器,它产生进一步处理的源?许多流是巨大的,因此必须避免在进一步处理之前将所有数据包累积在内存中。

有任何想法吗?同样,很清楚如何在没有管道的情况下完成所有这些操作,因此问题是如何使用它们正确设计它。

UPD。

  data FlowFrame = FlowFrame { flowKey   :: !F.FlowKey
                             , flowFrame :: [Packet]
                             }

  data FlowState

  flowFrames :: MonadIO m => Conduit Packet m FlowFrame
  flowFrames = awaitForever $ \p -> do
    let (Right (l3, _)) = runGet F.readL3Headers (pktData p)
    let fk = F.flowKey l3
    yield (FlowFrame fk [p])

  sinkPrintFlow :: MonadIO m => Consumer FlowFrame m ()
  sinkPrintFlow = awaitForever $ liftIO.putStrLn.show.pPrint.flowKey

  isA :: F.Protocol -> FlowFrame -> Bool
  isA p frame =
    case ((flowKey frame)) of
      F.FlowKey p _ _ -> True
      _               -> False

  processUDP :: MonadIO m => Conduit FlowFrame m FlowFrame
  processUDP = CL.filter (isA F.PROTO_UDP)

  processTCP :: MonadIO m => Conduit FlowFrame m FlowFrame
  processTCP = CL.filter (isA F.PROTO_TCP)

  main = do 
    (file:_) <- getArgs
    input <- openOffline file
    sourcePcap input
      $$ flowFrames =$= void (sequenceConduits [processUDP, processTCP])
      $= sinkPrintFlow
    putStrLn "done"
4

2 回答 2

3

如果使用pipes,则可以使用(+++)Pipes.Extras 中的组合器并排运行两个管道。它有这种类型:

(+++)
    :: Monad m
    => Pipe a c m r
    -> Pipe b d m r
    -> Pipe (Either a b) (Either c d) m r

那么你的程序会变成:

producer >-> (udpPipe +++ tcpPipe) >-> consumer

每次您希望生产者将值转发给udpPipe您时,您将值包装在 a 中Left,并且每次您希望将值转发给tcpPipe您时,您将值包装在 a 中Right。然后下游consumer 可以对其输入进行模式匹配,以判断Pipe它来自哪个。 Left价值观来自udpPipeRight价值观来自tcpPipe

编辑:请注意,这不需要并发。 (+++)接受两个单线程管道并返回一个结合了它们的逻辑的新单线程管道。

于 2014-06-25T15:42:01.917 回答
2

您提到了几个不同的概念。让我依次回答:

  • 为了将两个不同的来源合并为一个来源,有多种选择。ZipSource是一个常见的抽象,但可能不是你要找的。由于您可能希望同时执行此操作,因此我建议将每个数据源馈送到共享通道(例如 a ),然后从该通道读取TChan单个数据。Source有关更多信息,请参见stm-conduit
  • Sink要从单个读取两个不同的 s Source,您可以使用ZipSink. 在您的情况下,这可能是正确的答案。在将源与相关的接收器融合之前,您可以将源过滤到仅 TCP 和 UDP 数据。
  • 通常管道的所有组件都在一个线程中运行(这就是我们使用协程的原因)。为了能够在单独的线程中运行您Source的 s 和s,您可以使用Data.Conduit.Async(也来自 stm-conduit)。Sink

在像您这样的情况下,更明确地说明您在表面下使用并发原语并让每个数据源或接收器直接与TChan. 但这会涉及一些更复杂的设计问题,我真的无法根据当前信息给出任何明确的答案。

(顺便说一句,流程图不错。)

于 2014-06-25T19:58:42.817 回答