0

我正在尝试使用 EventProcessorHost 在 JAVA 中使用来自 eventthub 的消息。遵循here1here2给出的步骤,但是当我运行代码时,我遇到了错误..

Registering host named <name>

Failure while registering: java.util.concurrent.ExecutionException: com.microsoft.azure.storage.StorageException: The client could not finish the operation within specified maximum execution timeout.
    Press enter to stop

更多细节:

  1. 如博客中所述创建了新的存储帐户,还创建了 blob。

  2. EventProcessor 类和 ErrorNotificationHandler 类与 Microsoft 文档中提到的完全相同。

  3. 调用寄存器的代码如下...

    final String consumerGroupName = "test";
    final String namespaceName = "--namespace--";
    final String eventHubName = "--eventhub name --";
    final String sasKeyName = "--saskey name--";
    final String sasKey = "--sas-key";
    final String containerName = "--container name --";
    final String storageAccountName = "--storage account name --";
    final String storageAccountKey = "--storage -- account key--";
    final String connectionString = "--eventhub connection string copied from azure portal --";
    final String storageConString = "--storage connection string copied from azure portal --";
    
        EventProcessorHost host = new EventProcessorHost(
        namespaceName, // Also tried with full name <namespace name>.servicebus.windows.net
        eventHubName,
        consumerGroupName,
        connectionString,
        storageConString,
        containerName
        //        "<blobprefix>" Also tried with blob prefix
        );
    
        System.out.println("Registering host named " + host.getHostName());
        EventProcessorOptions options = new EventProcessorOptions();
        options.setReceiveTimeOut(Duration.ofMinutes(12 L));
        options.setExceptionNotification(new ErrorNotificationHandler());
    
        try {
    
        host.registerEventProcessor(EventProcessor.class, options).get(15, TimeUnit.MINUTES);
    
        } catch (Exception e) {
        System.out.print("Failure while registering: ");
        if (e instanceof ExecutionException) {
        Throwable inner = e.getCause();
        System.out.println(inner.toString());
        } else {
        System.out.println(e.toString());
        }
        }

注意:博客已弃用 EventProcessorHost 的构造函数。

在线搜索错误,我能找到的是增加存储超时,但不确定如何证明 EventProcessorHost 中存储的特定超时。任何帮助表示赞赏。

4

1 回答 1

1

我按照官方文档从事件中心接收消息,它对我来说效果很好。

在此处输入图像描述

消息存储在我配置的 Azure Blob 存储中。

在此处输入图像描述

根据您的描述,即使您在 EventProcessorOptions 中将 ReceiveTimeOut 设置为 12 分钟,仍然会发生超时异常。根据我的经验,即使是一条一条地收到消息,也不会花 12 分钟那么长的时间。而且我们通常不可能在我们的应用程序中设置这么长的超时时间。所以我认为错误不再来自 SDK 级别。可能是网络环境问题。

据我所知,事件中心消息发送客户端使用HTTP protocol并且不会被防火墙阻止。EPH 消息接收客户端使用 AMQP 协议,该协议本质TCP protocol上将受到代理服务器设置或防火墙的限制。我假设您的 Eph 客户端与事件中心没有连接。

我不知道你的实际网络环境,建议你检查一下你的代理设置或者防火墙白名单来排查网络环境的问题。

有任何疑虑,请告诉我。

于 2017-12-14T06:37:40.450 回答