-1

IBM 队列 (insert_queue) 中有 100 条消息。我想消费 10 并存储在一个对象中并处理 10 个消费消息并等待一段时间,然后确认 10 个消费消息。

我的示例代码:

public void consume(String queueName, int maxloadcount) throws Exception {

  Source<Message, KillSwitch> jmsSource =
            JmsConsumer.create(
                    JmsConsumerSettings.create(IBMQueueConnectionFactory.getMQQueueConnectionFactory())
                            .withQueue(queueName)
                            .withSessionCount(1)
                            .withBufferSize(10)
                            .withAcknowledgeMode(AcknowledgeMode.ClientAcknowledge()));

    CompletionStage<List<Message>> result =
            jmsSource
                    .take(maxloadcount)
                    .map(message -> {
                        return message;
                    })
                    .runWith(Sink.seq(), materializer);

    final List<Message> outMessages = result.toCompletableFuture().get(3, TimeUnit.SECONDS);
    for (Message outMsg : outMessages) {
        BytesMessage msg = (BytesMessage) outMsg;
        byte[] messageBody = msg.getBody(byte[].class);
        System.out.println(new String(messageBody));
    }
    TimeUnit.SECONDS.sleep(5);
    outMessages.stream().forEach(message -> {
        try {
           message.acknowledge();
        }
        catch (Exception e){
            e.printStackTrace();
        }
    });

}

这不起作用

4

1 回答 1

0

Source.take一旦消耗给定数量的元素,就完成流。你可能想要的是Source.throttle. 以下示例每 5 秒向下游发送最多 10 条消息:

CompletionStage<List<Message>> result =
  jmsSource
    .throttle(10, java.time.Duration.ofSeconds(5L))
    .map(message -> {
      message.acknowledge();
      return message;
    })
    .runWith(Sink.seq(), materializer);

或者,看看Source.groupedWithin,它将流“分成在时间窗口内接收的元素组,或受给定元素数量限制,无论先发生什么”。

于 2018-08-10T17:23:10.727 回答