2

我有如下的队列配置

 /**
  * Creates the RabbitMQ connection factory bean.
  *
  * <p>The factory is a {@link CachingConnectionFactory} pointed at {@code hostName}
  * and configured with the user name, password and virtual host held by this
  * configuration class.
  *
  * @return the configured connection factory
  */
 @Bean
 public ConnectionFactory connectionFactory() {
     CachingConnectionFactory factory = new CachingConnectionFactory(hostName);
     factory.setUsername(mqUsername);
     factory.setPassword(mqPassword);
     factory.setVirtualHost(virtualHost);
     return factory;
 }

   /**
    * Creates the {@link RabbitTemplate} bean on top of the given connection factory.
    *
    * <p>A {@link Jackson2JsonMessageConverter} is installed as the template's
    * message converter.
    *
    * @param connectionFactory the connection factory the template should use
    * @return the configured template
    */
   @Bean
   RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
       Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
       RabbitTemplate template = new RabbitTemplate(connectionFactory);
       template.setMessageConverter(jsonConverter);
       return template;
   }

  /**
   * Creates the {@link AmqpAdmin} bean as a {@link RabbitAdmin} backed by
   * {@link #connectionFactory()}.
   *
   * @return the admin used for broker-side operations such as queue declaration
   */
  @Bean
  public AmqpAdmin amqpAdmin() {
      return new RabbitAdmin(connectionFactory());
  }

我的异步配置如下

/**
 * Enables {@code @Async} processing and, through {@link AsyncConfigurer}, supplies
 * the executor and the uncaught-exception handler used for asynchronous methods.
 */
@EnableAsync
@Configuration
public class AsyncConfiguration implements AsyncConfigurer {

    /** Used for both the core and the maximum pool size, so the pool is fixed-size. */
    private static final int POOL_SIZE = 10;

    /**
     * Defines the thread pool bean that backs asynchronous method execution.
     *
     * <p>The method name (including its spelling) is kept unchanged because it is
     * the public bean method that the rest of the application refers to.
     *
     * @return an initialized executor with core and maximum size {@code 10}
     */
    @Bean
    public ThreadPoolTaskExecutor taskExector() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setCorePoolSize(POOL_SIZE);
        pool.setMaxPoolSize(POOL_SIZE);
        pool.initialize();
        return pool;
    }

    /**
     * @return the executor defined by {@link #taskExector()}
     */
    @Override
    public Executor getAsyncExecutor() {
        return taskExector();
    }

    /**
     * @return a {@link SimpleAsyncUncaughtExceptionHandler} for exceptions thrown
     *         by asynchronous methods
     */
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new SimpleAsyncUncaughtExceptionHandler();
    }

}

在我的异步方法中,我使用了 AmqpAdmin 和 RabbitTemplate 这两个 bean。按照上面的配置,任务满载时最多会有 10 个线程同时执行。运行一段时间后,我发现应用程序挂起,于是通过 Spring Boot Actuator 导出了线程转储,得到了下面的信息;从行号来看,似乎是在使用 RabbitTemplate/AmqpAdmin bean 时出现了死锁。这种做法有什么问题?或者说,如何确保多个线程可以安全地访问这些 RabbitMQ bean?

版本:Spring Boot 1.4.0.RELEASE,Java 8。

我的服务是这样的

@Service
public class QDispatcherService implements DispatcherService {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private AmqpAdmin amqpAdmin;


    @Autowired
    RabbitTemplate rabbitTemplate;

