我有一些来自工作示例的代码片段。此示例是在服务结构中进行 REST 调用 (WebAPI) 和轮询器来轮询请求。有五个参与者 (1) FileImportValidator用于验证文件名 (2) FileParser用于解析文件 (3) AgeValidator用于验证年龄 (4) FilePersister将名称和年龄作为事件持久保存。
请分享此设计是否符合使用 AKKA.NET 对事件源系统进行演员建模的预期。
PS。要解析的文件已经上传。REST 调用仅提供文件名。我有目的地消除了一些验证逻辑。
//WebAPI:
[HttpPost]
[Route("import")]
public async Task<IHttpActionResult> Import(FileImportRequest request)
{
IReliableQueue<string> queue = await stateManager.GetOrAddAsync<IReliableQueue<string>>("inputQueue");
using (ITransaction tx = stateManager.CreateTransaction())
{
await queue.EnqueueAsync(tx, request.FileName);
await tx.CommitAsync();
}
return Ok();
}
// Poller in Microsoft Service Fabric MicroService:
public class FileImportMicroService : StatefulService
{
public FileImportMicroService()
{
domainActorSystem = ActorSystem.Create("DomainActorSystem");
fileImportValidator = domainActorSystem.ActorOf(Props.Create<FileImportValidator>(), "FileImportValidator");
}
protected override ICommunicationListener CreateCommunicationListener()
{
ServiceEventSource.Current.CreateCommunicationListener(typeof(FileImportMicroService).Name);
return new OwinCommunicationListener(typeof(FileImportMicroService).Name, new StartUp(StateManager));
}
protected override async Task RunAsync(CancellationToken cancellationToken)
{
var queue = await StateManager.GetOrAddAsync<IReliableQueue<string>>("inputQueue");
while (!cancellationToken.IsCancellationRequested)
{
using (ITransaction tx = this.StateManager.CreateTransaction())
{
ConditionalResult<string> dequeuReply = await queue.TryDequeueAsync(tx);
if (dequeuReply.HasValue)
{
FileImportValidator.Tell(new ValidateFileCommand(dequeuReply.Value));
}
ServiceEventSource.Current.Message(dequeuReply.Value);
await tx.CommitAsync();
}
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
}
}
ActorSystem domainActorSystem;
IActorRef fileImportValidator;
}
//FileImportValidator Actor
public class FileImportValidator : UntypedActor
{
protected override void OnReceive(object message)
{
Handle((dynamic) message);
}
public void Handle(ValidateFileCommand command)
{
_fileParser = Context.ActorOf(Props.Create(() => new FileParser()));
...
_fileParser.Tell(new ValidationSuccessfulEvent(command.FileName));
}
private IActorRef _fileParser;
}
//FileParser Actor:
public class FileParser : UntypedActor
{
private IActorRef _ageValidator;
protected override void OnReceive(object message)
{
Handle((dynamic) message);
}
public void Handle(ValidationSuccessfulEvent message)
{
var lines = File.ReadLines(message.FileName);
foreach(var line in lines)
{
var cols = line.Split(',');
var File = new { Name = cols[0], Age = cols[1] };
_ageValidator.Tell(new ValidateAge(File.Name, File.Age));
}
}
protected override void PreStart()
{
_ageValidator = Context.ActorOf(Props.Create(() => new AgeValidator()));
base.PreStart();
}
}
//AgeValidator Actor:
public class AgeValidator : UntypedActor
{
protected override void OnReceive(object message)
{
if (message is ValidateAge)
{
_filePersistor.Tell(new SaveNameAndAge(message));
}
}
protected override void PreStart()
{
_filePersistor = Context.ActorOf(Props.Create<FilePersistor>(), "file-persistor");
base.PreStart();
}
private IActorRef _filePersistor;
}
//Persistent Actor:
public class FilePersistor : PersistentActor
{
...
protected override bool ReceiveCommand(object message)
{
Persist(/* Handler to persist name and age */);
return true;
}
...
}