Let's assume I start with a TransformBlock<Uri, string> (which is itself an implementation of IPropagatorBlock<Uri, string>) that takes a Uri and then gets the content as a string (this is a web crawler):
    var downloader = new TransformBlock<Uri, string>(async uri => {
        // Download and return string asynchronously...
    });
Once I have the content in a string, I parse it for links. Since a page can have multiple links, I use a TransformManyBlock<string, Uri> to map the single result (the content) to many links:
    // The discovered item block.
    var parser = new TransformManyBlock<string, Uri>(s => {
        // Parse the content here, return an IEnumerable<Uri>.
    });
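The key point about the parser is that it can pass back an empty sequence, indicating that there are no more items to parse.
However, that only ends one branch of the tree (or one section of the web).
I then link the downloader to the parser, and the parser back to the downloader, like so: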
    downloader.LinkTo(parser);
    parser.LinkTo(downloader);
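Now, I know I can get everything to stop from outside the blocks (by calling Complete on one of them), but how can I signal from inside the blocks that the work is complete?
Or do I have to manage this state myself somehow?
Right now it just hangs, because the downloader block is starved once everything has been downloaded and parsed.
Here is a fully contained test method that hangs on the call to Wait: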
    using System;
    using System.Diagnostics;
    using System.Globalization;
    using System.Linq;
    using System.Threading.Tasks.Dataflow;
    using Microsoft.VisualStudio.TestTools.UnitTesting;

    [TestMethod]
    public void TestSpider()
    {
        // The list of numbers.
        var numbers = new[] { 1, 2 };

        // Transforms from an int to a string.
        var downloader = new TransformBlock<Tuple<int, string>, string>(
            t => t.Item2 + t.Item1.ToString(CultureInfo.InvariantCulture),
            // Let's assume four downloads to a domain at a time.
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }
        );

        // Gets the next set of strings.
        var parser = new TransformManyBlock<string, Tuple<int, string>>(s => {
                // If the string length is greater than three, return an
                // empty sequence.
                // This is the signal for this branch to stop.
                if (s.Length > 3) return Enumerable.Empty<Tuple<int, string>>();

                // Branch out.
                return numbers.Select(n => new Tuple<int, string>(n, s));
            },
            // These are simple transformations/parsing, so there is no reason
            // not to parallelize. The dataflow blocks will handle the task allocation.
            new ExecutionDataflowBlockOptions {
                MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
            });

        // For broadcasting to an action.
        var parserBroadcaster = new BroadcastBlock<Tuple<int, string>>(
            // Clone.
            t => new Tuple<int, string>(t.Item1, t.Item2));

        // Indicate what was parsed.
        var parserConsumer = new ActionBlock<Tuple<int, string>>(
            t => Debug.WriteLine(
                string.Format(CultureInfo.InvariantCulture,
                    "Consumed - Item1: {0}, Item2: \"{1}\"",
                    t.Item1, t.Item2)));

        // Link downloader to parser.
        downloader.LinkTo(parser);

        // Parser to broadcaster.
        parser.LinkTo(parserBroadcaster);

        // Broadcaster to consumer.
        parserBroadcaster.LinkTo(parserConsumer);

        // Broadcaster back to the downloader.
        parserBroadcaster.LinkTo(downloader);

        // Start the downloader.
        downloader.Post(new Tuple<int, string>(1, ""));

        // Wait on the consumer to finish. This is the call that hangs:
        // nothing ever calls Complete on any block in the loop.
        parserConsumer.Completion.Wait();
    }
The output (as expected, before it hangs) is:
Consumed - Item1: 1, Item2: "1"
Consumed - Item1: 2, Item2: "1"
Consumed - Item1: 1, Item2: "11"
Consumed - Item1: 2, Item2: "11"
Consumed - Item1: 1, Item2: "12"
Consumed - Item1: 2, Item2: "12"
Consumed - Item1: 1, Item2: "111"
Consumed - Item1: 2, Item2: "111"
Consumed - Item1: 1, Item2: "112"
Consumed - Item1: 2, Item2: "112"
Consumed - Item1: 1, Item2: "121"
Consumed - Item1: 2, Item2: "121"
Consumed - Item1: 1, Item2: "122"
Consumed - Item1: 2, Item2: "122"
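
To make the "manage this state myself" option concrete, here is a minimal sketch of what I imagine it could look like. It is an assumption on my part, not something the library provides: it keeps a hand-rolled counter of in-flight items and calls Complete on the downloader when that counter reaches zero. To keep the counting simple it swaps the TransformManyBlock for an ActionBlock that posts children back to the downloader itself, and it drops the broadcaster and consumer. The names SpiderSketch, Run, pending and consumed are made up for illustration.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public static class SpiderSketch
{
    // Runs the same toy "spider" as the test above and returns how many
    // child items were produced. Hypothetical helper, for illustration only.
    public static int Run()
    {
        var numbers = new[] { 1, 2 };

        // Items posted to the downloader that the parser has not finished with yet.
        int pending = 0;
        // Total number of child items produced by the parser.
        int consumed = 0;

        var downloader = new TransformBlock<Tuple<int, string>, string>(
            t => t.Item2 + t.Item1);

        // Assumption: the parser is an ActionBlock that feeds the downloader
        // directly, instead of a TransformManyBlock linked back to it.
        var parser = new ActionBlock<string>(s =>
        {
            var children = s.Length > 3
                ? Enumerable.Empty<Tuple<int, string>>()
                : numbers.Select(n => Tuple.Create(n, s));

            foreach (var child in children)
            {
                // Count each child before the parent is released, so the
                // counter cannot reach zero while work is still being queued.
                Interlocked.Increment(ref pending);
                Interlocked.Increment(ref consumed);
                downloader.Post(child);
            }

            // This item is done. If nothing else is in flight, the whole
            // loop is finished and the downloader can be completed.
            if (Interlocked.Decrement(ref pending) == 0)
            {
                downloader.Complete();
            }
        });

        // Let completion flow from the downloader to the parser.
        downloader.LinkTo(parser,
            new DataflowLinkOptions { PropagateCompletion = true });

        // Seed item: count it before posting it.
        Interlocked.Increment(ref pending);
        downloader.Post(Tuple.Create(1, ""));

        // No longer hangs: the parser completes once the downloader does.
        parser.Completion.Wait();
        return consumed;
    }
}
```

With the same seed item as the test method, Run should return 14, which matches the number of "Consumed" lines above. The counter is incremented for every child before it is decremented for the parent, which is what keeps it from hitting zero early. This works, but it is exactly the kind of external bookkeeping I was hoping the blocks could do for me.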