    @Override
    public void sendData(Data dataObject) throws Exception {

        try {
            //something on this properties , I have to check if queue exist or there are messages in it to decide to add message in other queue
            Properties properties = amqpAdmin.getQueueProperties(queueName);
            amqpAdmin.declareQueue(new Queue(queueName));
             logger.info("***********************DEBUG 4***********************");
            rabbitTemplate.convertAndSend(queueName, dataObject);

        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

}

{
    "threadName": "taskExector-10",
    "threadId": 77,
    "blockedTime": -1,
    "blockedCount": 317,
    "waitedTime": -1,
    "waitedCount": 379,
    "lockName": "com.rabbitmq.utility.BlockingValueOrException@105f30b9",
    "lockOwnerId": -1,
    "lockOwnerName": null,
    "inNative": false,
    "suspended": false,
    "threadState": "WAITING",
    "stackTrace": [
      {
        "methodName": "wait",
        "fileName": "Object.java",
        "lineNumber": -2,
        "className": "java.lang.Object",
        "nativeMethod": true
      },
      {
        "methodName": "wait",
        "fileName": "Object.java",
        "lineNumber": 502,
        "className": "java.lang.Object",
        "nativeMethod": false
      },
      {
        "methodName": "get",
        "fileName": "BlockingCell.java",
        "lineNumber": 50,
        "className": "com.rabbitmq.utility.BlockingCell",
        "nativeMethod": false
      },
      {
        "methodName": "uninterruptibleGet",
        "fileName": "BlockingCell.java",
        "lineNumber": 89,
        "className": "com.rabbitmq.utility.BlockingCell",
        "nativeMethod": false
      },
      {
        "methodName": "uninterruptibleGetValue",
        "fileName": "BlockingValueOrException.java",
        "lineNumber": 33,
        "className": "com.rabbitmq.utility.BlockingValueOrException",
        "nativeMethod": false
      },
      {
        "methodName": "getReply",
        "fileName": "AMQChannel.java",
        "lineNumber": 361,
        "className": "com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation",
        "nativeMethod": false
      },
      {
        "methodName": "privateRpc",
        "fileName": "AMQChannel.java",
        "lineNumber": 226,
        "className": "com.rabbitmq.client.impl.AMQChannel",
        "nativeMethod": false
      },
      {
        "methodName": "exnWrappingRpc",
        "fileName": "AMQChannel.java",
        "lineNumber": 118,
        "className": "com.rabbitmq.client.impl.AMQChannel",
        "nativeMethod": false
      },
      {
        "methodName": "queueDeclare",
        "fileName": "ChannelN.java",
        "lineNumber": 844,
        "className": "com.rabbitmq.client.impl.ChannelN",
        "nativeMethod": false
      },
      {
        "methodName": "queueDeclare",
        "fileName": "ChannelN.java",
        "lineNumber": 61,
        "className": "com.rabbitmq.client.impl.ChannelN",
        "nativeMethod": false
      },
      {
        "methodName": "invoke",
        "fileName": null,
        "lineNumber": -1,
        "className": "sun.reflect.GeneratedMethodAccessor176",
        "nativeMethod": false
      },
      {
        "methodName": "invoke",
        "fileName": "DelegatingMethodAccessorImpl.java",
        "lineNumber": 43,
        "className": "sun.reflect.DelegatingMethodAccessorImpl",
        "nativeMethod": false
      },
      {
        "methodName": "invoke",
        "fileName": "Method.java",
        "lineNumber": 498,
        "className": "java.lang.reflect.Method",
        "nativeMethod": false
      },
      {
        "methodName": "invoke",
        "fileName": "CachingConnectionFactory.java",
        "lineNumber": 916,
        "className": "org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler",
        "nativeMethod": false
      },
      {
        "methodName": "queueDeclare",
        "fileName": null,
        "lineNumber": -1,
        "className": "com.sun.proxy.$Proxy166",
        "nativeMethod": false
      },
      {
        "methodName": "declareQueues",
        "fileName": "RabbitAdmin.java",
        "lineNumber": 577,
        "className": "org.springframework.amqp.rabbit.core.RabbitAdmin",
        "nativeMethod": false
      },
      {
        "methodName": "access$200",
        "fileName": "RabbitAdmin.java",
        "lineNumber": 67,
        "className": "org.springframework.amqp.rabbit.core.RabbitAdmin",
        "nativeMethod": false
      },
      {
        "methodName": "doInRabbit",
        "fileName": "RabbitAdmin.java",
        "lineNumber": 209,
        "className": "org.springframework.amqp.rabbit.core.RabbitAdmin$3",
        "nativeMethod": false
      },
      {
        "methodName": "doInRabbit",
        "fileName": "RabbitAdmin.java",
        "lineNumber": 206,
        "className": "org.springframework.amqp.rabbit.core.RabbitAdmin$3",
        "nativeMethod": false
      },
      {
        "methodName": "doExecute",
        "fileName": "RabbitTemplate.java",
        "lineNumber": 1394,
        "className": "org.springframework.amqp.rabbit.core.RabbitTemplate",
        "nativeMethod": false
      },
      {
        "methodName": "execute",
        "fileName": "RabbitTemplate.java",
        "lineNumber": 1367,
        "className": "org.springframework.amqp.rabbit.core.RabbitTemplate",
        "nativeMethod": false
      },
      {
        "methodName": "execute",
        "fileName": "RabbitTemplate.java",
        "lineNumber": 1343,
        "className": "org.springframework.amqp.rabbit.core.RabbitTemplate",
        "nativeMethod": false
      },
      {
        "methodName": "declareQueue",
        "fileName": "RabbitAdmin.java",
        "lineNumber": 206,
        "className": "org.springframework.amqp.rabbit.core.RabbitAdmin",
        "nativeMethod": false
      },
      {
        "methodName": "sendData",
        "fileName": "QDispatcherService.java",
        "lineNumber": 59,
        "className": "com.mycompany.QDispatcherService",
        "nativeMethod": false
      },

      ....

       "lockedMonitors": [
      {
        "className": "java.lang.Object",
        "identityHashCode": 285810320,
        "lockedStackFrame": {
          "methodName": "invoke",
          "fileName": "CachingConnectionFactory.java",
          "lineNumber": 916,
          "className": "org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler",
          "nativeMethod": false
        },
        "lockedStackDepth": 13
      }
    ],
    "lockedSynchronizers": [
      {
        "className": "java.util.concurrent.ThreadPoolExecutor$Worker",
        "identityHashCode": 372417558
      }
    ],
    "lockInfo": {
      "className": "com.rabbitmq.utility.BlockingValueOrException",
      "identityHashCode": 274673849
    }
  },

____________________________________________________________________________-

新的堆栈跟踪

{
"threadName": "taskExector-10",
"threadId": 77,
"blockedTime": -1,
"blockedCount": 37,
"waitedTime": -1,
"waitedCount": 1113,
"lockName": "java.io.DataOutputStream@33111fc",
"lockOwnerId": 65,
"lockOwnerName": "taskExector-8",
"inNative": false,
"suspended": false,
"threadState": "BLOCKED",
"stackTrace": [
  {
    "methodName": "writeFrame",
    "fileName": "SocketFrameHandler.java",
    "lineNumber": 170,
    "className": "com.rabbitmq.client.impl.SocketFrameHandler",
    "nativeMethod": false
  },
  {
    "methodName": "writeFrame",
    "fileName": "AMQConnection.java",
    "lineNumber": 542,
    "className": "com.rabbitmq.client.impl.AMQConnection",
    "nativeMethod": false
  },
  {
    "methodName": "transmit",
    "fileName": "AMQCommand.java",
    "lineNumber": 104,
    "className": "com.rabbitmq.client.impl.AMQCommand",
    "nativeMethod": false
  },
  {
    "methodName": "quiescingTransmit",
    "fileName": "AMQChannel.java",
    "lineNumber": 337,
    "className": "com.rabbitmq.client.impl.AMQChannel",
    "nativeMethod": false
  },
  {
    "methodName": "transmit",
    "fileName": "AMQChannel.java",
    "lineNumber": 313,
    "className": "com.rabbitmq.client.impl.AMQChannel",
    "nativeMethod": false
  },
  {
    "methodName": "basicPublish",
    "fileName": "ChannelN.java",
    "lineNumber": 686,
    "className": "com.rabbitmq.client.impl.ChannelN",
    "nativeMethod": false
  },
  {
    "methodName": "basicPublish",
    "fileName": "ChannelN.java",
    "lineNumber": 668,
    "className": "com.rabbitmq.client.impl.ChannelN",
    "nativeMethod": false
  },
  {
    "methodName": "invoke",
    "fileName": null,
    "lineNumber": -1,
    "className": "sun.reflect.GeneratedMethodAccessor176",
    "nativeMethod": false
  },
  {
    "methodName": "invoke",
    "fileName": "DelegatingMethodAccessorImpl.java",
    "lineNumber": 43,
    "className": "sun.reflect.DelegatingMethodAccessorImpl",
    "nativeMethod": false
  },
  {
    "methodName": "invoke",
    "fileName": "Method.java",
    "lineNumber": 498,
    "className": "java.lang.reflect.Method",
    "nativeMethod": false
  },
  {
    "methodName": "invoke",
    "fileName": "CachingConnectionFactory.java",
    "lineNumber": 916,
    "className": "org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler",
    "nativeMethod": false
  },
  {
    "methodName": "basicPublish",
    "fileName": null,
    "lineNumber": -1,
    "className": "com.sun.proxy.$Proxy166",
    "nativeMethod": false
  },
  {
    "methodName": "doSend",
    "fileName": "RabbitTemplate.java",
    "lineNumber": 1451,
    "className": "org.springframework.amqp.rabbit.core.RabbitTemplate",
    "nativeMethod": false
  },
  {
    "methodName": "doInRabbit",
    "fileName": "RabbitTemplate.java",
    "lineNumber": 703,
    "className": "org.springframework.amqp.rabbit.core.RabbitTemplate$3",
    "nativeMethod": false
  },
  {
    "methodName": "doExecute",
    "fileName": "RabbitTemplate.java",
    "lineNumber": 1394,
    "className": "org.springframework.amqp.rabbit.core.RabbitTemplate",
    "nativeMethod": false
  },
  {
    "methodName": "execute",
    "fileName": "RabbitTemplate.java",
    "lineNumber": 1367,
    "className": "org.springframework.amqp.rabbit.core.RabbitTemplate",
    "nativeMethod": false
  },
  {
    "methodName": "send",
    "fileName": "RabbitTemplate.java",
    "lineNumber": 699,
    "className": "org.springframework.amqp.rabbit.core.RabbitTemplate",
    "nativeMethod": false
  },
  {
    "methodName": "convertAndSend",
    "fileName": "RabbitTemplate.java",
    "lineNumber": 767,
    "className": "org.springframework.amqp.rabbit.core.RabbitTemplate",
    "nativeMethod": false
  },
  {
    "methodName": "convertAndSend",
    "fileName": "RabbitTemplate.java",
    "lineNumber": 754,
    "className": "org.springframework.amqp.rabbit.core.RabbitTemplate",
    "nativeMethod": false
  }
4

1 回答 1

0

在每次发送时都声明队列有点不寻常;不过,除了效率低下之外,它应该可以正常工作。堆栈跟踪表明线程卡在 RabbitMQ 客户端中,正在等待队列声明的回复。我曾在多个线程使用同一个通道时见过这种情况,但这里绝不应该发生,因为当前操作(admin、template)完成后,通道总会被归还到缓存中,不会被多个线程同时使用。

您可以尝试新的 4.0.0.RC1 amqp-client jar,因为该版本加入了日志记录(3.x.x 客户端中没有此功能)。这可能有助于追踪问题。

于 2016-11-11T13:41:40.603 回答