我使用以下示例作为参考点来创建部署 MassTransit。我的目标是模拟恢复(它适用)缩放(也适用)取消(无法使其工作并需要帮助)继续响应/进度(工作进度 - 可能是我的下一个任务)
(更多描述,因为我被搁置了感谢指导我的家伙)这些是需要的 nuget 包
MassTransit MassTransit.RabbitMQ RabbitMQ.Client TopShelf
这就是我拥有的制作人。它只需要一个字符串来触发一个作业并从该作业中获取响应。
内部课程
{
私有静态无效 Main()
{
IBusControl busControl = CreateBus();
总线控制.Start();
尝试
{
为了 (; ; )
{
Console.Write("要处理的消息(退出退出):");
字符串 customerId = Console.ReadLine();
if (customerId == "退出")
休息;
var serviceAddress = new Uri("rabbitmq://localhost/request_service");
IRequestClient<ISimpleRequest, ISimpleResponse> c2 =
busControl.CreateRequestClient<ISimpleRequest, ISimpleResponse>(serviceAddress, TimeSpan.FromDays(10));
Task.Run(async () =>
{
ISimpleResponse response = await c2.Request(new SimpleRequest(customerId));
Console.WriteLine("Processing of message {0} is successful", response.CusomerName);
});
}
}
catch (Exception ex)
{Console.WriteLine("Exception!!! OMG!!! {0}", ex);}
finally
{
busControl.Stop();
}
}
private static IBusControl CreateBus()
{
return Bus.Factory.CreateUsingRabbitMq(x => x.Host(new Uri("rabbitmq://localhost"), h =>
{
h.Username("guest");
h.Password("guest");
}));
}
}
Below is the code for consumer. The consumer just picks up a message and have a Thread of 30 sec to simulate long running jobs.
class Program
{
static int Main(string[] args)
{
return (int)HostFactory.Run(x => x.Service());
}
}
public class InitiateServiceBus : ServiceControl
{
IBusControl _busControl;
public bool Start(HostControl hostControl)
{
_busControl = Bus.Factory.CreateUsingRabbitMq(x =>
{
IRabbitMqHost host = x.Host(new Uri("rabbitmq://localhost"), h =>
{
h.Username("guest");
h.Password("guest");
});
x.ReceiveEndpoint(host, "request_service",
e =>
{
e.Consumer<RequestConsumer>();
});
});
_busControl.Start();
return true;
}
public bool Stop(HostControl hostControl)
{
_busControl.Stop();
return true;
}
}
public class RequestConsumer : IConsumer<ISimpleRequest>
{
private readonly ILog _log = Logger.Get<RequestConsumer>();
public async Task Consume(ConsumeContext<ISimpleRequest> context)
{
_log.InfoFormat("I am consumer {0}", context.Message.CustomerId);
Thread.Sleep(TimeSpan.FromSeconds(30));
context.Respond(new SimpleResponse
{
CusomerName = "Processing Finished "
});
}
private class SimpleResponse :ISimpleResponse
{
public string CusomerName { get; set; }
}
}
And these are the messages i send
public interface ISimpleRequest
{
DateTime Timestamp { get; }
string CustomerId { get; }
}
public interface ISimpleResponse
{
string CusomerName { get; }
}
public class SimpleRequest : ISimpleRequest
{
private readonly string _customerId;
private readonly DateTime _timestamp;
public SimpleRequest(string customerId)
{
_customerId = customerId;
_timestamp = DateTime.UtcNow;
}
public DateTime Timestamp
{
get { return _timestamp; }
}
public string CustomerId
{
get { return _customerId; }
}
}
任何提示都会非常有帮助。