使用 Azure 服务总线作为传输,但计划的消息除了从 IConsumer 内部调用时不起作用。
我花了几个小时和几天,仍然不知道发生了什么。
有人可以解释我需要做什么才能使用 azure 服务总线从状态机中获取日程安排吗?也许为什么调度消息在 IConsumer 上下文中起作用,但在其他任何地方都不起作用。
public class BatchCollector : MassTransitStateMachine<BufferSaga>
{
public BatchCollector(IBatchFactory batchFactory)
{
InstanceState(saga => saga.State);
Event(() => BufferedCommandDetected,
_ => _.CorrelateById(context => context.Message.GetBatchId()));
Schedule(() => WindowElapsed, x => x.BatchCompletionId, x =>
{
x.Delay = TimeSpan.FromSeconds(5);
x.Received = e => e.CorrelateById(context => context.Message.CorrelationId);
});
Initially(
When(BufferedCommandDetected)
.Then(
context =>
{
context.Instance.CorrelationId = context.Data.GetBatchId();
context.Instance.Id = Guid.NewGuid().ToString("N");
context.Instance.Buffer.Add(context.Data);
context.Instance.BatchStartTime = DateTimeOffset.Now;
context.Instance.AbsoluteDeadLine = DateTimeOffset.Now + context.Data.AbsoluteWindowSpan;
context.Instance.SlidingDeadLine = DateTimeOffset.Now + context.Data.SlidingWindowSpan;
})
.Schedule(WindowElapsed,
context => new WindowElapsed {CorrelationId = context.Instance.CorrelationId },
delayProvider: scheduleDelayProvider => scheduleDelayProvider.Data.SlidingWindowSpan < scheduleDelayProvider.Data.AbsoluteWindowSpan ? scheduleDelayProvider.Data.SlidingWindowSpan : scheduleDelayProvider.Data.AbsoluteWindowSpan)
.TransitionTo(Waiting));
During(Waiting,
When(BufferedCommandDetected)
.Then(context =>
{
context.Instance.SlidingDeadLine += context.Data.SlidingWindowSpan;
context.Instance.Buffer.Add(context.Data);
}),
When(WindowElapsed.Received, context => context.Instance.SlidingDeadLine > DateTimeOffset.Now && context.Instance.AbsoluteDeadLine > DateTimeOffset.Now)
.Schedule(WindowElapsed, context => new WindowElapsed { CorrelationId = context.Instance.CorrelationId }),
When(WindowElapsed.Received, context => context.Instance.SlidingDeadLine <= DateTimeOffset.Now || context.Instance.AbsoluteDeadLine <= DateTimeOffset.Now)
//.Unschedule(WindowElapsed)
.Publish(context => new Batch()
{
BatchId = context.Instance.BatchCompletionId ?? Guid.NewGuid(),
Content = context.Instance.Buffer,
StartTime = context.Instance.BatchStartTime,
EndTime = DateTimeOffset.Now
})
.Finalize()
.TransitionTo(BufferCompleted));
SetCompletedWhenFinalized();
}
public Event<BufferedCommand> BufferedCommandDetected { get; private set; }
public Schedule<BufferSaga, WindowElapsed> WindowElapsed { get; private set; }
public State Waiting { get; private set; }
public State BufferCompleted { get; private set; }
}
总线初始化:
container.RegisterType<IBusControl>(
new HierarchicalLifetimeManager(),
new InjectionFactory(c =>
{
var bus = Bus.Factory.CreateUsingAzureServiceBus(
cfg =>
{
var azSbHost = cfg.Host(new Uri(CloudConfigurationManager.GetSetting("ServiceBus.Url"))
, host =>
{
host.TokenProvider = TokenProvider
.CreateSharedAccessSignatureTokenProvider
(CloudConfigurationManager.GetSetting("ServiceBus.SharedAccessKeyName"),
CloudConfigurationManager.GetSetting("ServiceBus.AccessKey"),
TokenScope.Namespace);
});
cfg.ReceiveEndpoint(
azSbHost,
"Quartz.Scheduler",
sbConfig =>
{
cfg.UseMessageScheduler(sbConfig.InputAddress);
sbConfig.Consumer(() => new ScheduleMessageConsumer(c.Resolve<IScheduler>()));
}
);
cfg.ReceiveEndpoint(
azSbHost,
Assembly.GetExecutingAssembly().GetName().Name,
sbConfig =>
{
AllClasses.FromAssembliesInBasePath()
.Where(
@class =>
(@class?.Namespace?.StartsWith("bcn",
StringComparison.OrdinalIgnoreCase) ?? false)
&&
@class.GetParentClasses()
.Any(
parent =>
parent.Name.StartsWith("MassTransitStateMachine`1")))
.ForEach(@class =>
{
//dynamic cast to avoid having to deal with generic typing when type is not known until runtime.
dynamic stateMachineExtension =
new DynamicStaticWrapper(typeof(StateMachineSubscriptionExtensions));
stateMachineExtension
.StateMachineSaga(
sbConfig,
c.Resolve(@class),
c.Resolve(typeof(ISagaRepository<>).MakeGenericType(
@class.GetParentClasses().First(parent =>
parent.Name.StartsWith("MassTransitStateMachine`1"))
.GetGenericArguments().First())));
});
AllClasses.FromAssembliesInBasePath()
.Where(
@class =>
(@class?.Namespace?.StartsWith("bcn", StringComparison.OrdinalIgnoreCase) ??
false)
&& @class.GetInterfaces().Any(
@interface =>
@interface?.FullName?.StartsWith("MassTransit.IConsumer`1") ??
false))
.ForEach(@class =>
{
var factoryType = typeof(UnityConsumerFactory<>).MakeGenericType(@class);
//Automatically register consumers.
dynamic consumerFactory = Activator.CreateInstance(
factoryType,
container);
var consumingMethod = typeof(ConsumerExtensions).
GetMethods()
.First(
m =>
m.Name == "Consumer" && m.IsGenericMethod &&
m.GetGenericArguments().Length == 1 &&
m.GetParameters().Length == 3)
.MakeGenericMethod(@class)
.Invoke(null, new object[] {sbConfig, consumerFactory, null});
//Automatically detect which payload contains message data. This message data is stored in blob.
@class.GetInterfaces().Where(
@interface =>
@interface.FullName.StartsWith("MassTransit.IConsumer`1"))
.Select(@interface => @interface.GetGenericArguments().First())
.Where(payload => payload.GetProperties()
.Any(prop => prop.PropertyType.Name.StartsWith("MessageData`1")))
.ForEach(
BlobType =>
typeof(MessageDataConfiguratorExtensions)
.GetMethods()
.First(
method =>
method.GetParameters().First().ParameterType ==
typeof(IConsumePipeConfigurator)
&&
method.GetParameters().Last().ParameterType ==
typeof(IMessageDataRepository))
.MakeGenericMethod(BlobType)
.Invoke(null,
new object[]
{sbConfig, c.Resolve<IMessageDataRepository>()}));
});
});
cfg.UseServiceBusMessageScheduler();
//azSbHost.
});
return bus;
}));
container.RegisterType<IBus, IBusControl>();
container.RegisterType<IBus, IBusControl>(new ContainerControlledLifetimeManager());
然后开始:
var container = UnityConfig.GetConfiguredContainer();
var bus = container.Resolve<IBusControl>();
bus.Start();
var scheduler = container.Resolve<IScheduler>();
scheduler.Start();
bus.Publish<BufferedCommand>(new BufferedCommandAdapter<decimal>(10m, TimeSpan.FromSeconds(5),
TimeSpan.FromSeconds(5)));