1

我在使用 spring-rabbit-1.3.9.RELEASE 库对 Rabbitmq 3.3.5 进行 POC 时观察到奇怪的行为

当我启动一个生产线程时,事情运行顺利。但是,如果同时启动超过 1 个线程,则只有一个线程完成,所有其他线程都会无限期地阻塞,即使队列变空也是如此。

当从rabbitmqctl list_connections. 应该注意的是,当生产者阻塞时,或者在整个运行期间的任何其他时间,都不会发出警报。

我还观察到,如果我在每次发送后休眠 1 毫秒,问题就会消失。

所以,我有这些问题

  1. rabbitmq 不支持并发生产者,高速率发布吗?
  2. 即使连接确实被阻止,为什么它不显示在 rabbitmqctl list_connections 中?
  3. 为什么他们无限期地阻塞而不恢复乳清队列变空?

代码

    public static void main(String[] argv) throws java.io.IOException, InterruptedException {
        init();
        PocConfig config = new PocConfig();
        int threadCount = config.getThreadCount();
        final int eventsPerThread = config.getEvents() / threadCount;
        final long sleep = config.getSleep();

        System.out.println("Start producer with configuration [threadCount=" + threadCount + ", events=" + eventsPerThread + ", sleep="
            + sleep + "]");

        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        for (int i = 0; i < threadCount; i++) {
            final int threadId = i;
            executorService.submit(new Runnable() {
                public void run() {
                    produce(eventsPerThread, sleep, threadId);
                }
            });
        }
        waitAndTearDown(executorService);
    }

    private static void produce(int events, long sleep, int threadId)     {
        long start = System.currentTimeMillis();
        for (int index = 1; index <= events; index++) {
            try {
                byte[] message = messageFactory.createTestMessage(index);
                amqpTemplate.convertAndSend(QUEUE_NAME, message);
                if (sleep > 0) {
                    Thread.sleep(sleep);
                }
            } catch (Exception e) {
                LOG.error("Error", e);
            }
        }
        long time = System.currentTimeMillis() - start;
        System.out.println("Producer:" + threadId + " finished, events: " + events + ", Time(s): " + time / 1000 + ", tps: " + (events * 1000) / time);
    }

弹簧配置

<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <property name="addresses" value="${addresses}" />
    <property name="username" value="${user}" />
    <property name="password" value="${passwd}" />
    <property name="cacheMode" value="CONNECTION" />
    <property name="connectionCacheSize" value="${threads}" />
    <property name="channelCacheSize" value="10" />
</bean>

<rabbit:template id="template" connection-factory="connectionFactory"
    exchange="testExchange" routing-key="testQueue"/>
4

1 回答 1

2

我想不出什么会阻止,所以我只是运行了你的测试;并且没有问题:

Start producer with configuration [threadCount=5, events=10, sleep=0]
Producer:2 finished, events: 1000, Time(s): 0, tps: 4405
Producer:3 finished, events: 1000, Time(s): 0, tps: 4132
Producer:1 finished, events: 1000, Time(s): 0, tps: 4048
Producer:0 finished, events: 1000, Time(s): 0, tps: 3968
Producer:4 finished, events: 1000, Time(s): 0, tps: 3952

是什么让你觉得他们被封锁了?

进行线程转储(例如使用 jstack)以查看线程在做什么。

编辑

我仍然无法重现它,即使有 1M 条消息并且CacheMode CONNECTION......

Start producer with configuration [threadCount=5, events=200000, sleep=0]
Producer:0 finished, events: 200000, Time(s): 50, tps: 3959
Producer:3 finished, events: 200000, Time(s): 53, tps: 3746
Producer:1 finished, events: 200000, Time(s): 55, tps: 3635
Producer:2 finished, events: 200000, Time(s): 55, tps: 3634
Producer:4 finished, events: 200000, Time(s): 55, tps: 3629

确实看到队列进入了flow模式(通过管理 UI),但一切都恢复得很好。

我确实看到您的工作人员处于流量控制之下...

"pool-2-thread-3" prio=10 tid=0x00007f4af4849800 nid=0x65d5 runnable [0x00007f4ae082f000]
 java.lang.Thread.State: RUNNABLE
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)

你在兔子日志中看到什么了吗?您在管理 UI 上看到了什么关于消息率、状态等的信息?

无论如何,这似乎与 Spring AMQP 没有任何关系。您需要联系rabbitmq-usersgoogle 组的 rabbitmq 人员。

(我正在使用 rabbitmq 3.4.2 进行测试)。

编辑2:

完全干净安装 3.5.2 ...

Start producer with configuration [threadCount=5, events=200000, sleep=0]
Producer:0 finished, events: 200000, Time(s): 39, tps: 5091
Producer:1 finished, events: 200000, Time(s): 39, tps: 5002
Producer:2 finished, events: 200000, Time(s): 40, tps: 4954
Producer:3 finished, events: 200000, Time(s): 40, tps: 4951
Producer:4 finished, events: 200000, Time(s): 40, tps: 4939

我在管理 UI 中没有看到任何flow状态(在队列中,但通道/连接显示它们正在流动,但又恢复了)。

于 2015-05-30T12:00:37.823 回答