0

我希望我的应用程序对从数千个不同客户端发送的 UDP 事件做出反应。每个客户端每 5-10 秒发送 1-10 个 UDP 数据包。每个数据包都将并且应该非常快速地处理(主要在内存和小型计算中,通过 redis 帮助,只有偶尔的 DB 调用)。不会有数据返回给调用者。

我在 Spring 中实现了 Reactor,就像他们在wiki上描述的那样。然后我实现了 UDP 入站通道,就像他们的 Spring Integration 文档中描述的那样。这是配置:

<int-ip:udp-inbound-channel-adapter id="receiverChannel"
                                    channel="stringConvert"
                                    port="9000"
                                    multicast="false"
                                    check-length="false"
                                    pool-size="10"
                                    lookup-host="false"
        />

<int:transformer id="convertChannel"
                 input-channel="stringConvert"
                 output-channel="toProcess"
                 ref="transformer"
                 method="transform"

        />

<int:service-activator input-channel="toProcess"
                       ref="accumulator"
                       method="accumulate"/>

<bean id="accumulator" class="hello.UDPAccumulator" />
<bean id="transformer" class="hello.UDPTransformer" />

然后在 UDPAccumulator 中我将该消息发布到反应器:

@Service
public class UDPAccumulator {

@Autowired
ReactorProducer producer;

public void accumulate(String quote) {
    producer.fireEvent(quote);

}

}

这是“正确”的方法吗,关于我想要高输出?int-ip:udp-inbound-channel-adapter的内部工作原理是什么?它在将消息传递到反应器之前会成为瓶颈吗?我看到反应器有一些与 TCP 相关的类和支持,但没有 UDP。任何关于如何以最佳方式做到这一点的建议表示赞赏!

奖金问题。如果消息到达的速度比发送到反应器的速度快怎么办?redis 消息存储(文章底部)会有帮助吗?如果我在反应器中处理这些数据包的方法很慢怎么办?

4

2 回答 2

3

Since we don't have direct UDP support in Reactor yet, your abstractions to publish events into Reactor are very sensible. But you do note in your "bonus question" that there are issues with publisher/consumer throughput that have to be managed in some domain-specific way; there's no silver bullet there.

In your use-case, I'd actually be tempted to say the Processor [1] might be a better fit. It provides much higher overall throughput for data processing because it circumvents the dynamic Selector-based dispatching that happens in a plain Reactor. Unless you're dispatching the incoming events to different handlers based on some topic criteria, I'd suggest you look at that instead. With higher throughput, you'll have to worry a little less about Consumers keeping up (unless your Consumer is doing something really slow, which nothing can automagically speed up).

But if you really, really need to manage the backlog, I'd suggest decoupling your producers and consumers via a Queue. Reactor has a PersistentQueue [2] abstraction you can use to publish objects into and persist to disk using JavaChronicle [3], which can then be drained into a Consumer using a Poller (javadoc is coming on Poller sometime this week as we get ready for 1.0...it was previously called Pipe [4]).

于 2013-11-04T15:00:09.867 回答
1

我无法与 Reactor 交谈,但 UDP 适配器有一个专用线程,可以读取原始数据包并将它们交给TaskExecutor. 它会尽快执行此操作,以便可以重新读取下一个数据包。

默认TaskExecutor是固定线程池。

Reactor 有一个DispatcherTaskExecutor可以注入适配器的。

相同的任务执行器用于主阅读器线程和切换。

于 2013-11-04T14:22:34.417 回答