1
Have written a service which reads events from events hub, in local system its working but when deployed as an App service to cloud not able to reads the events.
Below is the stack trace while reading events from Azure eventhub.

> 2020-04-07 09:42:59.021  INFO 54245 --- [pool-4-thread-4]
> c.m.a.eventhubs.impl.BaseLinkHandler     : closeSession for
> clientName[cbs], linkName[cbs:sender], errorCondition[null],
> errorDescription[null] 2020-04-07 09:42:59.022  INFO 54245 ---
> [pool-4-thread-4] c.m.a.eventhubs.impl.BaseLinkHandler     :
> onLinkLocalClose clientName[cbs], linkName[cbs:receiver],
> errorCondition[null], errorDescription[null] 2020-04-07 09:42:59.022 
> INFO 54245 --- [pool-4-thread-4]
> c.m.azure.eventhubs.impl.SessionHandler  : onSessionLocalClose
> connectionId[cbs-session], entityName[MF_7a461b_1586238177759],
> condition[Error{condition=null, description='null', info=null}]
> 2020-04-07 09:42:59.061  INFO 54245 --- [pool-4-thread-4]
> c.m.a.eventhubs.impl.BaseLinkHandler     : onLinkRemoteClose
> clientName[cbs], linkName[cbs:sender], errorCondition[null],
> errorDescription[null] 2020-04-07 09:42:59.061  INFO 54245 ---
> [pool-4-thread-4] c.m.a.eventhubs.impl.BaseLinkHandler     :
> processOnClose clientName[cbs], linkName[cbs:sender],
> errorCondition[null], errorDescription[null] 2020-04-07 09:42:59.061 
> INFO 54245 --- [pool-4-thread-4] c.m.a.eventhubs.impl.BaseLinkHandler 
> : onLinkRemoteClose clientName[cbs], linkName[cbs:receiver],
> errorCondition[null], errorDescription[null] 2020-04-07 09:42:59.061 
> INFO 54245 --- [pool-4-thread-4] c.m.a.eventhubs.impl.BaseLinkHandler 
> : processOnClose clientName[cbs], linkName[cbs:receiver],
> errorCondition[null], errorDescription[null] 2020-04-07 09:42:59.061 
> INFO 54245 --- [pool-4-thread-4] c.m.a.e.impl.RequestResponseOpener   
> : requestResponseChannel.onClose complete
> clientId[MF_7a461b_1586238177759], session[cbs-session], link[cbs],
> endpoint[$cbs] 2020-04-07 09:42:59.062  INFO 54245 ---
> [pool-4-thread-4] c.m.a.eventhubs.impl.MessagingFactory    :
> messagingFactory[MF_7a461b_1586238177759],
> hostName[products-dev.servicebus.windows.net], info[cbsChannel closed]
> 2020-04-07 09:42:59.062  INFO 54245 --- [pool-4-thread-4]
> c.m.a.eventhubs.impl.ConnectionHandler   : onConnectionRemoteClose
> hostname[products-dev.servicebus.windows.net:5671],
> connectionId[MF_7a461b_1586238177759], errorCondition[null],
> errorDescription[null] 2020-04-07 09:42:59.062  WARN 54245 ---
> [pool-4-thread-4] c.m.a.eventhubs.impl.MessagingFactory    :
> onConnectionError messagingFactory[MF_7a461b_1586238177759],
> hostname[products-dev.servicebus.windows.net], error[null] 2020-04-07
> 09:42:59.062  INFO 54245 --- [pool-4-thread-4]
> c.m.a.eventhubs.impl.ConnectionHandler   : onTransportClosed
> hostname[products-dev.servicebus.windows.net:5671],
> connectionId[MF_7a461b_1586238177759], error[n/a] 2020-04-07
> 09:42:59.062  INFO 54245 --- [pool-4-thread-4]
> c.m.a.eventhubs.impl.CustomIOHandler     : onTransportClosed
> name[MF_7a461b_1586238177759],
> hostname[products-dev.servicebus.windows.net:5671] 2020-04-07
> 09:42:59.062  INFO 54245 --- [pool-4-thread-4]
> c.m.a.eventhubs.impl.ConnectionHandler   : onConnectionUnbound
> hostname[products-dev.servicebus.windows.net:5671],
> connectionId[MF_7a461b_1586238177759], state[CLOSED],
> remoteState[CLOSED] 2020-04-07 09:42:59.062  INFO 54245 ---
> [pool-4-thread-4] c.m.azure.eventhubs.impl.SessionHandler  :
> onSessionFinal connectionId[MF_7a461b_1586238177759],
> entityName[cbs-session], condition[null], description[null] 2020-04-07
> 09:42:59.062  INFO 54245 --- [pool-4-thread-4]
> c.m.azure.eventhubs.impl.SessionHandler  : onSessionFinal
> connectionId[MF_7a461b_1586238177759], entityName[products],
> condition[null], description[null] 2020-04-07 09:42:59.062  INFO 54245
> --- [pool-4-thread-4] c.m.a.eventhubs.impl.ConnectionHandler   : onConnectionFinal hostname[products-dev.servicebus.windows.net:5671],
> connectionId[MF_7a461b_1586238177759], errorCondition[null],
> errorDescription[null] 2020-04-07 09:42:59.062  WARN 54245 ---
> [pool-4-thread-4] c.m.a.eventhubs.impl.MessagingFactory    :
> messagingFactory[MF_7a461b_1586238177759],
> hostName[products-dev.servicebus.windows.net], message[stopping the
> reactor because thread was interrupted or the reactor has no more
> events to process.]


