我使用以下示例作为参考点来创建部署 MassTransit。我的目标是模拟恢复(它适用)缩放(也适用)取消(无法使其工作并需要帮助)继续响应/进度(工作进度 - 可能是我的下一个任务)
(更多描述,因为我被搁置了感谢指导我的家伙)这些是需要的 nuget 包
MassTransit MassTransit.RabbitMQ RabbitMQ.Client TopShelf
这就是我拥有的制作人。它只需要一个字符串来触发一个作业并从该作业中获取响应。
内部课程 { 私有静态无效 Main() { IBusControl busControl = CreateBus(); 总线控制.Start(); 尝试 { 为了 (; ; ) { Console.Write("要处理的消息(退出退出):"); 字符串 customerId = Console.ReadLine(); if (customerId == "退出") 休息;var serviceAddress = new Uri("rabbitmq://localhost/request_service"); IRequestClient<ISimpleRequest, ISimpleResponse> c2 = busControl.CreateRequestClient<ISimpleRequest, ISimpleResponse>(serviceAddress, TimeSpan.FromDays(10)); Task.Run(async () => { ISimpleResponse response = await c2.Request(new SimpleRequest(customerId)); Console.WriteLine("Processing of message {0} is successful", response.CusomerName); }); } } catch (Exception ex) {Console.WriteLine("Exception!!! OMG!!! {0}", ex);} finally { busControl.Stop(); } } private static IBusControl CreateBus() { return Bus.Factory.CreateUsingRabbitMq(x => x.Host(new Uri("rabbitmq://localhost"), h => { h.Username("guest"); h.Password("guest"); })); } }
Below is the code for consumer. The consumer just picks up a message and have a Thread of 30 sec to simulate long running jobs.
class Program { static int Main(string[] args) { return (int)HostFactory.Run(x => x.Service()); } }public class InitiateServiceBus : ServiceControl { IBusControl _busControl; public bool Start(HostControl hostControl) { _busControl = Bus.Factory.CreateUsingRabbitMq(x => { IRabbitMqHost host = x.Host(new Uri("rabbitmq://localhost"), h => { h.Username("guest"); h.Password("guest"); }); x.ReceiveEndpoint(host, "request_service", e => { e.Consumer<RequestConsumer>(); }); }); _busControl.Start(); return true; } public bool Stop(HostControl hostControl) { _busControl.Stop(); return true; } } public class RequestConsumer : IConsumer<ISimpleRequest> { private readonly ILog _log = Logger.Get<RequestConsumer>(); public async Task Consume(ConsumeContext<ISimpleRequest> context) { _log.InfoFormat("I am consumer {0}", context.Message.CustomerId); Thread.Sleep(TimeSpan.FromSeconds(30)); context.Respond(new SimpleResponse { CusomerName = "Processing Finished " }); } private class SimpleResponse :ISimpleResponse { public string CusomerName { get; set; } } }
And these are the messages i send public interface ISimpleRequest { DateTime Timestamp { get; } string CustomerId { get; } }
public interface ISimpleResponse { string CusomerName { get; } } public class SimpleRequest : ISimpleRequest { private readonly string _customerId; private readonly DateTime _timestamp; public SimpleRequest(string customerId) { _customerId = customerId; _timestamp = DateTime.UtcNow; } public DateTime Timestamp { get { return _timestamp; } } public string CustomerId { get { return _customerId; } } }
任何提示都会非常有帮助。