6

我有一个 BufferBlock 来发布消息:

public class DelimitedFileBlock : ISourceBlock<string>
{
    private ISourceBlock<string> _source;
    _source = new BufferBlock<string>(new DataflowBlockOptions() { BoundedCapacity = 10000 });

    //Read a file
    While(!eof)
        row = read one row 
    //if consumers are slow, then sleep for a while
    while(!(_source as BufferBlock<string>).Post<string>(row))
    {
        Thread.Sleep(5000);
    }
}

这是一个 5GB 的文件,有 2400 万行。

我现在有一个使用 ActionBlock 的 Target 块:

public class SolaceTargetBlock : ITargetBlock<string>
       private ActionBlock<IBasicDataContract> _publishToSolaceBlock;

       public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, string messageValue, ISourceBlock<string> source, bool consumeToAccept)
    {
        //post to another block to publish
        bool success = _publishToSolaceBlock.Post(messageValue);

现在在控制台应用程序中,我指定:

 SolaceTargetBlock solaceTargetBlock1 = new SolaceTargetBlock("someparam", 
            new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });
 SolaceTargetBlock solaceTargetBlock2 = new SolaceTargetBlock("someparam", 
            new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });
 SolaceTargetBlock solaceTargetBlock3 = new SolaceTargetBlock("someparam", 
            new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });

 DelimitedFileBlock delimitedFileBlock = new DelimitedFileBlock(csvFileInfo);

我将有限容量保持为 1 仅用于测试。

现在我使用 LinkTo 将这三个消费者链接到我的源:

 delimitedFileBlock.LinkTo(solaceTargetBlock1);      
 delimitedFileBlock.LinkTo(solaceTargetBlock2);      
 delimitedFileBlock.LinkTo(solaceTargetBlock3);      

这在 10003 行之后转到 Thread.Sleep(5000) 语句,并且 while 循环中的 Post 始终返回 false。

我期待,因为我有 LinkTo,所以完成后 solaceTargetBlocks 将能够选择下一条消息,但 LinkTo 不会清除 BufferBlock。那么,如何在多个消费者之间进行负载平衡。我是否必须接收并编写一个简单的负载平衡逻辑才能在消费者之间分配?

4

1 回答 1

13

Post方法的文档中(强调我的):DataflowBlock<T>

一旦目标块决定接受或拒绝该项目,此方法将返回,

这意味着目标可以选择拒绝阻止(这是您看到的行为)。

此外,它指出:

对于支持延迟提供的消息的目标块,或者对于可能在其 Post 实现中进行更多处理的块,请考虑使用 SendAsync,它将立即返回,并使目标能够延迟发布的消息并稍后在 SendAsync 返回后使用它。

这意味着您可能会获得更好的结果(取决于目标块),因为您的消息可能会被推迟,但仍会被处理,而不是完全拒绝。

我想这三个实例的BoundedCapacity属性设置都与您所看到的有关:BufferBlock<T>ActionBlock<TInput>

  • 您的最大缓冲区BufferBlock<T>为 10000;一旦您将 10,000 个项目放入队列中,它将拒绝其余的(参见上面的第二个引用),因为它无法处理它们(SendAsync在这里也不起作用,因为它无法缓冲要推迟的消息)。

  • 您在实例上的最大缓冲区ActionBlock<TInput>为 1,并且您拥有其中的三个。

10,000 + (1 * 3) = 10,000 + 3 = 10,003

要解决这个问题,您需要做一些事情。

首先,您需要在创建实例时为属性设置一个更合理的值MaxDegreeOfParallelismExecutionDataFlowBlockOptionsActionBlock<TInput>

默认情况下,MaxDegreeOfParallelismfor anActionBlock<TInput>设置为 1;这保证了调用将被序列化,您不必担心线程安全。如果您希望ActionBlock<T>关注线程安全,请保留此设置。

如果ActionBlock<TInput> 线程安全的,那么你没有理由限制它,你应该设置MaxDegreeOfParallelismDataflowBlockOptions.Unbounded.

如果您正在访问某种ActionBlock<TInput>可以在有限的基础上同时访问的共享资源,那么您可能做错了事。

如果您有某种共享资源,那么您很可能应该通过另一个块运行它并将其设置MaxDegreeOfParallelism为.

其次,如果您关心吞吐量并且可以接受丢弃的项目,那么您应该设置该BoundedCapacity属性。

另请注意,您表示“如果消费者很慢,请睡一会儿”;如果你正确地连接你的块,就没有理由这样做,你应该让数据流过并且只在你需要的地方放置限制。你的生产者不应该负责限制消费者,让消费者负责限制。

最后,您的代码看起来不需要自己实现数据流块接口。你可以像这样构造它:

// The source, your read lines will be posted here.
var delimitedFileBlock = new BufferBlock<string>();

// The Action for the action blocks.
Action<string> action = 
    s => { /* Do something with the string here. */ };

// Create the action blocks, assuming that
// action is thread-safe, no need to have it process one at a time
// or to bound the capacity.
var solaceActionBlock1 = new ActionBlock<string>(action,
    new ExecutionDataflowBlockOptions { 
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    });
var solaceActionBlock2 = new ActionBlock<string>(action,
    new ExecutionDataflowBlockOptions { 
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    });
var solaceActionBlock3 = new ActionBlock<string>(action,
    new ExecutionDataflowBlockOptions { 
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    });

// Link everything.
delimitedFileBlock.LinkTo(solaceTargetBlock1);
delimitedFileBlock.LinkTo(solaceTargetBlock2);
delimitedFileBlock.LinkTo(solaceTargetBlock3);

// Now read the file, and post to the BufferBlock<T>:
// Note: This is pseudo-code.
while (!eof)
{
    // Read the row.
    string row = ...;

    delimitedFileBlock.Post(read);
}

另请注意,拥有三个ActionBlock<TInput>实例是不必要的,除非您需要将输出过滤为不同的操作(您在这里没有这样做),所以上面确实减少了这一点(假设您的操作是线程安全的,所以您要去增加到反正)MaxDegreeOfParallelismUnbounded

// The source, your read lines will be posted here.
var delimitedFileBlock = new BufferBlock<string>();

// The Action for the action blocks.
Action<string> action = 
    s => { /* Do something with the string here. */ };

// Create the action blocks, assuming that
// action is thread-safe, no need to have it process one at a time
// or to bound the capacity.
var solaceActionBlock1 = new ActionBlock<string>(action,
    new ExecutionDataflowBlockOptions { 
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    });

// Link everything.
delimitedFileBlock.LinkTo(solaceTargetBlock);

// Now read the file, and post to the BufferBlock<T>:
// Note: This is pseudo-code.
while (!eof)
{
    // Read the row.
    string row = ...;

    delimitedFileBlock.Post(read);
}
于 2012-11-16T21:09:13.653 回答