0

我将消息发送到天蓝色事件中心。但我无法从事件中心下载消息。

enter code here
string eventHubConnectionString = "<connection string>";
string eventHubName = "<event Hub name>";
string storageAccountName = "<event hub storage>";
string storageAccountKey = "<storage Key>";
string storageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}",storageAccountName, storageAccountKey);


EventProcessorHost eventProcessorHost = new EventProcessorHost("message", eventHubName,  EventHubConsumerGroup.DefaultGroupName, eventHubConnectionString, storageConnectionString);
          eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>().Wait();


IEventProcessor:
enter code here
class SimpleEventProcessor : IEventProcessor

{


    Stopwatch checkpointStopWatch;

    async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
    {
        Console.WriteLine(string.Format("Processor Shuting Down.  Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason.ToString()));
        if (reason == CloseReason.Shutdown)
        {
            await context.CheckpointAsync();
        }
    }

    Task IEventProcessor.OpenAsync(PartitionContext context)
    {
        Console.WriteLine(string.Format("SimpleEventProcessor initialize.  Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset));
        this.checkpointStopWatch = new Stopwatch();
        this.checkpointStopWatch.Start();
        return Task.FromResult<object>(null);
    }

    async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (EventData eventData in messages)
        {
            string data = Encoding.UTF8.GetString(eventData.GetBytes());

            Console.WriteLine(string.Format("Message received.  Partition: '{0}', Data: '{1}'",
                context.Lease.PartitionId, data));
        }

        //Call checkpoint every 5 minutes, so that worker can resume processing from the 5 minutes back if it restarts.
        if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
        {
            await context.CheckpointAsync();
            lock (this)
            {
                this.checkpointStopWatch.Reset();
            }
        }
    }

}

它显示以下错误聚合异常处理。发生一个或多个错误。消息详细信息:不知道这样的主机 EventProcessor 主机名是什么?

它在这一行显示错误: eventProcessorHost.RegisterEventProcessorAsync().Wait();

它没有调用 IEventprocessor。是否有其他方法可以从事件中心消费消息?

4

2 回答 2

2

您可以在调试时跟踪您的异常并查找内部异常,因此这应该给您一个线索,真正的原因是什么。我也有这个愚蠢的异常,这是因为当您将 eventHubName 变量与 EventProcessorHost 一起使用时,它应该是小写的,(仅包含字母/数字和“-”,后面必须跟字母或数字,这意味着不支持“--”。eventHubName 也应该以字母开头)

即使事件中心名称是“myEventHub123”,您的变量也必须类似于:

string eventHubName = "myeventhub123";

希望这会对某人有所帮助..

于 2015-02-04T12:25:39.113 回答
0

我已经使用位于此处的示例代码成功构建了一个事件处理器。

很难从您的示例代码中看出错误是什么,因为它可能与您的连接字符串/eventhub/存储帐户名称中的拼写错误有关,因为它没有提供(您做得对,不要发布您的连接字符串敏感数据)。

示例从连接字符串加载事件中心信息的方式与您提供的代码方式之间的区别在于如何通过 Evenhub 客户端提供信息。尝试更新您构建的方式,EventProcessorHost如下例所示:

    EventHubClient eventHubClient = EventHubClient.CreateFromConnectionString(eventHubConnectionString, this.eventHubName); 

    // Get the default Consumer Group 
    defaultConsumerGroup = eventHubClient.GetDefaultConsumerGroup(); 
    string blobConnectionString = ConfigurationManager.AppSettings["AzureStorageConnectionString"]; // Required for checkpoint/state 
    eventProcessorHost = new EventProcessorHost("singleworker", eventHubClient.Path, defaultConsumerGroup.GroupName, this.eventHubConnectionString, blobConnectionString); 
    eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>().Wait(); 
于 2015-07-17T21:32:22.620 回答