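Following this example from Bootiful GCP: Integration with Google Cloud Pub/Sub (4/8), I am trying to build a flow that reads from a Google Pub/Sub subscription and writes to another topic.

After starting my application in DEBUG mode, I can see messages arriving from Google Pub/Sub, but they are never "consumed" because of this:

o.s.i.dispatcher.BroadcastingDispatcher : No subscribers, default behavior is ignoring

Any help with this would be much appreciated.

Here is what my main code looks like -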

public class PubsubRouteBuilderService {

    private final PubSubTemplate pubSubTemplate; // injected via Spring

    public PubsubRouteBuilderService(PubSubTemplate pubSubTemplate) {
        this.pubSubTemplate = pubSubTemplate;
    }

    public synchronized boolean buildRoute(PubsubRouteModel pubsubRouteModel) {
        log.info("Building route for: {}", pubsubRouteModel);
        buildPubsubRoute(pubsubRouteModel);
        // some unrelated logic
        return true;
    }

    private void buildPubsubRoute(PubsubRouteModel pubsubRouteModel) {

        final StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows.from(
            RouteBuilderFactory
                    .messageChannelAdapter(
                            RouteBuilderFactory.getMessageChannel(),
                            pubSubTemplate,
                            pubsubRouteModel.getFromSub()))
            .handle(
                    message -> {
                        log.info("consumed new message: [" + message.getPayload() + "]");
                        AckReplyConsumer consumer = message.getHeaders()
                                .get(GcpPubSubHeaders.ORIGINAL_MESSAGE, AckReplyConsumer.class);
                        consumer.ack();
                    })
            .get();

        standardIntegrationFlow.start();
    }
}

The other methods, in RouteBuilderFactory, are as follows -

public static MessageChannel getMessageChannel() {
    return MessageChannels.publishSubscribe().get();
}

public static PubSubInboundChannelAdapter messageChannelAdapter(MessageChannel inputChannel, PubSubTemplate pubSubTemplate, String channelName) {
    PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate, channelName);
    adapter.setOutputChannel(inputChannel);
    adapter.setAckMode(AckMode.MANUAL);
    return adapter;
}

1 Answer


Your code doesn't appear to be based on that blog post at all...

private void buildPubsubRoute(PubsubRouteModel pubsubRouteModel) {

    final StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows.from(
        RouteBuilderFactory
                .messageChannelAdapter(
                        RouteBuilderFactory.getMessageChannel(),
                        pubSubTemplate,
                        pubsubRouteModel.getFromSub()))
        .handle(
                message -> {
                    log.info("consumed new message: [" + message.getPayload() + "]");
                    AckReplyConsumer consumer = message.getHeaders()
                            .get(GcpPubSubHeaders.ORIGINAL_MESSAGE, AckReplyConsumer.class);
                    consumer.ack();
                })
        .get();

    standardIntegrationFlow.start();
}

You can't just "start" some arbitrary IntegrationFlow object; it must be managed by Spring (declared as a @Bean).
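
As a rough illustration of what "managed by Spring" could look like here, the sketch below shows two options: a flow declared as a @Bean for a fixed subscription, and, since the question builds routes at runtime, a flow registered through Spring Integration's IntegrationFlowContext. The class names FixedRouteConfig and DynamicRouteService, the subscription name "my-subscription", and the "route-" id prefix are made up for the example. It also assumes the Spring Cloud GCP 1.x package names and leaves the adapter in its default ack mode to keep things short, so treat it as a starting point rather than a drop-in fix.

```java
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.context.IntegrationFlowContext;
import org.springframework.stereotype.Service;

// Option 1: a fixed route declared as a bean (hypothetical class name).
@Configuration
class FixedRouteConfig {

    // Hypothetical subscription name, for illustration only.
    private static final String SUBSCRIPTION = "my-subscription";

    // Because this is a @Bean, Spring registers the flow, wires the handler
    // to the adapter's output channel, and starts it with the context.
    @Bean
    IntegrationFlow pubsubFlow(PubSubTemplate pubSubTemplate) {
        return IntegrationFlows
                .from(new PubSubInboundChannelAdapter(pubSubTemplate, SUBSCRIPTION))
                .handle(message -> System.out.println("consumed: " + message.getPayload()))
                .get();
    }
}

// Option 2: routes created at runtime (hypothetical class name).
@Service
class DynamicRouteService {

    private final PubSubTemplate pubSubTemplate;
    private final IntegrationFlowContext flowContext;

    DynamicRouteService(PubSubTemplate pubSubTemplate, IntegrationFlowContext flowContext) {
        this.pubSubTemplate = pubSubTemplate;
        this.flowContext = flowContext;
    }

    void buildRoute(String subscription) {
        IntegrationFlow flow = IntegrationFlows
                .from(new PubSubInboundChannelAdapter(pubSubTemplate, subscription))
                .handle(message -> System.out.println("consumed: " + message.getPayload()))
                .get();

        // register() hands the flow to the framework, which wires and starts it;
        // there is no manual start() call. The id prefix is an arbitrary choice.
        flowContext.registration(flow).id("route-" + subscription).register();
    }
}
```

In both variants the flow is never started by hand. Registration is the step that connects the handler to the channel the adapter publishes to.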

The framework builds some infrastructure behind the scenes to make all of this work.
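
To make the "No subscribers" log line from the question more concrete, here is a deliberately simplified plain-Java model of that wiring step. ToyBroadcastChannel, ToyFlow and ToyFlowDemo are hypothetical stand-ins, not Spring classes, and the real framework does considerably more. The only point is that a publish-subscribe channel with nothing subscribed drops whatever it is sent, and that the handler is subscribed only once the flow is registered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a publish-subscribe channel; not a Spring class.
class ToyBroadcastChannel {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> handler) {
        subscribers.add(handler);
    }

    // Returns the number of handlers that received the message.
    int send(String payload) {
        if (subscribers.isEmpty()) {
            System.out.println("No subscribers, default behavior is ignoring");
            return 0;
        }
        subscribers.forEach(handler -> handler.accept(payload));
        return subscribers.size();
    }
}

// Hypothetical stand-in for a flow definition: it holds a channel and a handler,
// but nothing connects the two until register() runs.
class ToyFlow {
    final ToyBroadcastChannel channel = new ToyBroadcastChannel();
    final List<String> consumed = new ArrayList<>();

    // Models the wiring the framework performs for flows it manages.
    void register() {
        channel.subscribe(consumed::add);
    }
}

class ToyFlowDemo {
    public static void main(String[] args) {
        ToyFlow flow = new ToyFlow();
        flow.channel.send("first");   // dropped: the handler is not subscribed yet
        flow.register();
        flow.channel.send("second");  // delivered to the handler
        System.out.println(flow.consumed);
    }
}
```

In this model the first message is lost in the same way as the messages in the question, and only the message sent after registration reaches the handler.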

Answered 2020-05-14T02:04:17.570