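There are 100 messages in an IBM MQ queue (insert_queue). I want to consume 10 of them, store them in one object, process those 10 messages, wait for a while, and only then acknowledge the 10 consumed messages.
My sample code: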
    public void consume(String queueName, int maxloadcount) throws Exception {
        Source<Message, KillSwitch> jmsSource =
            JmsConsumer.create(
                JmsConsumerSettings.create(IBMQueueConnectionFactory.getMQQueueConnectionFactory())
                    .withQueue(queueName)
                    .withSessionCount(1)
                    .withBufferSize(10)
                    .withAcknowledgeMode(AcknowledgeMode.ClientAcknowledge()));

        CompletionStage<List<Message>> result =
            jmsSource
                .take(maxloadcount)
                .map(message -> {
                    return message;
                })
                .runWith(Sink.seq(), materializer);

        final List<Message> outMessages = result.toCompletableFuture().get(3, TimeUnit.SECONDS);

        for (Message outMsg : outMessages) {
            BytesMessage msg = (BytesMessage) outMsg;
            byte[] messageBody = msg.getBody(byte[].class);
            System.out.println(new String(messageBody));
        }

        TimeUnit.SECONDS.sleep(5);

        outMessages.stream().forEach(message -> {
            try {
                message.acknowledge();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
This does not work.
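
To make the intended flow concrete, here is a minimal, broker-free sketch of the four steps (consume 10, process, wait, acknowledge). It assumes nothing about IBM MQ or Alpakka: `BatchAckSketch`, `FakeMessage`, and `consumeBatch` are hypothetical names used purely for illustration, and an in-memory queue stands in for insert_queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;

// Illustration only: FakeMessage is a hypothetical in-memory stand-in,
// not a javax.jms.Message and not part of any IBM MQ or Alpakka API.
final class BatchAckSketch {

    static final class FakeMessage {
        private final String body;
        private boolean acknowledged;

        FakeMessage(String body) { this.body = body; }

        String getBody() { return body; }
        boolean isAcknowledged() { return acknowledged; }
        void acknowledge() { acknowledged = true; }
    }

    // Takes up to maxLoadCount messages off the queue, processes them,
    // waits waitMillis, and only then acknowledges each one.
    static List<FakeMessage> consumeBatch(Queue<FakeMessage> queue, int maxLoadCount, long waitMillis) {
        // 1. Consume: collect at most maxLoadCount messages into one list.
        List<FakeMessage> batch = new ArrayList<>();
        while (batch.size() < maxLoadCount && !queue.isEmpty()) {
            batch.add(queue.poll());
        }

        // 2. Process: here, just print each body.
        for (FakeMessage message : batch) {
            System.out.println(message.getBody());
        }

        // 3. Wait before acknowledging.
        try {
            TimeUnit.MILLISECONDS.sleep(waitMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag set
        }

        // 4. Acknowledge every message in the batch.
        for (FakeMessage message : batch) {
            message.acknowledge();
        }
        return batch;
    }

    public static void main(String[] args) {
        Queue<FakeMessage> queue = new ArrayDeque<>();
        for (int i = 0; i < 100; i++) {
            queue.add(new FakeMessage("message-" + i));
        }
        List<FakeMessage> batch = consumeBatch(queue, 10, 100);
        System.out.println("acknowledged " + batch.size() + ", remaining " + queue.size());
    }
}
```

In this stand-in, each message tracks its own acknowledgement. That is a simplification: it only shows the order of the steps, not how a real JMS session or stream handles client acknowledgement.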