我有一个 BufferBlock 来发布消息:
public class DelimitedFileBlock : ISourceBlock<string>
{
private ISourceBlock<string> _source;
_source = new BufferBlock<string>(new DataflowBlockOptions() { BoundedCapacity = 10000 });
//Read a file
While(!eof)
row = read one row
//if consumers are slow, then sleep for a while
while(!(_source as BufferBlock<string>).Post<string>(row))
{
Thread.Sleep(5000);
}
}
这是一个 5GB 的文件,有 2400 万行。
我现在有一个使用 ActionBlock 的 Target 块:
public class SolaceTargetBlock : ITargetBlock<string>
private ActionBlock<IBasicDataContract> _publishToSolaceBlock;
public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, string messageValue, ISourceBlock<string> source, bool consumeToAccept)
{
//post to another block to publish
bool success = _publishToSolaceBlock.Post(messageValue);
现在在控制台应用程序中,我指定:
SolaceTargetBlock solaceTargetBlock1 = new SolaceTargetBlock("someparam",
new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });
SolaceTargetBlock solaceTargetBlock2 = new SolaceTargetBlock("someparam",
new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });
SolaceTargetBlock solaceTargetBlock3 = new SolaceTargetBlock("someparam",
new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });
DelimitedFileBlock delimitedFileBlock = new DelimitedFileBlock(csvFileInfo);
我将有限容量保持为 1 仅用于测试。
现在我使用 LinkTo 将这三个消费者链接到我的源:
delimitedFileBlock.LinkTo(solaceTargetBlock1);
delimitedFileBlock.LinkTo(solaceTargetBlock2);
delimitedFileBlock.LinkTo(solaceTargetBlock3);
这在 10003 行之后转到 Thread.Sleep(5000) 语句,并且 while 循环中的 Post 始终返回 false。
我期待,因为我有 LinkTo,所以完成后 solaceTargetBlocks 将能够选择下一条消息,但 LinkTo 不会清除 BufferBlock。那么,如何在多个消费者之间进行负载平衡。我是否必须接收并编写一个简单的负载平衡逻辑才能在消费者之间分配?