0

我有一个名为“IntegrationEvent”的 Azure ServiceBus 主题和一个名为“TestSubscription”的订阅。我为此订阅注册了两个订阅者,我拥有CompetingConsumers

两个订阅者都处理了大量消息。我需要改变什么以使这种情况不再发生?我认为每条消息都应该只由一个订阅者处理?

发件人:

class Program
{
    static async Task Main(string[] args)
    {
        await SendMessageAsync();
    }

    private static async Task SendMessageAsync()
    {
        var sendClient = new TopicClient("ConnectionString");

        var testBlock = new ActionBlock<string>(
            async id =>
            {
                string jsonMessage = JsonConvert.SerializeObject(id);
                byte[] body = Encoding.UTF8.GetBytes(jsonMessage);

                var messageToSend = new Message(body)
                {
                    CorrelationId = id,
                };

                await sendClient.SendAsync(messageToSend);

            }, new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 25
            });

        for (int i = 0; i < 10000; i++)
        {
            testBlock.Post(Guid.NewGuid().ToString());
        }

        testBlock.Complete();
        await testBlock.Completion;
    }
}

我使用两个订阅者/消费者(不是订阅)来监听 IntegrationEvent。

class Program
{
    static SubscriptionClient subscriptionClient;

    static async Task Main(string[] args)
    {
        var builder = new ServiceBusConnectionStringBuilder("ConnectionString");
        if (string.IsNullOrWhiteSpace(builder.EntityPath))
        {
            builder.EntityPath = "IntegrationEvent";
        }

        subscriptionClient = new SubscriptionClient(builder, "TestSubscription");

        await subscriptionClient.RemoveRuleAsync(RuleDescription.DefaultRuleName);
        await subscriptionClient.AddRuleAsync(new RuleDescription(RuleDescription.DefaultRuleName, new TrueFilter()));

        ListenForMessages();

        Console.Read();
    }

    protected static void ListenForMessages()
    {
        var options = new MessageHandlerOptions(ExceptionReceivedHandler)
        {
            AutoComplete = false,
            MaxConcurrentCalls = 10
        };
        subscriptionClient.RegisterMessageHandler(ReceiveMessageAsync, options);
    }

    private static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs arg)
    {
        return Task.CompletedTask;
    }

    private static async Task ReceiveMessageAsync(Message arg1, CancellationToken arg2)
    {
        string integrationEvent = Encoding.UTF8.GetString(arg1.Body);
        Console.WriteLine($"{ arg1.MessageId}, { arg1.CorrelationId}, {integrationEvent}");
        await subscriptionClient.CompleteAsync(arg1.SystemProperties.LockToken);
    }
}
4

1 回答 1

0

在尝试任何事情之前了解订阅的工作原理很重要。有关于这一点的文档。具体来说,影响订阅将接收什么的规则。

现在您的代码已设置为接收所有消息(捕获所有规则)。使用 SQL 或关联过滤器创建更具体的规则后,您将能够控制每个单独订阅将收到的内容。不久前我写了一篇文章,其中包含一些信息。

于 2019-09-22T22:40:40.820 回答