我正在学习TPL Dataflow
图书馆。到目前为止,这正是我想要的。
我创建了一个执行以下功能的简单类(如下)
- 执行后,
ImportPropertiesForBranch
我转到第 3 方 api 并获取属性列表 - 返回一个 xml 列表并将其反序列化为一组属性数据(id、api 端点、lastupdated)。大约有 400 多处房产(如房屋)。
- 然后我使用一个属性数据到我
Parallel.For
的SendAsync
propertyBufferBlock
propertyBufferBlock
链接到 a (它propertyXmlBlock
本身就是 aTransformBlock
)。- 然后
propertyXmlBlock
(异步)返回 API(使用属性数据中提供的 api 端点)并获取属性 xml 以进行反序列化。 - 一旦 xml 返回并可用,我们就可以反序列化
- 稍后,我将添加更多
TransformBlock
s 以将其持久化到数据存储中。
所以我的问题是;
- 是否有任何潜在的瓶颈或代码区域可能会造成麻烦?我知道我没有包含任何错误处理或取消(即将到来)。
- 可以在 a 内进行
await
异步调用TransformBlock
还是这是一个瓶颈? Parallel.For
虽然代码有效,BufferBlock
但我担心TransformBlock
. 我不确定这是最好的方法,我可能会混淆一些概念。
欢迎任何指导、改进和陷阱建议。
using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using My.Interfaces;
using My.XmlService.Models;
namespace My.ImportService
{
public class ImportService
{
private readonly IApiService _apiService;
private readonly IXmlService _xmlService;
private readonly IRepositoryService _repositoryService;
public ImportService(IApiService apiService,
IXmlService xmlService,
IRepositoryService repositoryService)
{
_apiService = apiService;
_xmlService = xmlService;
_repositoryService = repositoryService;
ConstructPipeline();
}
private BufferBlock<propertiesProperty> propertyBufferBlock;
private TransformBlock<propertiesProperty, string> propertyXmlBlock;
private TransformBlock<string, propertyType> propertyDeserializeBlock;
private ActionBlock<propertyType> propertyCompleteBlock;
public async Task<bool> ImportPropertiesForBranch(string branchName, int branchUrlId)
{
var propertyListXml = await _apiService.GetPropertyListAsync(branchUrlId);
if (string.IsNullOrEmpty(propertyListXml))
return false;
var properties = _xmlService.DeserializePropertyList(propertyListXml);
if (properties?.property == null || properties.property.Length == 0)
return false;
// limited to the first 20 for testing
Parallel.For(0, 20,
new ParallelOptions {MaxDegreeOfParallelism = 3},
i => propertyBufferBlock.SendAsync(properties.property[i]));
propertyBufferBlock.Complete();
await propertyCompleteBlock.Completion;
return true;
}
private void ConstructPipeline()
{
propertyBufferBlock = GetPropertyBuffer();
propertyXmlBlock = GetPropertyXmlBlock();
propertyDeserializeBlock = GetPropertyDeserializeBlock();
propertyCompleteBlock = GetPropertyCompleteBlock();
propertyBufferBlock.LinkTo(
propertyXmlBlock,
new DataflowLinkOptions {PropagateCompletion = true});
propertyXmlBlock.LinkTo(
propertyDeserializeBlock,
new DataflowLinkOptions {PropagateCompletion = true});
propertyDeserializeBlock.LinkTo(
propertyCompleteBlock,
new DataflowLinkOptions {PropagateCompletion = true});
}
private BufferBlock<propertiesProperty> GetPropertyBuffer()
{
return new BufferBlock<propertiesProperty>();
}
private TransformBlock<propertiesProperty, string> GetPropertyXmlBlock()
{
return new TransformBlock<propertiesProperty, string>(async propertiesProperty =>
{
Debug.WriteLine($"getting xml {propertiesProperty.prop_id}");
var propertyXml = await _apiService.GetXmlAsStringAsync(propertiesProperty.url);
return propertyXml;
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,
BoundedCapacity = 2
});
}
private TransformBlock<string, propertyType> GetPropertyDeserializeBlock()
{
return new TransformBlock<string, propertyType>(xmlAsString =>
{
Debug.WriteLine($"deserializing");
var propertyType = _xmlService.DeserializeProperty(xmlAsString);
return propertyType;
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,
BoundedCapacity = 2
});
}
private ActionBlock<propertyType> GetPropertyCompleteBlock()
{
return new ActionBlock<propertyType>(propertyType =>
{
Debug.WriteLine($"complete {propertyType.id}");
Debug.WriteLine(propertyType.address.display);
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,
BoundedCapacity = 2
});
}
}
}