1

据我了解,TPL 数据流为 .NET 程序员提供了 Actor 编程模型(不涉及以前可用的 3rd 方解决方案)。Actor 模型本身声明每个 Actor 可以支持三种基本操作:“发送”、“创建”和“成为”。在 TPL Dataflow 中处理“成为”语义的“正确”方法是什么?

请考虑以下示例:

static void TestBecome()
{
    TransformBlock<string, string> dispatcher = null;
    dispatcher = new TransformBlock<string, string>
    (
        val =>
        {
            Console.WriteLine("Received for processing {0}", val);

            switch (val)
            {
                case "CREATE":  // create linked node
                {
                    dispatcher.LinkTo(CreateNewNode().Item2);
                    break;
                } 
                case "BECOME":  // transform the node ('become' semantics)
                {
                    var newNode = CreateNewNode(); 
                    Console.WriteLine("Dispatcher transformed to {0}", newNode.Item1);
                    dispatcher = newNode.Item2;

                    break; 
                }

                default: return val;    // dispatch the value to linked node (one of)
            }

            return string.Empty;    // 'empty unit'
        }
    );

    dispatcher.SendAsync("CREATE").ContinueWith(res => Console.WriteLine("Send CREATE: {0}", res.Result));
    dispatcher.SendAsync("CREATE").ContinueWith(res => Console.WriteLine("Send CREATE: {0}", res.Result));
    dispatcher.SendAsync("msg1").ContinueWith(res => Console.WriteLine("Send msg1: {0}", res.Result));
    dispatcher.SendAsync("msg2").ContinueWith(res => Console.WriteLine("Send msg2: {0}", res.Result)); ;

    Thread.Sleep(1000);

    dispatcher.SendAsync("BECOME").ContinueWith(res => Console.WriteLine("Send  BECOME: {0}", res.Result)); ;

    Thread.Sleep(1000);

    dispatcher.SendAsync("msg3").ContinueWith(res => Console.WriteLine("Send msg3: {0}", res.Result));
    dispatcher.SendAsync("msg4").ContinueWith(res => Console.WriteLine("Send msg4: {0}", res.Result));
    dispatcher.SendAsync("msg5").ContinueWith(res => Console.WriteLine("Send msg5: {0}", res.Result));
}

static Tuple<string, TransformBlock<string, string>> CreateNewNode()
{
    var id = Guid.NewGuid().ToString("N");
    var node = new TransformBlock<string, string>
    (
        val =>
        {
            if (string.IsNullOrWhiteSpace(val)) // pass trough 'empty unit'
                return val;
            Console.WriteLine("NODE {0}: {1}", id, val);
            return val;
        }
        , new ExecutionDataflowBlockOptions { BoundedCapacity = 3 }
    );

    return Tuple.Create(id, node);
}

我发现这种“变得”笨拙的方式:我没有改变演员的行为,而是改变了演员实例本身(这会导致不希望的效果)。什么是“正确”的方式?

还有一个问题:据我所知,标准的 TDF 块要么永远不会将消息传递给链接的笔记(例如 ActionBlock,如果这样的逻辑不是手工编写的),要么总是这样做(大多数块)。我是否正确,仅在某些情况下(并非总是)发送消息的逻辑应作为自定义块实现?

4

1 回答 1

2

首先,TPL 数据流基于参与者模型,但大多数时候,您将只使用它的特定子集。例如,在收到消息后创建新的数据流块或决定将消息发送到哪个块(这是通常使用的链接)是非常罕见的。虽然如果你想这样做,你可以。

其次,是的,您正在做的事情非常笨拙,并且在许多情况下都不起作用(例如,如果某个块已链接到dispatcher)。

为此,我将使用在块中执行的委托可以具有内部状态这一事实。对于简单的情况,您可以在 lambda 中使用捕获的变量。例如,如果您有一个将 GUID 附加到 a 的块string

var id = Guid.NewGuid().ToString("N");
var block = new TransformBlock<string, string>(
    input =>
    {
        if (input == "BECOME")
        {
            id = Guid.NewGuid().ToString("N");
            return string.Empty;
        }

        return string.Format("{0}: {1}", id, input);
    });

对于更复杂的情况,您可以创建一个将其状态存储在字段中的类,并为该类的实例方法创建一个委托:

class IdPrepender
{
    private string id = Guid.NewGuid().ToString("N");

    public string Process(string input)
    {
        if (input == "BECOME")
        {
            id = Guid.NewGuid().ToString("N");
            return string.Empty;
        }

        return string.Format("{0}: {1}", id, input);
    }
}

…

var block = new TransformBlock<string, string>(new IdPrepender().Process);

在这两种情况下,如果块可以并行执行,则必须确保代码是线程安全的。

另外,我不会string像这样超载。在第二种情况下,您可以利用 TPL 数据流不是纯参与者模型这一事实,并为类 ( IdPrepender) 添加一个“成为”方法。

我是否正确,仅在某些情况下(并非总是)发送消息的逻辑应作为自定义块实现?

为此,您不需要自定义块。您可以使用TransformManyBlock谁的委托总是返回 0 或 1 个项目。

于 2013-07-16T11:39:04.287 回答