我想将来自不同订阅的事件流式传输到 azure 上的单个 eventthub。目前,我已将 eventthub 配置为单个订阅,并且正在流式传输事件。我有一个 Java 客户端,它使用这些事件并将其存储在我的持久层上。我的java客户端看起来像..
private void processUsingProcessorClient(){
List<Disposable> subscriptions = null;
try {
EventHubConsumerAsyncClient eventHubConsumerAsyncClient = new EventHubClientBuilder()
.consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
.connectionString(CONNECTION_STRING, EVENT_HUB_NAME)
.credential("*******.servicebus.windows.net","maney-event-hub",createClientSecretCredential())
.buildAsyncConsumerClient();
ReceiveOptions receiveOptions = new ReceiveOptions().setTrackLastEnqueuedEventProperties(true);
List<String> block = eventHubConsumerAsyncClient.getPartitionIds().collectList().block();
Iterator<String> iterator = block.stream().iterator();
String partitionID = null;
subscriptions = new ArrayList<>(block.size());
while(iterator.hasNext()){
partitionID = iterator.next();
Disposable subscription = eventHubConsumerAsyncClient.receiveFromPartition(
partitionID,
EventPosition.fromOffset(0),receiveOptions).subscribe(PARTITION_PROCESSOR,ERROR_HANDLER);
subscriptions.add(subscription);
}
System.in.read();
}catch (Exception ex){
ex.printStackTrace();
} finally {
if(subscriptions != null){
subscriptions.forEach( subscrip -> {
subscrip.dispose();
});
}
}
}
private final Consumer<PartitionEvent> PARTITION_PROCESSOR = partitionEvent -> {
EventData event = partitionEvent.getData();
PartitionContext partitionContext = partitionEvent.getPartitionContext();
String contents = new String(event.getBody(), UTF_8);
LastEnqueuedEventProperties properties = partitionEvent.getLastEnqueuedEventProperties();
System.out.printf("Information received at %s. Last enqueued sequence number: %s%n",properties.getRetrievalTime(), properties.getSequenceNumber());
System.out.printf("Partition[%s] with Offset-[%s] and Sequence Number[%s] has contents: '%s'%n",
partitionContext.getPartitionId(),
event.getOffset(),
event.getSequenceNumber(),
contents);
};
private final Consumer<Throwable> ERROR_HANDLER = errorContext -> {
System.out.printf("Error occurred in partition processor");
errorContext.printStackTrace();
};
public ClientSecretCredential createClientSecretCredential() {
ClientSecretCredential clientSecretCredential = new ClientSecretCredentialBuilder()
.clientId("****************")
.clientSecret("******************")
.tenantId("**********************")
.build();
return clientSecretCredential;
}
我能够从一个订阅中读取所有事件。但是,我也需要对来自不同订阅的这些事件进行数据分析。如何配置 Azure Eventhub 以收听多个订阅?
我阅读了有关创建消费者组以解决此问题的 Stackoverflow 建议,但是我无法弄清楚如何?我确实创建了消费者组,但是如何将新创建的消费者组连接到我的 azure aaccount 中的不同订阅,并将事件流式传输到我刚刚创建的 eventthub?[注意:我已关注 - https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-create 在 azure 上创建一个 evenhub]
我如何实现这一目标?先感谢您
马尼