2016-11-10 103 views
1

我有队列配置如下图所示异步和共享兔模板

@Bean 
    public ConnectionFactory connectionFactory() { 
    CachingConnectionFactory connectionFactory = new   CachingConnectionFactory(hostName); 
    connectionFactory.setUsername(mqUsername); 
    connectionFactory.setPassword(mqPassword); 
    connectionFactory.setVirtualHost(virtualHost); 
    return connectionFactory; 
    } 

    @Bean 
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { 
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); 
    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); 
    return rabbitTemplate; 
    } 

    @Bean 
    public AmqpAdmin amqpAdmin() { 
    RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory()); 
    return rabbitAdmin; 
    } 

,我有异步配置像

@EnableAsync 
@Configuration 
public class AsyncConfiguration implements AsyncConfigurer { 

    @Override 
    public Executor getAsyncExecutor() { 
     return taskExector(); 
    } 

    @Override 
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { 
     return new SimpleAsyncUncaughtExceptionHandler(); 
    } 

    @Bean 
    public ThreadPoolTaskExecutor taskExector() { 
     ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); 
     taskExecutor.setCorePoolSize(10); 
     taskExecutor.setMaxPoolSize(10); 
     taskExecutor.initialize(); 
     return taskExecutor; 
    } 

}  

而在我的异步方法我使用AMQP管理和兔模板豆。因此,作为每个配置我将有10个线程的最高执行任务,我已经过了一段时间应用程序挂起发现和使用驱动器,我发现下面的信息以转储,似乎僵局使用兔模板/ AMQP管理豆从行号。 有什么不对的这种做法或如何确保多个线程可以访问那些兔子MQ豆。

版本:春天开机1.4.0.RELEASE,爪哇8

我的服务是这样的

@Service 
public class QDispatcherService implements DispatcherService { 
    private final Logger logger = LoggerFactory.getLogger(this.getClass()); 

    @Autowired 
    private AmqpAdmin amqpAdmin; 


    @Autowired 
    RabbitTemplate rabbitTemplate; 

    @Override 
    public void sendData(Data dataObject) throws Exception { 

     try { 
      //something on this properties , I have to check if queue exist or there are messages in it to decide to add message in other queue 
      Properties properties = amqpAdmin.getQueueProperties(queueName); 
      amqpAdmin.declareQueue(new Queue(queueName)); 
      logger.info("***********************DEBUG 4***********************"); 
      rabbitTemplate.convertAndSend(queueName, dataObject); 

     } catch (Exception ex) { 
      ex.printStackTrace(); 
     } 
    } 

} 

