我在使用 spring-rabbit-1.3.9.RELEASE 库对 Rabbitmq 3.3.5 进行 POC 时观察到奇怪的行为
当我启动一个生产线程时,事情运行顺利。但是,如果同时启动超过 1 个线程,则只有一个线程完成,所有其他线程都会无限期地阻塞,即使队列变空也是如此。
当从rabbitmqctl list_connections
. 应该注意的是,当生产者阻塞时,或者在整个运行期间的任何其他时间,都不会发出警报。
我还观察到,如果我在每次发送后休眠 1 毫秒,问题就会消失。
所以,我有这些问题
- rabbitmq 不支持并发生产者,高速率发布吗?
- 即使连接确实被阻止,为什么它不显示在 rabbitmqctl list_connections 中?
- 为什么他们无限期地阻塞而不恢复乳清队列变空?
代码
public static void main(String[] argv) throws java.io.IOException, InterruptedException {
init();
PocConfig config = new PocConfig();
int threadCount = config.getThreadCount();
final int eventsPerThread = config.getEvents() / threadCount;
final long sleep = config.getSleep();
System.out.println("Start producer with configuration [threadCount=" + threadCount + ", events=" + eventsPerThread + ", sleep="
+ sleep + "]");
ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
executorService.submit(new Runnable() {
public void run() {
produce(eventsPerThread, sleep, threadId);
}
});
}
waitAndTearDown(executorService);
}
private static void produce(int events, long sleep, int threadId) {
long start = System.currentTimeMillis();
for (int index = 1; index <= events; index++) {
try {
byte[] message = messageFactory.createTestMessage(index);
amqpTemplate.convertAndSend(QUEUE_NAME, message);
if (sleep > 0) {
Thread.sleep(sleep);
}
} catch (Exception e) {
LOG.error("Error", e);
}
}
long time = System.currentTimeMillis() - start;
System.out.println("Producer:" + threadId + " finished, events: " + events + ", Time(s): " + time / 1000 + ", tps: " + (events * 1000) / time);
}
弹簧配置
<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<property name="addresses" value="${addresses}" />
<property name="username" value="${user}" />
<property name="password" value="${passwd}" />
<property name="cacheMode" value="CONNECTION" />
<property name="connectionCacheSize" value="${threads}" />
<property name="channelCacheSize" value="10" />
</bean>
<rabbit:template id="template" connection-factory="connectionFactory"
exchange="testExchange" routing-key="testQueue"/>