我正在使用 azure 服务总线主题。因为我有大消息,所以我将大消息拆分并以带有 sessionid 和拆分顺序的小消息发送。我希望我的接收器具有事件驱动的架构。因为我必须接收具有相同 sessionid 的所有消息,并且必须使用正确的拆分顺序聚合它们。下面是我的代码。但只是我第一次从波纹管代码中收到消息。在第二条消息中它超时。
public class CRMESBListener : RoleEntryPoint
{
private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);
public override void Run()
{
Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole is running");
try
{
DBMessageListener dbMessageListener = DBMessageListener.GetDBMessageListner();
dbMessageListener.Listen();
runCompleteEvent.WaitOne();
//this.RunAsync(this.cancellationTokenSource.Token).Wait();
}
finally
{
this.runCompleteEvent.Set();
}
}
public override bool OnStart()
{
// Set the maximum number of concurrent connections
ServicePointManager.DefaultConnectionLimit = 12;
// For information on handling configuration changes
// see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357.
bool result = base.OnStart();
Bootstrapper.Init();
Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole has been started");
return result;
}
public override void OnStop()
{
Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole is stopping");
this.cancellationTokenSource.Cancel();
this.runCompleteEvent.WaitOne();
base.OnStop();
Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole has stopped");
}
private async Task RunAsync(CancellationToken cancellationToken)
{
// TODO: Replace the following with your own logic.
while (!cancellationToken.IsCancellationRequested)
{
Trace.TraceInformation("Working");
await Task.Delay(1000);
}
}
}
public class DBMessageListener
{
#region Member Variables
private static DBMessageListener dbMessageListner;
private static object lockObject = new object();
private TopicSubscribeClientWrapper accountTopicClient;
private NamespaceManager namespaceManager;
private OnMessageOptions eventDrivenMessagingOptions;
private int crmIntegrationUserID = Common.CrmCurrentUser.UserID;
#endregion Member Variables
#region Constructors
private DBMessageListener()
{
string subscriptionName = "AllMessages";
namespaceManager = new NamespaceManager(ConfigurationSettings.ServiceBus_Pub_Sub_ConnectionString);
if (!namespaceManager.SubscriptionExists(ConfigurationSettings.ServiceBus_Pub_Sub_Internal_Account_ServicePath, subscriptionName))
{
namespaceManager.CreateSubscription(ConfigurationSettings.ServiceBus_Pub_Sub_Internal_Account_ServicePath, subscriptionName);
}
accountTopicClient = new TopicSubscribeClientWrapper(ConfigurationSettings.ServiceBus_Pub_Sub_ConnectionString, ConfigurationSettings.ServiceBus_Pub_Sub_Internal_Account_ServicePath);
accountTopicClient.SubscriptionName = subscriptionName;
eventDrivenMessagingOptions = new OnMessageOptions
{
AutoComplete = true
};
eventDrivenMessagingOptions.ExceptionReceived += OnExceptionReceived;
eventDrivenMessagingOptions.MaxConcurrentCalls = 5;
}
#endregion Constructors
#region Methods
private async System.Threading.Tasks.Task OnMessageArrived(BrokeredMessage message)
{
if (message != null)
{
try
{
await ProcessDBMessage(message.GetBody<ServiceBusMessage>());
}
catch (Exception ex)
{
//log exception
}
}
}
private void OnExceptionReceived(object sender, ExceptionReceivedEventArgs e)
{
if (e != null && e.Exception != null)
{
}
}
private async System.Threading.Tasks.Task ProcessDBMessage(ServiceBusMessage message)
{
//process message
}
public static DBMessageListener GetDBMessageListner()
{
if (dbMessageListner == null)
{
lock (lockObject)
{
if (dbMessageListner == null)
{
dbMessageListner = new DBMessageListener();
}
}
}
return dbMessageListner;
}
public void Listen()
{
accountTopicClient.OnMessageAsync(async message => await OnMessageArrived(message), eventDrivenMessagingOptions);
}
#endregion Methods
}
public class TopicSubscribeClientWrapper : IServiceBusClientWrapper
{
#region Member Variables
private readonly string _connectionString;
private readonly string _topicName;
private readonly TopicClient _topicClient;
private SubscriptionClient _subscriptionClient;
#endregion Member Variables
#region Properties
public string SubscriptionName { get; set; }
#endregion Properties
#region Constructors
public TopicSubscribeClientWrapper(string connectionString, string topicName)
{
_connectionString = connectionString;
_topicName = topicName;
_topicClient = TopicClient.CreateFromConnectionString(connectionString, topicName);
}
#endregion Constructors
#region Event Handlers
public void OnMessageAsync(Func<BrokeredMessage, Task> onMessageCallback, OnMessageOptions onMessageOptions)
{
_subscriptionClient = SubscriptionClient.CreateFromConnectionString(_connectionString, _topicName, SubscriptionName);
// _subscriptionClient.OnMessageAsync(onMessageCallback, onMessageOptions);
MemoryStream largeMessageStream = new MemoryStream();
MessageSession session = _subscriptionClient.AcceptMessageSession();
while (true)
{
BrokeredMessage subMessage = session.Receive(TimeSpan.FromSeconds(5));
if (subMessage != null)
{
Stream subMessageStream = subMessage.GetBody<Stream>();
subMessageStream.CopyTo(largeMessageStream);
subMessage.Complete();
//Console.Write(".");
}
else
{
//Console.WriteLine("Done!");
break;
}
}
BrokeredMessage largeMessage = new BrokeredMessage(largeMessageStream, true);
var message = onMessageCallback.Method.GetParameters();
message.SetValue(largeMessage, 1);
_subscriptionClient.OnMessageAsync(onMessageCallback, onMessageOptions);
}
#endregion Event Handlers
#region Methods
public Task SendAsync(BrokeredMessage message)
{
return _topicClient.SendAsync(message);
}
public void Close()
{
if (_subscriptionClient != null)
{
_subscriptionClient.Close();
}
_topicClient.Close();
}
#endregion Methods
}