Code:

向 EventProcessorHost 的实例注册事件处理器类会启动事件处理。主机实例在事件中心的某些分区上获得租约,可能会从其他主机实例中窃取一些租约,以一种在所有主机实例上均匀分布的方式收敛。对于每个租用的分区,主机实例创建所提供事件处理器类的实例,然后从该分区接收事件并将它们传递给事件处理器实例。

EventProcessorHost 中有两个错误通知系统。与特定分区相关的错误通知(例如接收器失败)通过 onError 方法传递给该分区的事件处理器实例。与特定分区无关的错误通知(例如初始化失败)将传递给通过 EventProcessorOptions 对象指定的通用通知处理程序。您不需要提供这样的通知处理程序,但如果您不提供,那么您可能不知道发生了某些错误。

@RestController 公共类 ReceiveEventsController {

private static final Logger logger = LoggerFactory.getLogger(ReceiveEventsController.class);

@Value("${spring.cloud.azure.eventhub.namespace}")
private String namespaceName;

@Value("${spring.cloud.azure.eventhub.name}")
private String eventHubName;

@Value("${spring.cloud.azure.eventhub.sas.key.name}")
private String sasKeyName;

@Value("${spring.cloud.azure.eventhub.sas.key.value}")
private String sasKey;

@Value("${spring.cloud.stream.bindings.input.group}")
private String consumerGroupName;

@Value("${spring.cloud.azure.eventhub.storage.connection.string}")
private String storageConnectionString;

@Value("${spring.cloud.azure.eventhub.checkpoint-container}")
private String storageContainerName;

@Value("${spring.cloud.azure.eventhub.storage.hostname.prefix}")
private String hostNamePrefix;

@PostMapping("/receive/events")
public String postMessage() throws EventHubException, IOException, InterruptedException, ExecutionException, URISyntaxException {

    URI uri = new URI("sb://products-dev.servicebus.windows.net");
    ConnectionStringBuilder eventHubConnectionString = new ConnectionStringBuilder()
            .setEndpoint(uri)
            .setEventHubName(eventHubName)
            .setSasKeyName(sasKeyName)
            .setSasKey(sasKey);
    EventProcessorHost host = EventProcessorHost.EventProcessorHostBuilder
            .newBuilder(EventProcessorHost.createHostName(hostNamePrefix), consumerGroupName)
            .useAzureStorageCheckpointLeaseManager(storageConnectionString, storageContainerName, null)
            .useEventHubConnectionString(eventHubConnectionString.toString(), eventHubName)
            .build();

    logger.info("Registering host named " + host.getHostName()+ "Endpoint " + eventHubConnectionString.getEndpoint());
    EventProcessorOptions options = new EventProcessorOptions();
    options.setExceptionNotification(new ErrorNotificationHandler());

    host.registerEventProcessor(EventProcessor.class, options)
    .whenComplete((unused, e) ->
    {
        if (e != null)
        {
            logger.info("Failure while registering: " + e.toString());
            if (e.getCause() != null)
            {
                logger.info("Inner exception: " + e.getCause().toString());
            }
        }
    })
    .thenAccept((unused) ->
    {
        logger.info("Press enter to stop.");
        try 
        {
            System.in.read();
        }
        catch (Exception e)
        {
            logger.info("Keyboard read failed: " + e.toString());
        }
    })
    .thenCompose((unused) ->
    {
        return host.unregisterEventProcessor();
    })
    .exceptionally((e) ->
    {
        logger.info("Failure while unregistering: " + e.toString());
        if (e.getCause() != null)
        {
            logger.info("Inner exception: " + e.getCause().toString());
        }
        return null;
    })
    .get();

    logger.info("End of PRODUCT");
    return "Event Received";
}

}

4

0 回答 0