0

我有一些来自可运行示例的代码片段。该示例运行在 Service Fabric 中:通过 REST 调用 (WebAPI) 接收请求,再由一个轮询器轮询这些请求。共有四个参与者 (actor):(1) FileImportValidator 用于验证文件名;(2) FileParser 用于解析文件;(3) AgeValidator 用于验证年龄;(4) FilePersistor 将姓名和年龄作为事件持久化保存。

请指教:就使用 AKKA.NET 为事件溯源 (event sourcing) 系统进行参与者 (actor) 建模而言,此设计是否符合预期的做法?

PS:要解析的文件已经事先上传,REST 调用只提供文件名。我有意省略了一些验证逻辑。

//WebAPI:

        /// <summary>
        /// Places the file name carried by <paramref name="request"/> on the reliable
        /// queue named "inputQueue" and answers with an OK result.
        /// </summary>
        /// <param name="request">Import request; only its <c>FileName</c> is read here.</param>
        /// <returns>An OK result once the enqueue transaction has been committed.</returns>
        [HttpPost]
        [Route("import")]
        public async Task<IHttpActionResult> Import(FileImportRequest request)
        {
            var inputQueue = await stateManager.GetOrAddAsync<IReliableQueue<string>>("inputQueue");

            using (var transaction = stateManager.CreateTransaction())
            {
                // The enqueue and its commit share a single transaction scope.
                await inputQueue.EnqueueAsync(transaction, request.FileName);
                await transaction.CommitAsync();
            }

            return Ok();
        }

  // Poller in Microsoft Service Fabric MicroService:

    /// <summary>
    /// Stateful service that creates the "DomainActorSystem" actor system, exposes an
    /// OWIN communication listener, and polls the reliable queue "inputQueue" for
    /// file names to hand to the FileImportValidator actor.
    /// </summary>
    public class FileImportMicroService : StatefulService
    {
        /// <summary>
        /// Creates the actor system and the top-level "FileImportValidator" actor.
        /// </summary>
        public FileImportMicroService()
        {
            domainActorSystem = ActorSystem.Create("DomainActorSystem");

            fileImportValidator = domainActorSystem.ActorOf(Props.Create<FileImportValidator>(), "FileImportValidator");
        }

        /// <summary>
        /// Builds the OWIN listener for this service, passing it the service's state manager.
        /// </summary>
        protected override ICommunicationListener CreateCommunicationListener()
        {
            ServiceEventSource.Current.CreateCommunicationListener(typeof(FileImportMicroService).Name);

            return new OwinCommunicationListener(typeof(FileImportMicroService).Name, new StartUp(StateManager));
        }

        /// <summary>
        /// Polls "inputQueue" roughly once per second until cancellation is requested,
        /// sending a <c>ValidateFileCommand</c> to the validator actor for every
        /// dequeued file name.
        /// </summary>
        /// <param name="cancellationToken">Signals that the polling loop should stop.</param>
        protected override async Task RunAsync(CancellationToken cancellationToken)
        {
            var queue = await StateManager.GetOrAddAsync<IReliableQueue<string>>("inputQueue");

            while (!cancellationToken.IsCancellationRequested)
            {
                using (ITransaction tx = this.StateManager.CreateTransaction())
                {
                    ConditionalResult<string> dequeueReply = await queue.TryDequeueAsync(tx);

                    if (dequeueReply.HasValue)
                    {
                        // Send to the actor instance held in the field, not the actor type.
                        // NOTE(review): Tell does not wait for the actor, so the commit below
                        // removes the item whether or not processing succeeds — confirm this
                        // delivery guarantee is acceptable.
                        fileImportValidator.Tell(new ValidateFileCommand(dequeueReply.Value));

                        // Only log when something was actually dequeued; Value is not
                        // meaningful when HasValue is false.
                        ServiceEventSource.Current.Message(dequeueReply.Value);
                    }

                    await tx.CommitAsync();
                }

                await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
            }
        }

        // Both references are assigned once in the constructor and never replaced.
        private readonly ActorSystem domainActorSystem;

        private readonly IActorRef fileImportValidator;
    }


//FileImportValidator Actor

    /// <summary>
    /// Actor that receives <c>ValidateFileCommand</c> messages and, after the
    /// validation step (elided in this excerpt), sends a
    /// <c>ValidationSuccessfulEvent</c> to its child <c>FileParser</c> actor.
    /// </summary>
    public class FileImportValidator : UntypedActor
    {
        /// <summary>
        /// Routes <c>ValidateFileCommand</c> messages to <see cref="Handle"/>; any other
        /// message is reported through <c>Unhandled</c> instead of failing at runtime
        /// in a dynamic dispatch.
        /// </summary>
        /// <param name="message">The message delivered to this actor.</param>
        protected override void OnReceive(object message)
        {
            var command = message as ValidateFileCommand;

            if (command != null)
            {
                Handle(command);
            }
            else
            {
                Unhandled(message);
            }
        }

        /// <summary>
        /// Validates the file named by the command and notifies the parser child.
        /// </summary>
        /// <param name="command">Command whose <c>FileName</c> is forwarded to the parser.</param>
        public void Handle(ValidateFileCommand command)
        {
            ...

            _fileParser.Tell(new ValidationSuccessfulEvent(command.FileName));
        }

        /// <summary>
        /// Creates the single <c>FileParser</c> child when the actor starts, so that
        /// handling a command no longer spawns a new, never-stopped child each time.
        /// </summary>
        protected override void PreStart()
        {
            _fileParser = Context.ActorOf(Props.Create(() => new FileParser()));

            base.PreStart();
        }

        private IActorRef _fileParser;
    }

