
Let's say I start with a TransformBlock<Uri, string> (which is itself an implementation of IPropagatorBlock<Uri, string>) that takes a Uri and fetches its content into a string (this is a web crawler):

var downloader = new TransformBlock<Uri, string>(async uri => {
    // Download and return string asynchronously...
});

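Once I have the content in a string, I parse it for links. Since a page can contain multiple links, I use a TransformManyBlock<string, Uri> to map the single result (the content) to many links: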

// The discovered item block.
var parser = new TransformManyBlock<string, Uri>(s => {
    // Parse the content here, return an IEnumerable<Uri>.
});

The key to the parser is that it can return an empty sequence, indicating that there are no more items to parse.

However, that only holds for one branch of the tree (or one section of the web).
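To make the "empty sequence" point concrete, here is a minimal sketch of what the parser delegate could call. LinkParser and ExtractLinks are hypothetical names, and the regex-based href matching is a deliberately naive assumption for illustration; a real crawler would use an HTML parser. It only keeps absolute http/https links, so it fits the string-to-Uri shape of the block above.

```csharp
using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;

public static class LinkParser
{
    // Hypothetical, naive pattern: matches href="..." attributes only.
    private static readonly Regex Href =
        new Regex("href\\s*=\\s*\"([^\"]+)\"", RegexOptions.IgnoreCase);

    // Returns every absolute http/https link found in the content.
    // A page without links yields an empty list, which ends that branch.
    public static IEnumerable<Uri> ExtractLinks(string content)
    {
        var links = new List<Uri>();

        foreach (Match match in Href.Matches(content))
        {
            Uri link;
            bool isAbsolute = Uri.TryCreate(
                match.Groups[1].Value, UriKind.Absolute, out link);

            // Skip relative links and non-web schemes.
            if (isAbsolute &&
                (link.Scheme == Uri.UriSchemeHttp ||
                 link.Scheme == Uri.UriSchemeHttps))
            {
                links.Add(link);
            }
        }

        return links;
    }
}
```

The parser block's delegate could then be s => LinkParser.ExtractLinks(s).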

I then link the downloader to the parser, and the parser back to the downloader, like so:

downloader.LinkTo(parser);
parser.LinkTo(downloader);

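Now, I know I can stop everything from outside the blocks (by calling Complete on one of them), but how can I signal from inside the blocks that the work is complete?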

Or do I have to manage this state myself somehow?

Right now it just hangs, because the downloader block is starved once everything has been downloaded and parsed.
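For contrast, here is a minimal sketch of how completion normally flows in a linear pipeline, where the producer knows when it is done and can call Complete on the head block. LinearPipeline and Run are hypothetical names used only for this illustration. In the cyclic setup above there is no such point: the downloader's input comes from the parser, so nothing ever calls Complete.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

public static class LinearPipeline
{
    // Doubles 1, 2 and 3, sums the results, and returns once the
    // tail block reports completion.
    public static int Run()
    {
        int sum = 0;

        var doubler = new TransformBlock<int, int>(n => n * 2);

        // Default MaxDegreeOfParallelism is 1, so the unsynchronized
        // update of sum is safe here.
        var adder = new ActionBlock<int>(n => sum += n);

        // Completion is only forwarded when explicitly requested.
        doubler.LinkTo(adder,
            new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 1; i <= 3; i++) doubler.Post(i);

        // The producer knows it is finished, so it completes the head.
        doubler.Complete();
        adder.Completion.Wait();

        return sum;
    }
}
```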

Here is a self-contained test method that hangs on the call to Wait:

[TestMethod]
public void TestSpider()
{
    // The list of numbers.
    var numbers = new[] { 1, 2 };

    // Transforms from an int to a string.
    var downloader = new TransformBlock<Tuple<int, string>, string>(
        t => t.Item2 + t.Item1.ToString(CultureInfo.InvariantCulture),

        // Let's assume four downloads to a domain at a time.
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }
    );

    // Gets the next set of strings.
    var parser = new TransformManyBlock<string, Tuple<int, string>>(s => {
        // If the string length is greater than three, return an
        // empty sequence.
        // This is the signal for this branch to stop.
        if (s.Length > 3) return Enumerable.Empty<Tuple<int, string>>();

        // Branch out.
        return numbers.Select(n => new Tuple<int, string>(n, s));
    }, 
    // These are simple transformations/parsing, no need to not parallelize.
    // The dataflow blocks will handle the task allocation.
    new ExecutionDataflowBlockOptions {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });

    // For broadcasting to an action.
    var parserBroadcaster = new BroadcastBlock<Tuple<int, string>>(
        // Clone.
        t => new Tuple<int, string>(t.Item1, t.Item2));

    // Indicate what was parsed.
    var parserConsumer = new ActionBlock<Tuple<int, string>>(
        t => Debug.WriteLine(
            string.Format(CultureInfo.InvariantCulture, 
                "Consumed - Item1: {0}, Item2: \"{1}\"",
            t.Item1, t.Item2)));

    // Link downloader to parser.
    downloader.LinkTo(parser);

    // Parser to broadcaster.
    parser.LinkTo(parserBroadcaster);

    // Broadcaster to consumer.
    parserBroadcaster.LinkTo(parserConsumer);

    // Broadcaster back to the downloader.
    parserBroadcaster.LinkTo(downloader);

    // Start the downloader.
    downloader.Post(new Tuple<int, string>(1, ""));

    // Wait on the consumer to finish.
    parserConsumer.Completion.Wait();
}

Its output (as expected, before it hangs) is:

Consumed - Item1: 1, Item2: "1"
Consumed - Item1: 2, Item2: "1"
Consumed - Item1: 1, Item2: "11"
Consumed - Item1: 2, Item2: "11"
Consumed - Item1: 1, Item2: "12"
Consumed - Item1: 2, Item2: "12"
Consumed - Item1: 1, Item2: "111"
Consumed - Item1: 2, Item2: "111"
Consumed - Item1: 1, Item2: "112"
Consumed - Item1: 2, Item2: "112"
Consumed - Item1: 1, Item2: "121"
Consumed - Item1: 2, Item2: "121"
Consumed - Item1: 1, Item2: "122"
Consumed - Item1: 2, Item2: "122"
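While diagnosing a hang like this, it can help to bound the wait so that the test fails instead of blocking forever. A minimal sketch, assuming a hypothetical helper named HangProbe (not part of TPL Dataflow); it relies on Task.Wait(TimeSpan), which returns false if the timeout elapses before the task finishes:

```csharp
using System;
using System.Threading.Tasks.Dataflow;

public static class HangProbe
{
    // Returns true if the block completed within the timeout and
    // false if it is still running. If the block faulted, Wait
    // throws an AggregateException instead.
    public static bool CompletesWithin(
        IDataflowBlock block, TimeSpan timeout)
    {
        return block.Completion.Wait(timeout);
    }
}
```

In the test above, the final Wait could then become an assertion such as Assert.IsTrue(HangProbe.CompletesWithin(parserConsumer, TimeSpan.FromSeconds(5))), with the five-second limit being an arbitrary choice.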

1 Answer


The TPL Dataflow framework doesn't have anything that handles this out of the box. It's more of a state management problem.

That said, the key is to keep track of the URLs that have been downloaded as well as the URLs that still need to be downloaded.

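The ideal place to handle this is the parser block; it is where you have both the content (which will be turned into more links to download) and the URL the content was downloaded from.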

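Working from the example above, we need a way to capture the download result together with the URI it was downloaded from (I would have used a Tuple, but it would make things too confusing):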

public class DownloadResult
{
    public Tuple<int, string> Uri { get; set; }
    public string Content { get; set; }
}

From there, the download block is pretty much the same, just updated to output the structure above:

[TestMethod]
public void TestSpider2()
{
    // The list of numbers.
    var numbers = new[] { 1, 2 };

    // Performs the downloading.
    var downloader = new TransformBlock<Tuple<int, string>, DownloadResult>(
        t => new DownloadResult { 
            Uri = t, 
            Content = t.Item2 + 
                t.Item1.ToString(CultureInfo.InvariantCulture) 
        },

        // Let's assume four downloads to a domain at a time.
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }
    );

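The consumer of the parser doesn't need to change, but it does need to be declared earlier: the parser has to signal the consumer that it should stop consuming, so we want to capture it in the closure passed to the parser: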

// Indicate what was parsed.
var parserConsumer = new ActionBlock<Tuple<int, string>>(
    t => Debug.WriteLine(
        string.Format(CultureInfo.InvariantCulture, 
            "Consumed - Item1: {0}, Item2: \"{1}\"",
            t.Item1, t.Item2)));

Now the state manager has to be introduced:

// The set of items that still need to be processed.
var itemsToProcess = new HashSet<Tuple<int, string>>();

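At first, I thought of just using a ConcurrentDictionary<TKey, TValue>, but because one removal and multiple additions have to happen as a single atomic operation, it didn't provide what was needed. A simple lock statement was the best option.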
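As a rough sketch of what "atomic removal plus multiple additions" means, the set and its lock could be wrapped in a small helper. PendingSet and Exchange are hypothetical names and are not part of TPL Dataflow or of the code in this answer, which locks on the HashSet directly:

```csharp
using System.Collections.Generic;

// Hypothetical helper: tracks the items that are still in flight.
public sealed class PendingSet<T>
{
    private readonly HashSet<T> _items = new HashSet<T>();
    private readonly object _gate = new object();

    // Registers an item before it is posted to the pipeline.
    public void Add(T item)
    {
        lock (_gate) _items.Add(item);
    }

    // Removes the finished item and adds the newly discovered ones
    // under a single lock. Returns true when nothing is left.
    public bool Exchange(T finished, IEnumerable<T> discovered)
    {
        lock (_gate)
        {
            _items.Remove(finished);
            foreach (T item in discovered) _items.Add(item);
            return _items.Count == 0;
        }
    }
}
```

With separate thread-safe calls, another thread could see the set as empty between the removal and the additions and signal completion too early; doing both steps under one lock closes that window.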

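The parser is what changes the most. It parses the items normally, but it also does the following atomically: it removes the processed URL from the state manager, adds the newly discovered URLs to it, and, if nothing is left to process, signals the consumer block that it is done by calling Complete.

It looks like this: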

// Changes content into items and new URLs to download.
var parser = new TransformManyBlock<DownloadResult, Tuple<int, string>>(
    r => {
        // The parsed items.
        IEnumerable<Tuple<int, string>> parsedItems;

        // If the string length is greater than three, return an
        // empty sequence.
        // This is the signal for this branch to stop.
        parsedItems = (r.Uri.Item2.Length > 3) ? 
            Enumerable.Empty<Tuple<int, string>>() :
            numbers.Select(n => new Tuple<int, string>(n, r.Content));

        // Materialize the list.
        IList<Tuple<int, string>> materializedParsedItems = 
            parsedItems.ToList();

        // Lock here; the removal from the set of items to
        // process and the addition of the new items must
        // happen atomically.
        lock (itemsToProcess)
        {
            // Remove the item.
            itemsToProcess.Remove(r.Uri);

            // If no new items were parsed and nothing else is
            // left to process, finish the action block.
            if (materializedParsedItems.Count == 0 && 
                itemsToProcess.Count == 0)
            {
                // Complete the consumer block.
                parserConsumer.Complete();
            }

            // Add the items.
            foreach (Tuple<int, string> newItem in materializedParsedItems) 
                itemsToProcess.Add(newItem);

            // Return the items.
            return materializedParsedItems;
        }
    }, 

    // These are simple transformations/parsing, no need to not 
    // parallelize.  The dataflow blocks will handle the task 
    // allocation.
    new ExecutionDataflowBlockOptions {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });

The broadcaster and the links are the same:

// For broadcasting to an action.
var parserBroadcaster = new BroadcastBlock<Tuple<int, string>>(
    // Clone.
    t => new Tuple<int, string>(t.Item1, t.Item2));

// Link downloader to parser.
downloader.LinkTo(parser);

// Parser to broadcaster.
parser.LinkTo(parserBroadcaster);

// Broadcaster to consumer.
parserBroadcaster.LinkTo(parserConsumer);

// Broadcaster back to the downloader.
parserBroadcaster.LinkTo(downloader);

When starting the blocks, the state manager has to be primed with the URL to download before the root is passed to the Post method:

// The initial post to download.
var root = new Tuple<int, string>(1, "");

// Add to the items to process.
itemsToProcess.Add(root);

// Post to the downloader.
downloader.Post(root);

The call to the Wait method on the Task class is the same, and it will now complete without hanging:

    // Wait on the consumer to finish.
    parserConsumer.Completion.Wait();
}
answered 2012-11-04T15:34:53.353