0

我想将来自不同订阅的事件流式传输到 azure 上的单个 eventthub。目前,我已将 eventthub 配置为单个订阅,并且正在流式传输事件。我有一个 Java 客户端,它使用这些事件并将其存储在我的持久层上。我的java客户端看起来像..

private void processUsingProcessorClient(){
        List<Disposable> subscriptions = null;
        try {

            EventHubConsumerAsyncClient eventHubConsumerAsyncClient = new EventHubClientBuilder()
                    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
                    .connectionString(CONNECTION_STRING, EVENT_HUB_NAME)
                    .credential("*******.servicebus.windows.net","maney-event-hub",createClientSecretCredential())
                    .buildAsyncConsumerClient();
            ReceiveOptions receiveOptions = new ReceiveOptions().setTrackLastEnqueuedEventProperties(true);
            List<String> block = eventHubConsumerAsyncClient.getPartitionIds().collectList().block();
            Iterator<String> iterator = block.stream().iterator();
            String partitionID = null;
            subscriptions = new ArrayList<>(block.size());
            while(iterator.hasNext()){
                partitionID = iterator.next();
                Disposable subscription = eventHubConsumerAsyncClient.receiveFromPartition(
                        partitionID,
                        EventPosition.fromOffset(0),receiveOptions).subscribe(PARTITION_PROCESSOR,ERROR_HANDLER);
                subscriptions.add(subscription);
            }
            System.in.read();
        }catch (Exception ex){
            ex.printStackTrace();
        } finally {
            if(subscriptions != null){
                subscriptions.forEach( subscrip -> {
                    subscrip.dispose();
                });
            }
        }
    }


    private final Consumer<PartitionEvent> PARTITION_PROCESSOR = partitionEvent -> {
        EventData event = partitionEvent.getData();
        PartitionContext partitionContext = partitionEvent.getPartitionContext();
        String contents = new String(event.getBody(), UTF_8);

        LastEnqueuedEventProperties properties = partitionEvent.getLastEnqueuedEventProperties();
        System.out.printf("Information received at %s. Last enqueued sequence number: %s%n",properties.getRetrievalTime(), properties.getSequenceNumber());

        System.out.printf("Partition[%s] with Offset-[%s] and Sequence Number[%s] has contents: '%s'%n",
                partitionContext.getPartitionId(),
                event.getOffset(),
                event.getSequenceNumber(),
                contents);
    };

    private final Consumer<Throwable> ERROR_HANDLER = errorContext -> {
        System.out.printf("Error occurred in partition processor");
        errorContext.printStackTrace();
    };


    public ClientSecretCredential createClientSecretCredential() {
        ClientSecretCredential clientSecretCredential = new ClientSecretCredentialBuilder()
                .clientId("****************")
                .clientSecret("******************")
                .tenantId("**********************")
                .build();
        return clientSecretCredential;
    }

我能够从一个订阅中读取所有事件。但是,我也需要对来自不同订阅的这些事件进行数据分析。如何配置 Azure Eventhub 以收听多个订阅?

我阅读了有关创建消费者组以解决此问题的 Stackoverflow 建议,但是我无法弄清楚如何?我确实创建了消费者组,但是如何将新创建的消费者组连接到我的 azure aaccount 中的不同订阅,并将事件流式传输到我刚刚创建的 eventthub?[注意:我已关注 - https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-create 在 azure 上创建一个 evenhub]

以防万一我需要澄清我在说什么订阅,下面是截图 在此处输入图像描述

我如何实现这一目标?先感谢您

马尼

4

1 回答 1

2

所以我想出了一种方法来解决我的问题(如上所述)。在浏览了 Microsoft 文档和一些反复试验的方法后,我是这样解决的;

我有 SUBSCRIPTION-1 和 SUBSCRIPTION-2。我在 SUBSCRIPTION-2 中创建了一个 eventthub。我转到 SUBSCRIPTION-1 并创建一个资源组。创建资源组后,我创建了一个 EVENT-GRID。在 eventgrid 中,我创建了一个 EVENT-SUBSCRIPTION,它提供了一个将其指向端点的选项。我选择了端点并选择了在 SUBSCRIPTION-1 中创建的 eventhub。

现在,我可以将所有事件从 SUBSCRIPTION-1 流式传输到 SUBSCRIPTION-2。

-马尼

于 2020-12-26T17:58:38.987 回答