基于我的 Netty 的应用程序
- 每秒从单个 TCP 连接接收数十万条消息
- 在几个入站处理程序中处理这些消息
- 将处理结果发送到下游某处
目前,所有这些都在线程上运行,因为它在单个 TCP 连接上。我想知道如何并行化 2。困难在于消息不能随意并行处理,因为消息有部分顺序。你可以把它想象成有一个key(message)
函数,这个函数返回相同结果的所有消息都需要按顺序处理,但如果结果不同,它们可能会并行运行。所以我想有一个从消息到线程的映射,比如hash(key(message)) % threadCount
.
想象一下这个管道:
pipeline.addLast(deframer);
pipeline.addLast(new IdleStateHandler(...));
pipeline.addLast(decoder);
pipeline.addLast(bizLogicHandler1);
pipeline.addLast(bizLogicHandler2);
在解码器中,我能够计算 的结果key(message)
,所以我想并行化解码器下游的所有内容。据记载,为了使用多个线程,我可以做
static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
...
pipeline.addLast(group, "bizLogicHandler1", bizLogicHandler1);
pipeline.addLast("bizLogicHandler2", bizLogicHandler2);
我猜这意味着bizLogicHandler1和它下面的所有东西(在上面的例子中是bizLogicHandler2)将能够并行运行?(或者我是否也必须group
为 bizLogicHandler2 指定?)
然而,正如文档所解释的那样,上述内容仍将完全串行运行,提供UnorderedThreadPoolEventExecutor
作为最大化并行性的替代方案,代价是完全摆脱排序,这在我的情况下不起作用。
查看接口EventExecutorGroup
和EventExecutor
,我看不出如何传达哪些消息可以并行处理,哪些消息必须按顺序处理。
任何想法?