I have written a C# utility to consume (delete) certain messages in an ActiveMQ queue. The utility fails to remove all of the messages that match the selector. Is there a problem with ActiveMQ, or (more likely) am I doing something incorrectly? A similar utility running against Solace does not show the same issue.
The message producer adds an identifying property to each message; the utility uses a selector to consume only the messages where that property has a certain value. The messages being deleted are those for application entities that users have decided to cancel.
When the number of messages in the queue is small, e.g. 100, the queue-draining utility works as planned. When the number of messages is large, e.g. 10000, the utility removes only about the first 50 of the messages that match the selector, leaving a large number of matching messages in the queue.
The two code samples below are taken from LINQPad scripts that demonstrate the issue. The first creates 10000 messages, each tagged randomly with one of 10 strings ('aaaaaaaa' to 'jjjjjjjj'). The second attempts to remove the messages tagged with 'aaaaaaaa'. The first script generates around 1000 'aaaaaaaa' messages, but the second drains only around 50.
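As a rough sanity check on those numbers (assuming rnd.Next picks each of the 10 tags with equal probability), the count N of 'aaaaaaaa' messages is binomial with n = 10000 and p = 1/10:

$$
E[N] = np = 10000 \cdot \tfrac{1}{10} = 1000, \qquad \sigma_N = \sqrt{np(1-p)} = \sqrt{900} = 30
$$

So the producer should report numDrainTargets within roughly 1000 ± 90 (three standard deviations), and a drained count of about 50 cannot be explained by the random tagging.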
(I am using ActiveMQ 5.16.3 on Windows; the LINQPad scripts reference Apache.NMS.ActiveMQ.NetCore 1.7.3 running on .NET 5.0.)
producer.linq
string queueName = "queue.1";
var rnd = new System.Random();
var tags = new[] {
    "aaaaaaaa", "bbbbbbbb", "cccccccc", "dddddddd", "eeeeeeee",
    "ffffffff", "gggggggg", "hhhhhhhh", "iiiiiiii", "jjjjjjjj"};

Uri uri = new Uri("activemq:tcp://localhost:61616");
var connectionFactory = new Apache.NMS.ActiveMQ.ConnectionFactory(uri);
using IConnection connection = connectionFactory.CreateConnection();
using ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
using IDestination destination = session.GetQueue(queueName);
connection.Start();

using IMessageProducer producer = session.CreateProducer(destination);
producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
producer.Priority = MsgPriority.Normal;
producer.RequestTimeout = TimeSpan.FromSeconds(1000.0);

int numDrainTargets = 0;   // number of messages tagged 'aaaaaaaa' (tags[0])
int numMsgsToSend = 10000;
for (int ctr = 0; ctr < numMsgsToSend; ++ctr)
{
    var msg = producer.CreateTextMessage();
    msg.Text = $"msg: {ctr,4}";

    // Tag each message with a randomly chosen string.
    int tagIndex = rnd.Next(tags.Length);
    if (tagIndex == 0)
        ++numDrainTargets;
    msg.Properties.SetString("MyKey", tags[tagIndex]);
    producer.Send(msg);
}
numDrainTargets.Dump("numDrainTargets");
"exiting producer".Dump();
drainer.linq
string queueName = "queue.1";
Uri uri = new Uri("activemq:tcp://localhost:61616");
IConnectionFactory factory = new Apache.NMS.ActiveMQ.ConnectionFactory(uri);
using IConnection connection = factory.CreateConnection();
using ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
using IDestination destination = session.GetQueue(queueName);
connection.Start();

// Consume only the messages whose MyKey property is 'aaaaaaaa'.
string selector = $"MyKey = 'aaaaaaaa'";
using IMessageConsumer consumer = session.CreateConsumer(destination, selector);

var waitTime = TimeSpan.FromSeconds(0.1);
int drainedCtr = 0;
IMessage msg;
// Stop as soon as no matching message arrives within waitTime.
while ((msg = consumer.Receive(waitTime)) != null)
{
    ++drainedCtr;
}
drainedCtr.Dump("num drained");