1

我有一个天蓝色工作人员角色,其事件处理器主机连接到天蓝色事件中心。由于某些未知原因 - 它不会收到任何消息。

日志显示它EventProcessor为每个分区打开一个 - 并且没有错误 - 但从ProcessEventsAsync未被调用。

使用服务总线资源管理器,我可以看到它在处理器关闭时接收消息,而当它启动时,它会抛出接收器打开的异常。

  • 我确实让它工作过一次,但重新启动后它没有继续工作

我不知道下一步该往哪里看 - 但是这是工人角色的代码

public class WorkerRole : RoleEntryPoint
{
    private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
    private readonly ManualResetEvent _runCompleteEvent = new ManualResetEvent(false);

    private EventProcessorHost _eventProcessorHost;
    private IEventProcessorFactory _processorFactory;
    private ConfigurationProvider configuration = new ConfigurationProvider();
    private string _eventHubConnectionString;
    private string _storageAccountConnectionString;
    private string _dbConnectionString;

    public override void Run()
    {
        Trace.TraceInformation("EventHubWorker is running");


        try
        {
            RunAsync(_cancellationTokenSource.Token).Wait();
        }
        finally
        {
            _runCompleteEvent.Set();
        }
    }   

    public override bool OnStart()
    {
        Trace.TraceInformation("EventHubWorker is starting");
        CompositeResolver.RegisterAndSetAsDefault(FormattersResolver.Instance, ContractlessStandardResolver.Instance, StandardResolver.Instance);
        // Set the maximum number of concurrent connections
        ServicePointManager.DefaultConnectionLimit = 12;
        SqlMapper.AddTypeHandler(new DateTimeHandler());
        _eventHubConnectionString = configuration.EventHubConnectionString;
        _dbConnectionString = configuration.DbConnectionString;
        _storageAccountConnectionString = configuration.StorageConnectionString;
        string hostName = Guid.NewGuid().ToString();
        var eventClient = EventHubClient.CreateFromConnectionString(_eventHubConnectionString, configuration.EventHubName);

        _eventProcessorHost = new EventProcessorHost(hostName, eventClient.Path, configuration.ConsumerGroupName,
            _eventHubConnectionString, _storageAccountConnectionString);

        var partitionOptions = new PartitionManagerOptions()
        {
            LeaseInterval = new TimeSpan(0, 5, 0)
        };
        _processorFactory = new EventProcessorFactory(/* some data for dependency injection */);

        return base.OnStart();
    }

    public override void OnStop()
    {
        Trace.TraceInformation("EventHubWorker is stopping");

        _cancellationTokenSource.Cancel();
        _runCompleteEvent.WaitOne();
        base.OnStop();

        Trace.TraceInformation("EventHubWorker has stopped");
    }

    private async Task RunAsync(CancellationToken cancellationToken)
    {
        int retryCount = 0;
        var exceptions = new List<Exception>();
        async Task StartProcessing()
        {
            if (retryCount > 5)
            {
                throw new AggregateException($"failed to run service, tried {retryCount} times",exceptions);
            }
            try
            {
                await _eventProcessorHost.RegisterEventProcessorFactoryAsync(_processorFactory, new EventProcessorOptions
                {
                    InitialOffsetProvider = o => DateTime.UtcNow,
                    MaxBatchSize = 100,
                    PrefetchCount = 10,
                    ReceiveTimeOut = TimeSpan.FromSeconds(20),
                });
            }
            catch(MessagingException e) when (e.IsTransient)
            {
                retryCount++;
                exceptions.Add(e);
                await StartProcessing();
            }
        }
        var options = new EventProcessorOptions();
        options.ExceptionReceived += Options_ExceptionReceived;

        await StartProcessing();

        cancellationToken.WaitHandle.WaitOne();
        await _eventProcessorHost.UnregisterEventProcessorAsync();
    }

    private void Options_ExceptionReceived(object sender, ExceptionReceivedEventArgs e)
    {
        Trace.TraceError(e.Exception.Message);
    }
}

这是 EventProcessor 代码 - 工厂本身似乎无关紧要

class EventProcessor : IEventProcessor
{
    public async Task CloseAsync(PartitionContext context, CloseReason reason)
    {
                   //never logged
        Trace.TraceInformation($"Partition {context.Lease.PartitionId} Closed");
        if (reason == CloseReason.Shutdown)
        {
            await context.CheckpointAsync();
        }
        else
        {
            Trace.TraceError(reason.ToString());
        }
    }

    public Task OpenAsync(PartitionContext context)
    {
                    //always logs with the expected lease information
        Trace.TraceInformation($"Partition {context.Lease.PartitionId} initiailized with epoch {context.Lease.Epoch}");
        return Task.FromResult<object>(null);
    }

    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        Trace.TraceInformation("processing event"); //never called
        // processing code
    }
4

1 回答 1

0

PartitionManagerOptions 的最大租约间隔为 60 秒,(与 blob 租约相同)EventProcessorHost 在最初获取租约时不会抛出异常。尝试将租用间隔设置为 60 秒而不是 5 分钟。

于 2018-01-26T07:23:29.973 回答