嗨,我对 HornetQ 比较陌生。我编写了一个测试包,它有一个嵌入式 HornetQ 服务器,带有一个生产者和一个异步消费者。希望我已经正确地完成了实施。现在问题如下...
当我通过 prodcuer 向队列发送消息时,它返回成功,但是当尝试从队列中消费消息时,似乎没有消息被消费。就像消费者不活跃一样。
当我尝试使用该
setSendAcknowledgementHandler
方法时,出现错误:java.lang.IllegalStateException: You can't set confirmationHandler on a connection with confirmation-window-size < 0. Look at the documentation for more information. at org.hornetq.core.protocol.core.impl.ChannelImpl.setCommandConfirmationHandler(ChannelImpl.java:330) at org.hornetq.core.client.impl.ClientSessionImpl.setSendAcknowledgementHandler(ClientSessionImpl.java:943) at org.hornetq.core.client.impl.DelegatingSession.setSendAcknowledgementHandler(DelegatingSession.java:493) at componentsII.QueueProducer.SendMessage(QueueProducer.java:9) at componentsII.StartClients.main(StartClients.java:11)
检查下面的课程。
连接和会话
public class QueueConnection {
private static org.hornetq.api.core.TransportConfiguration transport;
private static org.hornetq.api.core.client.ClientSessionFactory sharedfactory;
public static org.hornetq.api.core.client.ClientSession sharedSession;
private static java.util.HashMap<String,Object> maps;
private static boolean started= false;
public static void setMaps(String []key, Object[] value){
maps= new java.util.HashMap<String, Object>() ;
if(key.length!=value.length){
maps=null;
}else{
for(int x=0;x<value.length;x++){
maps.put(key[x], value[x]);
}
}
}
private static org.hornetq.api.core.client.ClientSession session(){
try{
if(sharedSession==null){
sharedSession=factory().createSession(true,true,0);
}
}catch(org.hornetq.api.core.HornetQException e){
e.printStackTrace();
}catch(Exception e){
e.printStackTrace();
}
return sharedSession;
}
public static synchronized int size(String queueName){
int count = 0;
try {
org.hornetq.api.core.client.ClientSession.QueueQuery result;
result = sharedSession.queueQuery(new org.hornetq.api.core.SimpleString(queueName));
count = (int) result.getMessageCount();
} catch (org.hornetq.api.core.HornetQException e) {
e.printStackTrace();
}
return count;
}
public static void startSession(){
try{
if (session() != null || started!=true){
session().start();
started=true;
System.out.println("Client Session started");
}else{
System.out.println("Client Session already started");
}
}catch(org.hornetq.api.core.HornetQException e){
e.printStackTrace();
}catch(Exception e){
e.printStackTrace();
}
}
public static void stopSession(){
try{
if (session() != null ){
session().stop();
factory().close();
sharedSession=null;
sharedfactory=null;
System.out.println("Client Session stopped");
}
}catch(org.hornetq.api.core.HornetQException e){
e.printStackTrace();
}catch(Exception e){
e.printStackTrace();
}
}
private static org.hornetq.api.core.TransportConfiguration TransportConfigs(){
transport=new org.hornetq.api.core.TransportConfiguration(org.hornetq.core.remoting.impl.netty.NettyConnectorFactory.class.getName(),maps);
return transport;
}
private static org.hornetq.api.core.client.ClientSessionFactory factory(){
try{
if(sharedfactory==null){
org.hornetq.api.core.client.ServerLocator locator=org.hornetq.api.core.client.HornetQClient.createServerLocator(true,TransportConfigs());
locator.setAckBatchSize(0);
locator.setReconnectAttempts(3);
locator.setConfirmationWindowSize(2);
sharedfactory=locator.createSessionFactory();
}
}catch(Exception e){
e.printStackTrace();
}
return sharedfactory;
}
}
消费者
public class QueueConsumer {
public static void Recieve(String queuename){
try {
org.hornetq.api.core.client.ClientConsumer consumer = QueueConnection.sharedSession.createConsumer(queuename);
consumer.setMessageHandler(new msghandler());
} catch (org.hornetq.api.core.HornetQException e) {
e.printStackTrace();
} catch(Exception e){
e.printStackTrace();
}
}
private static class msghandler implements org.hornetq.api.core.client.MessageHandler {
@Override
public void onMessage(org.hornetq.api.core.client.ClientMessage msg) {
System.out.println("Message consumed ~"+msg.getStringProperty("myMsg"));
}
}
}
制片人
public class QueueProducer {
public static void SendMessage(String queuename,String msg){
try{
QueueConnection.sharedSession.setSendAcknowledgementHandler(new acknowledegeHandler());
org.hornetq.api.core.client.ClientProducer producer= QueueConnection.sharedSession.createProducer(queuename);
org.hornetq.api.core.client.ClientMessage message = QueueConnection.sharedSession.createMessage(org.hornetq.api.core.client.ClientMessage.TEXT_TYPE,true);
message.putStringProperty("myMsg", msg);
producer.send(queuename,message);
System.out.println("Message Sent to "+queuename);
}catch(org.hornetq.api.core.HornetQException e){
e.printStackTrace();
}catch(Exception e){
e.printStackTrace();
}
}
private static class acknowledegeHandler implements org.hornetq.api.core.client.SendAcknowledgementHandler{
@Override
public void sendAcknowledged( org.hornetq.api.core.Message msg) {
System.out.println("Received acknowledgement for message ~: "+msg.getStringProperty("myMsg"));
}
}
}
初始化客户端
public class StartClients {
private static void initialize(){
String keys[]={org.hornetq.core.remoting.impl.netty.TransportConstants.HOST_PROP_NAME,org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME};
Object values[]={"localhost",5664};
QueueConnection.setMaps(keys,values);
QueueConnection.startSession();
}
public static void main(String []args){
new Thread(){@Override
public void run(){
System.out.println("Producer Thread");
StartClients.initialize();
String QueueName= "queues.Queue1";
for(int a=1;a<10;a++){
QueueProducer.SendMessage(QueueName, "Message "+a+" to the embedded HornetQ Server");
}
System.out.println("Queue "+QueueName+" has "+QueueConnection.size(QueueName)+" messages on send");
QueueConnection.stopSession();
}}.start();
new Thread(){@Override
public void run(){
try{
sleep(5000);
System.out.println();
System.out.println("Consumer Thread");
StartClients.initialize();
String QueueName= "queues.Queue1";
QueueConsumer.Recieve(QueueName);
System.out.println("Queue "+QueueName+" has "+QueueConnection.size(QueueName)+" messages after consumption");
QueueConnection.stopSession();
}catch(java.lang.InterruptedException e){
e.printStackTrace();
}catch(Exception e){
e.printStackTrace();
}
}}.start();
}
}
嵌入式服务器
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.CoreQueueConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.server.JournalType;
import org.hornetq.core.server.embedded.EmbeddedHornetQ;
public class QueueServer {
public static void StartServer(){
try {
//Connection configurations
Map<String, Object> params = new HashMap<String, Object>();
params.put(TransportConstants.HOST_PROP_NAME, "localhost");
params.put(TransportConstants.PORT_PROP_NAME, 5664);
params.put(TransportConstants.USE_NIO_PROP_NAME, true);
params.put(TransportConstants.TCP_NODELAY_PROPNAME, true);
params.put(TransportConstants.NIO_REMOTING_THREADS_PROPNAME,3);
Map<String, Object> params2 = new HashMap<String, Object>();
params2.put(TransportConstants.HOST_PROP_NAME, "localhost");
params2.put(TransportConstants.PORT_PROP_NAME, 5665);
params2.put(TransportConstants.USE_NIO_PROP_NAME, true);
params2.put(TransportConstants.TCP_NODELAY_PROPNAME, true);
params2.put(TransportConstants.NIO_REMOTING_THREADS_PROPNAME,3);
Map<String, Object> params3 = new HashMap<String, Object>();
params3.put(TransportConstants.HOST_PROP_NAME, "localhost");
params3.put(TransportConstants.PORT_PROP_NAME, 5666);
params3.put(TransportConstants.USE_NIO_PROP_NAME, true);
params3.put(TransportConstants.TCP_NODELAY_PROPNAME, true);
params3.put(TransportConstants.NIO_REMOTING_THREADS_PROPNAME,3);
Map<String, Object> params4 = new HashMap<String, Object>();
params4.put(TransportConstants.HOST_PROP_NAME, "localhost");
params4.put(TransportConstants.PORT_PROP_NAME, 5667);
params4.put(TransportConstants.USE_NIO_PROP_NAME, true);
params4.put(TransportConstants.TCP_NODELAY_PROPNAME, true);
params4.put(TransportConstants.NIO_REMOTING_THREADS_PROPNAME,3);
//Server configurations
Configuration config= new ConfigurationImpl();
HashSet<TransportConfiguration>transports= new HashSet <TransportConfiguration>();
transports.add(new TransportConfiguration(NettyAcceptorFactory.class.getName(),params));
transports.add(new TransportConfiguration(NettyAcceptorFactory.class.getName(),params2));
transports.add(new TransportConfiguration(NettyAcceptorFactory.class.getName(),params3));
transports.add(new TransportConfiguration(NettyAcceptorFactory.class.getName(),params4));
//Queues Configurations
List<CoreQueueConfiguration> queueConfigs = new ArrayList<CoreQueueConfiguration>();
String queueName="queues.Queue";
for(int x=1;x<5;x++){
queueConfigs.add(new CoreQueueConfiguration(queueName.concat(String.valueOf(x)),queueName.concat(String.valueOf(x)), null, true));
}
//Set Configurations
config.setAcceptorConfigurations(transports);
config.setQueueConfigurations(queueConfigs);
config.setJournalType(JournalType.NIO);
config.setSecurityEnabled(false);
//Starting server
EmbeddedHornetQ embedded = new EmbeddedHornetQ();
embedded.setConfiguration(config);
embedded.start();
Thread.sleep(6000000);
embedded.stop();
} catch (Exception ex) {
Logger.getLogger(QueueServer.class.getName()).log(Level.SEVERE, null, ex);
}
}
public static void main(String [] args){
StartServer();
}
}
我哪里出错了?