1

我使用以下示例作为参考点来创建部署 MassTransit。我的目标是模拟恢复(它适用)缩放(也适用)取消(无法使其工作并需要帮助)继续响应/进度(工作进度 - 可能是我的下一个任务)

(更多描述,因为我被搁置了感谢指导我的家伙)这些是需要的 nuget 包

MassTransit MassTransit.RabbitMQ RabbitMQ.Client TopShelf

这就是我拥有的制作人。它只需要一个字符串来触发一个作业并从该作业中获取响应。

内部课程
    {
        私有静态无效 Main()
        {
            IBusControl busControl = CreateBus();
            总线控制.Start();
            尝试
            {
                为了 (; ; )
                {
                    Console.Write("要处理的消息(退出退出):");
                    字符串 customerId = Console.ReadLine();
                    if (customerId == "退出")
                        休息;

                var serviceAddress = new Uri("rabbitmq://localhost/request_service");
                IRequestClient<ISimpleRequest, ISimpleResponse> c2 =
                    busControl.CreateRequestClient<ISimpleRequest, ISimpleResponse>(serviceAddress, TimeSpan.FromDays(10));
                Task.Run(async () =>
                {
                    ISimpleResponse response = await c2.Request(new SimpleRequest(customerId));

                    Console.WriteLine("Processing of message {0} is successful", response.CusomerName);
                });
            }
        }
        catch (Exception ex)
        {Console.WriteLine("Exception!!! OMG!!! {0}", ex);}
        finally
        {
            busControl.Stop();
        }
    }

    private static IBusControl CreateBus()
    {
        return Bus.Factory.CreateUsingRabbitMq(x => x.Host(new Uri("rabbitmq://localhost"), h =>
        {
            h.Username("guest");
            h.Password("guest");
        }));
    }
}

Below is the code for consumer. The consumer just picks up a message and have a Thread of 30 sec to simulate long running jobs.

    class Program
    {
        static int Main(string[] args)
        {
            return (int)HostFactory.Run(x => x.Service());
        }
    }

public class InitiateServiceBus : ServiceControl { IBusControl _busControl; public bool Start(HostControl hostControl) { _busControl = Bus.Factory.CreateUsingRabbitMq(x => { IRabbitMqHost host = x.Host(new Uri("rabbitmq://localhost"), h => { h.Username("guest"); h.Password("guest"); }); x.ReceiveEndpoint(host, "request_service", e => { e.Consumer<RequestConsumer>(); }); }); _busControl.Start(); return true; } public bool Stop(HostControl hostControl) { _busControl.Stop(); return true; } } public class RequestConsumer : IConsumer<ISimpleRequest> { private readonly ILog _log = Logger.Get<RequestConsumer>(); public async Task Consume(ConsumeContext<ISimpleRequest> context) { _log.InfoFormat("I am consumer {0}", context.Message.CustomerId); Thread.Sleep(TimeSpan.FromSeconds(30)); context.Respond(new SimpleResponse { CusomerName = "Processing Finished " }); } private class SimpleResponse :ISimpleResponse { public string CusomerName { get; set; } } }

And these are the messages i send public interface ISimpleRequest { DateTime Timestamp { get; } string CustomerId { get; } }

public interface ISimpleResponse { string CusomerName { get; } } public class SimpleRequest : ISimpleRequest { private readonly string _customerId; private readonly DateTime _timestamp; public SimpleRequest(string customerId) { _customerId = customerId; _timestamp = DateTime.UtcNow; } public DateTime Timestamp { get { return _timestamp; } } public string CustomerId { get { return _customerId; } } }

任何提示都会非常有帮助。

4

2 回答 2

1

我找到了一个解决方案,所以如果我使用它

e.Consumer(() => { return reqConume; //这是一个单例 });

并作为单身人士而不是

e.消费者();

在消费者中,我有相同的消费者实例,我可以直接监控消费者。也向它发送一个取消事件。抱歉给各位添麻烦了

于 2015-12-04T15:03:47.707 回答
0

您应该使用任务取消来实现这一点。根据 MSDN

System.Threading.Tasks.Task 和 System.Threading.Tasks.Task 类支持通过使用 .NET Framework 中的取消标记进行取消。有关详细信息,请参阅托管线程中的取消。

请查看以下链接了解更多详情 https://msdn.microsoft.com/en-us/library/dd997396(v=vs.110).aspx

于 2015-12-04T11:41:45.887 回答