{ 
    "threadName": "taskExector-10", 
    "threadId": 77, 
    "blockedTime": -1, 
    "blockedCount": 317, 
    "waitedTime": -1, 
    "waitedCount": 379, 
    "lockName": "[email protected]", 
    "lockOwnerId": -1, 
    "lockOwnerName": null, 
    "inNative": false, 
    "suspended": false, 
    "threadState": "WAITING", 
    "stackTrace": [ 
     { 
     "methodName": "wait", 
     "fileName": "Object.java", 
     "lineNumber": -2, 
     "className": "java.lang.Object", 
     "nativeMethod": true 
     }, 
     { 
     "methodName": "wait", 
     "fileName": "Object.java", 
     "lineNumber": 502, 
     "className": "java.lang.Object", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "get", 
     "fileName": "BlockingCell.java", 
     "lineNumber": 50, 
     "className": "com.rabbitmq.utility.BlockingCell", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "uninterruptibleGet", 
     "fileName": "BlockingCell.java", 
     "lineNumber": 89, 
     "className": "com.rabbitmq.utility.BlockingCell", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "uninterruptibleGetValue", 
     "fileName": "BlockingValueOrException.java", 
     "lineNumber": 33, 
     "className": "com.rabbitmq.utility.BlockingValueOrException", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "getReply", 
     "fileName": "AMQChannel.java", 
     "lineNumber": 361, 
     "className": "com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "privateRpc", 
     "fileName": "AMQChannel.java", 
     "lineNumber": 226, 
     "className": "com.rabbitmq.client.impl.AMQChannel", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "exnWrappingRpc", 
     "fileName": "AMQChannel.java", 
     "lineNumber": 118, 
     "className": "com.rabbitmq.client.impl.AMQChannel", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "queueDeclare", 
     "fileName": "ChannelN.java", 
     "lineNumber": 844, 
     "className": "com.rabbitmq.client.impl.ChannelN", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "queueDeclare", 
     "fileName": "ChannelN.java", 
     "lineNumber": 61, 
     "className": "com.rabbitmq.client.impl.ChannelN", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "invoke", 
     "fileName": null, 
     "lineNumber": -1, 
     "className": "sun.reflect.GeneratedMethodAccessor176", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "invoke", 
     "fileName": "DelegatingMethodAccessorImpl.java", 
     "lineNumber": 43, 
     "className": "sun.reflect.DelegatingMethodAccessorImpl", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "invoke", 
     "fileName": "Method.java", 
     "lineNumber": 498, 
     "className": "java.lang.reflect.Method", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "invoke", 
     "fileName": "CachingConnectionFactory.java", 
     "lineNumber": 916, 
     "className": "org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "queueDeclare", 
     "fileName": null, 
     "lineNumber": -1, 
     "className": "com.sun.proxy.$Proxy166", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "declareQueues", 
     "fileName": "RabbitAdmin.java", 
     "lineNumber": 577, 
     "className": "org.springframework.amqp.rabbit.core.RabbitAdmin", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "access$200", 
     "fileName": "RabbitAdmin.java", 
     "lineNumber": 67, 
     "className": "org.springframework.amqp.rabbit.core.RabbitAdmin", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "doInRabbit", 
     "fileName": "RabbitAdmin.java", 
     "lineNumber": 209, 
     "className": "org.springframework.amqp.rabbit.core.RabbitAdmin$3", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "doInRabbit", 
     "fileName": "RabbitAdmin.java", 
     "lineNumber": 206, 
     "className": "org.springframework.amqp.rabbit.core.RabbitAdmin$3", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "doExecute", 
     "fileName": "RabbitTemplate.java", 
     "lineNumber": 1394, 
     "className": "org.springframework.amqp.rabbit.core.RabbitTemplate", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "execute", 
     "fileName": "RabbitTemplate.java", 
     "lineNumber": 1367, 
     "className": "org.springframework.amqp.rabbit.core.RabbitTemplate", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "execute", 
     "fileName": "RabbitTemplate.java", 
     "lineNumber": 1343, 
     "className": "org.springframework.amqp.rabbit.core.RabbitTemplate", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "declareQueue", 
     "fileName": "RabbitAdmin.java", 
     "lineNumber": 206, 
     "className": "org.springframework.amqp.rabbit.core.RabbitAdmin", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "sendData", 
     "fileName": "QDispatcherService.java", 
     "lineNumber": 59, 
     "className": "com.mycompany.QDispatcherService", 
     "nativeMethod": false 
     }, 

     .... 

     "lockedMonitors": [ 
     { 
     "className": "java.lang.Object", 
     "identityHashCode": 285810320, 
     "lockedStackFrame": { 
      "methodName": "invoke", 
      "fileName": "CachingConnectionFactory.java", 
      "lineNumber": 916, 
      "className": "org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler", 
      "nativeMethod": false 
     }, 
     "lockedStackDepth": 13 
     } 
    ], 
    "lockedSynchronizers": [ 
     { 
     "className": "java.util.concurrent.ThreadPoolExecutor$Worker", 
     "identityHashCode": 372417558 
     } 
    ], 
    "lockInfo": { 
     "className": "com.rabbitmq.utility.BlockingValueOrException", 
     "identityHashCode": 274673849 
    } 
    }, 

________________________________________________________________________-

新的跟踪

{ 
"threadName": "taskExector-10", 
"threadId": 77, 
"blockedTime": -1, 
"blockedCount": 37, 
"waitedTime": -1, 
"waitedCount": 1113, 
"lockName": "[email protected]", 
"lockOwnerId": 65, 
"lockOwnerName": "taskExector-8", 
"inNative": false, 
"suspended": false, 
"threadState": "BLOCKED", 
"stackTrace": [ 
    { 
    "methodName": "writeFrame", 
    "fileName": "SocketFrameHandler.java", 
    "lineNumber": 170, 
    "className": "com.rabbitmq.client.impl.SocketFrameHandler", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "writeFrame", 
    "fileName": "AMQConnection.java", 
    "lineNumber": 542, 
    "className": "com.rabbitmq.client.impl.AMQConnection", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "transmit", 
    "fileName": "AMQCommand.java", 
    "lineNumber": 104, 
    "className": "com.rabbitmq.client.impl.AMQCommand", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "quiescingTransmit", 
    "fileName": "AMQChannel.java", 
    "lineNumber": 337, 
    "className": "com.rabbitmq.client.impl.AMQChannel", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "transmit", 
    "fileName": "AMQChannel.java", 
    "lineNumber": 313, 
    "className": "com.rabbitmq.client.impl.AMQChannel", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "basicPublish", 
    "fileName": "ChannelN.java", 
    "lineNumber": 686, 
    "className": "com.rabbitmq.client.impl.ChannelN", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "basicPublish", 
    "fileName": "ChannelN.java", 
    "lineNumber": 668, 
    "className": "com.rabbitmq.client.impl.ChannelN", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "invoke", 
    "fileName": null, 
    "lineNumber": -1, 
    "className": "sun.reflect.GeneratedMethodAccessor176", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "invoke", 
    "fileName": "DelegatingMethodAccessorImpl.java", 
    "lineNumber": 43, 
    "className": "sun.reflect.DelegatingMethodAccessorImpl", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "invoke", 
    "fileName": "Method.java", 
    "lineNumber": 498, 
    "className": "java.lang.reflect.Method", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "invoke", 
    "fileName": "CachingConnectionFactory.java", 
    "lineNumber": 916, 
    "className": "org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "basicPublish", 
    "fileName": null, 
    "lineNumber": -1, 
    "className": "com.sun.proxy.$Proxy166", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "doSend", 
    "fileName": "RabbitTemplate.java", 
    "lineNumber": 1451, 
    "className": "org.springframework.amqp.rabbit.core.RabbitTemplate", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "doInRabbit", 
    "fileName": "RabbitTemplate.java", 
    "lineNumber": 703, 
    "className": "org.springframework.amqp.rabbit.core.RabbitTemplate$3", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "doExecute", 
    "fileName": "RabbitTemplate.java", 
    "lineNumber": 1394, 
    "className": "org.springframework.amqp.rabbit.core.RabbitTemplate", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "execute", 
    "fileName": "RabbitTemplate.java", 
    "lineNumber": 1367, 
    "className": "org.springframework.amqp.rabbit.core.RabbitTemplate", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "send", 
    "fileName": "RabbitTemplate.java", 
    "lineNumber": 699, 
    "className": "org.springframework.amqp.rabbit.core.RabbitTemplate", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "convertAndSend", 
    "fileName": "RabbitTemplate.java", 
    "lineNumber": 767, 
    "className": "org.springframework.amqp.rabbit.core.RabbitTemplate", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "convertAndSend", 
    "fileName": "RabbitTemplate.java", 
    "lineNumber": 754, 
    "className": "org.springframework.amqp.rabbit.core.RabbitTemplate", 
    "nativeMethod": false 
    } 
+0

你必须显示你的代码如何使用'RabbitTemplate'和'RabbitAdmin' –

+0

在主要问题 – user3444718

+0

中添加好。你可以分享一些简单的启动应用程序从我们这边玩吗? –

回答

0

这是一个有点不寻常的宣布,每发送队列中;这表示,除了效率低下之外,它应该起作用。堆栈跟踪表明我们卡在兔子客户端,等待队列声明的回复。当多个线程使用相同信道我已经看到了这一点,但由于信道总是返回到缓存当前操作(管理,模板)完成时,不能由多个线程使用的是不应该发生在这里。

您可能想尝试新的4.0.0.RC1 amqp-client jar,因为它们现在添加了日志记录(这在3.x.x客户端中不可用)。它可能有助于追踪事情。

+0

谢谢我会尝试使用新版本,我想排除这是因为一些其他应用程序尝试使用/删除队列的时候,当我们试图获取队列道具或者试图声明队列的时候,就像你说的等待兔子的回复,这是否意味着没有什么像锁定资源,因为我们的自定义线程池用于异步,I Ť呃,它不是但要确认。 (初始实验看起来是否删除了我的服务的异步性质,它的工作正常,但必须做更多的测试)。为什么它没有超时如何设置为兔子操作或 – user3444718

+0

我不知道兔子客户端的内部不够好 - 你必须问rabbitmq用户谷歌组的问题。我以前只有在有人试图发送消息的时候才看到过这个消息 - 这是一个类似的死锁 - 但我没有看到你在这里强制使用。 –

+0

我没有直接使用amqp lib,我使用1.6.1.RELEASE的spring-ampq。 (spring-boot-starter-amqp spring boot ver 1.4.0.release) – user3444718