//FileParser Actor:

    /// <summary>
    /// Actor that reads the file named in a <c>ValidationSuccessfulEvent</c> line by
    /// line, splits each line on commas, and sends a <c>ValidateAge</c> message built
    /// from the first two columns to its child <c>AgeValidator</c> actor.
    /// </summary>
    public class FileParser : UntypedActor
    {
        private IActorRef _ageValidator;

        /// <summary>
        /// Routes <c>ValidationSuccessfulEvent</c> messages to <see cref="Handle"/>; any
        /// other message is reported through <c>Unhandled</c> instead of failing at
        /// runtime in a dynamic dispatch.
        /// </summary>
        /// <param name="message">The message delivered to this actor.</param>
        protected override void OnReceive(object message)
        {
            var validated = message as ValidationSuccessfulEvent;

            if (validated != null)
            {
                Handle(validated);
            }
            else
            {
                Unhandled(message);
            }
        }

        /// <summary>
        /// Parses the file and emits one <c>ValidateAge</c> message per non-blank line.
        /// </summary>
        /// <param name="message">Event whose <c>FileName</c> is the path of the file to read.</param>
        public void Handle(ValidationSuccessfulEvent message)
        {
            foreach (var line in File.ReadLines(message.FileName))
            {
                // A blank line (e.g. a trailing newline) has no second column and
                // would otherwise throw on cols[1].
                if (string.IsNullOrWhiteSpace(line))
                {
                    continue;
                }

                var cols = line.Split(',');

                // Column 0 is passed as the name and column 1 as the age.
                _ageValidator.Tell(new ValidateAge(cols[0], cols[1]));
            }
        }

        /// <summary>
        /// Creates the <c>AgeValidator</c> child when the actor starts.
        /// </summary>
        protected override void PreStart()
        {
            _ageValidator = Context.ActorOf(Props.Create(() => new AgeValidator()));

            base.PreStart();
        }
    }

//AgeValidator Actor:

    /// <summary>
    /// Actor that wraps every incoming <c>ValidateAge</c> message in a
    /// <c>SaveNameAndAge</c> message and sends it to its "file-persistor" child.
    /// Messages of any other type are ignored.
    /// </summary>
    public class AgeValidator : UntypedActor
    {
        private IActorRef _filePersistor;

        /// <summary>
        /// Creates the <c>FilePersistor</c> child, named "file-persistor", when the actor starts.
        /// </summary>
        protected override void PreStart()
        {
            var persistorProps = Props.Create<FilePersistor>();
            _filePersistor = Context.ActorOf(persistorProps, "file-persistor");

            base.PreStart();
        }

        /// <summary>
        /// Forwards <c>ValidateAge</c> messages to the persistor child.
        /// </summary>
        /// <param name="message">The message delivered to this actor.</param>
        protected override void OnReceive(object message)
        {
            // Anything that is not a ValidateAge message is dropped without a reply.
            if (!(message is ValidateAge))
            {
                return;
            }

            var saveCommand = new SaveNameAndAge(message);
            _filePersistor.Tell(saveCommand);
        }
    }

//Persistent Actor:

    /// <summary>
    /// Persistent actor at the end of the pipeline. AgeValidator creates it as its
    /// "file-persistor" child and sends it SaveNameAndAge messages.
    /// </summary>
    public class FilePersistor : PersistentActor
    {
...
        // Command handler: calls Persist and then returns true for every message.
        // NOTE(review): the Persist arguments are elided in this excerpt (only a
        // placeholder comment is shown) and no message-type check is visible here —
        // confirm which messages are meant to be persisted.
        protected override bool ReceiveCommand(object message)
        {
            Persist(/* Handler to persist name and age */);

            return true;
        }
...
    }
4

1 回答 1

2

您可能会考虑的另一种方法是在服务中使用 ReliableDictionary 来“保留”系统的状态(已处理的文件)。上传新文件时,您将创建一个新参与者并传递一个 FileId,以便参与者可以检索数据并对其进行处理。完成后,它会调用服务,以便可以从列表中删除该项目。通过这种方式,您可以并行化文件处理。

于 2015-06-25T18:37:47.870 回答