现在我正在开发一种以太网数据包处理库。基本思想是数据包有两种不同的来源:网络接口和 pcap 转储文件。数据包应按流分组,应过滤流,应以一种方式处理 UDP 流,以另一种方式处理 TCP,等等。我开发了没有管道的版本,但我发现现在有太多重复的代码,什么时候我正在尝试抽象然后我正在发明类似于管道的东西。所以我尝试切换到管道然后卡住了。
所以图片是这样的:
[UDP processing]
[pcap source] | / \
|---[flow map]-->* *->[dump to many files]
| \ /
[iface source] | [TCP processing]
第一个问题是流程图。它应该累积流量,并且当流量中的数据包超过某个阈值时 - 将其传递给处理。
第二个问题是我想为 UDP 和 TCP 处理使用不同的管道,所以应该以某种方式拆分管道。
另一个问题是所有这些东西都应该是多线程的,所以生产者和消费者应该在不同的线程中。
那么,就管道而言,这张图片中的内容应该是什么?
来源就是来源,很清楚。但是流程图应该是什么?一个接收器,它产生进一步处理的源?许多流是巨大的,因此必须避免在进一步处理之前将所有数据包累积在内存中。
有任何想法吗?同样,很清楚如何在没有管道的情况下完成所有这些操作,因此问题是如何使用它们正确设计它。
UPD。
data FlowFrame = FlowFrame { flowKey :: !F.FlowKey
, flowFrame :: [Packet]
}
data FlowState
flowFrames :: MonadIO m => Conduit Packet m FlowFrame
flowFrames = awaitForever $ \p -> do
let (Right (l3, _)) = runGet F.readL3Headers (pktData p)
let fk = F.flowKey l3
yield (FlowFrame fk [p])
sinkPrintFlow :: MonadIO m => Consumer FlowFrame m ()
sinkPrintFlow = awaitForever $ liftIO.putStrLn.show.pPrint.flowKey
isA :: F.Protocol -> FlowFrame -> Bool
isA p frame =
case ((flowKey frame)) of
F.FlowKey p _ _ -> True
_ -> False
processUDP :: MonadIO m => Conduit FlowFrame m FlowFrame
processUDP = CL.filter (isA F.PROTO_UDP)
processTCP :: MonadIO m => Conduit FlowFrame m FlowFrame
processTCP = CL.filter (isA F.PROTO_TCP)
main = do
(file:_) <- getArgs
input <- openOffline file
sourcePcap input
$$ flowFrames =$= void (sequenceConduits [processUDP, processTCP])
$= sinkPrintFlow
putStrLn "done"