2

我正在学习TPL Dataflow图书馆。到目前为止,这正是我想要的。

我创建了一个执行以下功能的简单类(如下)

  • 执行后,ImportPropertiesForBranch我转到第 3 方 api 并获取属性列表
  • 返回一个 xml 列表并将其反序列化为一组属性数据(id、api 端点、lastupdated)。大约有 400 多处房产(如房屋)。
  • 然后我使用一个属性数据到我Parallel.ForSendAsyncpropertyBufferBlock
  • propertyBufferBlock链接到 a (它propertyXmlBlock本身就是 a TransformBlock)。
  • 然后propertyXmlBlock(异步)返回 API(使用属性数据中提供的 api 端点)并获取属性 xml 以进行反序列化。
  • 一旦 xml 返回并可用,我们就可以反序列化
  • 稍后,我将添加更多TransformBlocks 以将其持久化到数据存储中。

所以我的问题是;

  • 是否有任何潜在的瓶颈或代码区域可能会造成麻烦?我知道我没有包含任何错误处理或取消(即将到来)。
  • 可以在 a 内进行await异步调用TransformBlock还是这是一个瓶颈?
  • Parallel.For虽然代码有效,BufferBlock但我担心TransformBlock. 我不确定这是最好的方法,我可能会混淆一些概念。

欢迎任何指导、改进和陷阱建议。

using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using My.Interfaces;
using My.XmlService.Models;

namespace My.ImportService
{
    public class ImportService
    {

        private readonly IApiService _apiService;
        private readonly IXmlService _xmlService;
        private readonly IRepositoryService _repositoryService;

        public ImportService(IApiService apiService,
            IXmlService xmlService,
            IRepositoryService repositoryService)
        {
            _apiService = apiService;
            _xmlService = xmlService;
            _repositoryService = repositoryService;

            ConstructPipeline();
        }

        private BufferBlock<propertiesProperty> propertyBufferBlock;
        private TransformBlock<propertiesProperty, string> propertyXmlBlock;
        private TransformBlock<string, propertyType> propertyDeserializeBlock;
        private ActionBlock<propertyType> propertyCompleteBlock;

        public async Task<bool> ImportPropertiesForBranch(string branchName, int branchUrlId)
        {
            var propertyListXml = await _apiService.GetPropertyListAsync(branchUrlId);

            if (string.IsNullOrEmpty(propertyListXml))
                return false;

            var properties = _xmlService.DeserializePropertyList(propertyListXml);

            if (properties?.property == null || properties.property.Length == 0)
                return false;

            // limited to the first 20 for testing
            Parallel.For(0, 20,
                new ParallelOptions {MaxDegreeOfParallelism = 3},
                i => propertyBufferBlock.SendAsync(properties.property[i]));

            propertyBufferBlock.Complete();

            await propertyCompleteBlock.Completion;

            return true;
        }

        private void ConstructPipeline()
        {
            propertyBufferBlock = GetPropertyBuffer();
            propertyXmlBlock = GetPropertyXmlBlock();
            propertyDeserializeBlock = GetPropertyDeserializeBlock();
            propertyCompleteBlock = GetPropertyCompleteBlock();

            propertyBufferBlock.LinkTo(
                propertyXmlBlock,
                new DataflowLinkOptions {PropagateCompletion = true});

            propertyXmlBlock.LinkTo(
                propertyDeserializeBlock,
                new DataflowLinkOptions {PropagateCompletion = true});

            propertyDeserializeBlock.LinkTo(
                propertyCompleteBlock,
                new DataflowLinkOptions {PropagateCompletion = true});
        }

        private BufferBlock<propertiesProperty> GetPropertyBuffer()
        {
            return new BufferBlock<propertiesProperty>();
        }

        private TransformBlock<propertiesProperty, string> GetPropertyXmlBlock()
        {
            return new TransformBlock<propertiesProperty, string>(async propertiesProperty =>
                {
                    Debug.WriteLine($"getting xml {propertiesProperty.prop_id}");
                    var propertyXml = await _apiService.GetXmlAsStringAsync(propertiesProperty.url);
                    return propertyXml;
                },
                new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = 1,
                    BoundedCapacity = 2
                });
        }

        private TransformBlock<string, propertyType> GetPropertyDeserializeBlock()
        {
            return new TransformBlock<string, propertyType>(xmlAsString =>
                {
                    Debug.WriteLine($"deserializing");
                    var propertyType = _xmlService.DeserializeProperty(xmlAsString);
                    return propertyType;
                },
                new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = 1,
                    BoundedCapacity = 2
                });
        }

        private ActionBlock<propertyType> GetPropertyCompleteBlock()
        {
            return new ActionBlock<propertyType>(propertyType =>
                {
                    Debug.WriteLine($"complete {propertyType.id}");
                    Debug.WriteLine(propertyType.address.display);
                },
                new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = 1,
                    BoundedCapacity = 2
                });
        }
    }
}
4

2 回答 2

3

您实际上以错误的方式做一些事情:

i => propertyBufferBlock.SendAsync(properties.property[i])

你需要await方法,否则你会创建太多的并发任务。

还有这一行:

MaxDegreeOfParallelism = 1

会将您的块的执行限制为后续执行,这会降低您的性能。

正如您在评论中所说,您切换到同步方法Post并通过设置BoundedCapacity. 应谨慎使用此变体,因为您需要检查它的返回值,说明消息是否已被接受。

至于您担心等待async块内的方法 - 绝对没问题,应该像在其他async使用方法的情况下一样。

于 2017-05-20T20:56:11.100 回答
2

是否有任何潜在的瓶颈或代码区域可能会造成麻烦?

一般来说,您的方法看起来不错,潜在的瓶颈是您正在限制使用MaxDegreeOfParallelism = 1. 根据对问题的描述,每个项目都可以独立于其他项目进行处理,这就是您可以一次处理多个项目的原因。

可以在 a 中等待异步调用TransformBlock还是这是一个瓶颈?

这很好,因为 TPL DataFlow 支持异步操作。

Parallel.For虽然代码有效,BufferBlock但我担心TransformBlock. 我不确定这是最好的方法,我可能会混淆一些概念。

一,你的代码中可能让你在脚下开枪的潜在问题是调用 async 方法Parallel.For,然后调用propertyBufferBlock.Complete();. 这里的问题是Parallel.For不支持异步操作,并且您调用它的方式将propertyBufferBlock.SendAsync在返回的任务完成之前调用并继续。这意味着到Parallel.For退出时,某些操作可能仍处于运行状态,并且项目尚未添加到缓冲区块中。如果您随后调用propertyBufferBlock.Complete();这些待处理的项目将引发异常并且项目不会被添加到处理中。你会得到未观察到的异常。

您可以使用此博客文章ForEachAsync的形式确保在完成块之前将所有项目添加到块中。但是,如果您仍然将处理限制为 1 个操作,您可以一次添加一个项目。我不确定如何实现,但可能会在内部限制一次添加一项,因此并行添加没有任何意义。propertyBufferBlock.SendAsync

于 2017-05-20T13:58:22.883 回答