After Googling for a few days, I believe I am totally lost. I would like to implement a kind of priority queue made up of three queues:
- high priority queue (daily), which needs to be processed first.
- mid priority queue (weekly), which is processed only if there are no items in queue #1. (It is OK if messages in this queue are never processed at all.)
- low priority queue (monthly), which is processed only if there are no items in queues #1 and #2. (It is OK if messages in this queue are never processed at all.)
Initially I had the following flow: a single consumer consumes messages from all three queues and checks whether there are any items in queues #1, #2 and #3. Then I realized that this is wrong because:
- I am stuck on the question: "How do I know which queue a message came from?"
- I am already consuming a message regardless of which queue it came from. So if I get a message from a lower priority queue, am I going to put it back on the queue if I discover there is a message in a higher priority queue?
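To make the intended behaviour concrete, here is a minimal sketch of strict-priority polling. It is an assumption-laden illustration, not RabbitMQ or Spring AMQP code: the class `StrictPriorityPoller` and its methods are hypothetical, and plain in-memory `ArrayDeque`s stand in for the three broker queues. The point it shows is that when the consumer pulls from the queues one at a time, in priority order, it always knows which queue a message came from and never has to put a low priority message back.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical in-memory stand-in for the three broker queues.
public class StrictPriorityPoller {
    // LinkedHashMap keeps insertion order, so queues are visited highest priority first.
    private final Map<String, Queue<String>> queues = new LinkedHashMap<>();

    public StrictPriorityPoller() {
        queues.put("daily_queue", new ArrayDeque<>());
        queues.put("weekly_queue", new ArrayDeque<>());
        queues.put("monthly_queue", new ArrayDeque<>());
    }

    public void publish(String queueName, String body) {
        Queue<String> queue = queues.get(queueName);
        if (queue == null) {
            throw new IllegalArgumentException("Unknown queue: " + queueName);
        }
        queue.add(body);
    }

    // Returns "queueName:body" for the next message, or null if every queue is empty.
    // A lower priority queue is only polled when all higher ones are empty.
    public String pollNext() {
        for (Map.Entry<String, Queue<String>> entry : queues.entrySet()) {
            String body = entry.getValue().poll();
            if (body != null) {
                return entry.getKey() + ":" + body;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        StrictPriorityPoller poller = new StrictPriorityPoller();
        poller.publish("monthly_queue", "monthly report");
        poller.publish("daily_queue", "daily report");
        String next;
        while ((next = poller.pollNext()) != null) {
            System.out.println(next);
        }
    }
}
```

With a real broker, the same loop would need a pull-style get per queue (for example the Java client's `Channel.basicGet`) rather than push-style listeners, which is the opposite of what the listener containers in the configuration do.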
Following is my current configuration, which shows what an idiot I am.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
       http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
    <rabbit:connection-factory id="connectionFactory" host="localhost" />
    <rabbit:template id="amqpTemplatead_daily" connection-factory="connectionFactory"
                     exchange="" routing-key="daily_queue"/>
    <rabbit:template id="amqpTemplatead_weekly" connection-factory="connectionFactory"
                     exchange="" routing-key="weekly_queue"/>
    <rabbit:template id="amqpTemplatead_monthly" connection-factory="connectionFactory"
                     exchange="" routing-key="monthly_queue"/>
    <rabbit:admin connection-factory="connectionFactory" />
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener ref="Consumer" method="consume" queue-names="daily_queue" />
    </rabbit:listener-container>
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener ref="Consumer" method="consume" queue-names="weekly_queue" />
    </rabbit:listener-container>
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener ref="Consumer" method="consume" queue-names="monthly_queue" />
    </rabbit:listener-container>
    <bean id="Consumer" class="com.test.Consumer" />
</beans>
Any idea how I should tackle this with a priority queue?
PS: I also wonder whether Apache Camel has something I can depend on.
UPDATE 1: I just saw this from Apache Camel: "https://issues.apache.org/jira/browse/CAMEL-2537". The sequencer on JMSPriority seems to be what I'm looking for. Has anyone tried this before?
UPDATE 2: Assuming I use RabbitMQ's plugin based on @Gary Russell's recommendation, I have the following spring-rabbitmq context XML configuration, which seems to make sense (by guess...):
<rabbit:queue name="ad_google_dfa_reporting_queue">
    <rabbit:queue-arguments>
        <entry key="x-max-priority" value="10"/>
    </rabbit:queue-arguments>
</rabbit:queue>
<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener ref="adGoogleDfaReporting" method="consume" queue-names="ad_google_dfa_reporting_queue" />
</rabbit:listener-container>
<bean id="Consumer" class="com.test.Consumer" />
The above XML configuration successfully creates a queue named "ad_google_dfa_reporting_queue" with the arguments x-max-priority: 10 and durable: true.
But when it comes to the code that sends a message with a priority, I am totally lost. How do I define the priority as shown in the sample at https://github.com/rabbitmq/rabbitmq-priority-queue/blob/master/examples/java/src/com/rabbitmq/examples/PriorityQueue.java?
AmqpTemplate amqpTemplateGoogleDfaReporting = (AmqpTemplate) applicationContext.getBean("amqpTemplateadGoogleDfaReporting");
amqpTemplateGoogleDfaReporting.convertAndSend("message"); // how to define message priority?
UPDATE 3: Based on @Gary's answer, I managed to send messages with the priority set in the message. However, when I send 1000 messages with random priorities between 1 and 10, the consumer consumes messages of all kinds of priority. (I was expecting the high priority messages to be consumed first.) Following is the code for the message producer:
Random random = new Random();
for (int i = 0; i < 1000; i++) {
    // Random priority between 1 and 10 inclusive.
    final int priority = random.nextInt(10 - 1 + 1) + 1;
    DfaReportingModel model = new DfaReportingModel();
    model.setReportType(DfaReportingModel.ReportType.FACT);
    // Store the priority in the payload so the consumer can print it.
    model.setUserProfileId(0L + priority);
    amqpTemplateGoogleDfaReporting.convertAndSend(model, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setPriority(priority);
            return message;
        }
    });
}
And following is the code for the message consumer:
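public void consume(DfaReportingModel message) throws InterruptedException {
    // The user profile id carries the priority set by the producer.
    System.out.println(message.getUserProfileId());
    // Simulate slow processing; Thread.sleep throws the checked InterruptedException.
    Thread.sleep(500);
}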
The result I'm getting:
9, 10, 7, 9, 6, 4, 10, 10, 3, 10, 6, 1, 5, 6, 6, 3, 4, 7, 6, 8, 3, 1, 4, 5, 5, 3, 10, 9, 5, 1, 8, 9, 6, 9, 3, 10, 7, 4, 8, 7, 3, 4, 8, 2, 6, 9, 6, 4, 7, 7, 2, 8, 4, 4, 1,
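For comparison, the ordering expected here can be sketched in plain Java. This is only an illustration under stated assumptions: `ExpectedPriorityOrder` and `drainHighestFirst` are hypothetical names, a `java.util.PriorityQueue` stands in for the broker, and all messages are assumed to be queued before consumption starts. Under those assumptions the consumer should see every 10 first, then every 9, and so on down to 1.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Random;

// Hypothetical in-memory model of the expected delivery order.
public class ExpectedPriorityOrder {

    // Enqueues every priority, then drains the queue highest priority first.
    public static List<Integer> drainHighestFirst(List<Integer> priorities) {
        PriorityQueue<Integer> queue =
                new PriorityQueue<>(Comparator.<Integer>reverseOrder());
        queue.addAll(priorities);
        List<Integer> consumed = new ArrayList<>();
        while (!queue.isEmpty()) {
            consumed.add(queue.poll());
        }
        return consumed;
    }

    public static void main(String[] args) {
        Random random = new Random();
        List<Integer> published = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            published.add(random.nextInt(10) + 1); // priority between 1 and 10
        }
        // Print the first 20 consumed priorities.
        System.out.println(drainHighestFirst(published).subList(0, 20));
    }
}
```

The output actually observed above mixes all priorities, which is what a plain first-in, first-out queue would produce.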
UPDATE 4: Problem solved! Knowing that the sample code from https://github.com/rabbitmq/rabbitmq-priority-queue works in my environment, I presumed that the problem was in the Spring context. Hence, after countless rounds of trial and error with different configurations, I pinpointed the exact combination that makes this work, as follows:
<rabbit:queue name="ad_google_dfa_reporting_queue">
    <rabbit:queue-arguments>
        <entry key="x-max-priority">
            <value type="java.lang.Integer">10</value> <!-- MUST specifically define java.lang.Integer to get it to work -->
        </entry>
    </rabbit:queue-arguments>
</rabbit:queue>
Without explicitly declaring the value as an Integer, the priority queue does not work. Finally, it is solved. Yay!
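The difference between the two XML variants can be sketched in plain Java. This is an assumption on my part about the cause rather than something confirmed from the Spring or RabbitMQ sources: an untyped `value="10"` attribute appears to end up as a `String` in the queue arguments map, while the typed `<value>` element yields an `Integer`. The class and method names below are hypothetical and only model the resulting maps.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the queue arguments produced by the two XML variants.
public class QueueArgumentTypes {

    // Models <entry key="x-max-priority" value="10"/>: the value is assumed to be a String.
    public static Map<String, Object> untypedArguments() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-max-priority", "10");
        return arguments;
    }

    // Models <value type="java.lang.Integer">10</value>: the value is an Integer.
    public static Map<String, Object> typedArguments() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-max-priority", Integer.valueOf(10));
        return arguments;
    }

    // True only when the priority argument is a number rather than text.
    public static boolean isNumeric(Map<String, Object> arguments) {
        return arguments.get("x-max-priority") instanceof Number;
    }

    public static void main(String[] args) {
        System.out.println("untyped numeric? " + isNumeric(untypedArguments()));
        System.out.println("typed numeric?   " + isNumeric(typedArguments()));
    }
}
```

Both maps look identical when printed, so inspecting the argument's class is a quicker way to catch this than reading the queue details by eye.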