0

我想在 ServiceBusTopics 中找到重复的消息。下面是我的逻辑。如果我发现任何重复的消息正在添加到列表中并发送到另一个服务总线主题。但是在遍历列表之前,我无法读取订阅中存在的所有服务总线消息。我想从服务总线读取所有消息,然后循环遍历它。我不知道应该如何停止循环的执行,直到我从订阅中读取所有消息

    private static string _serviceBusConn = "servicebusconnectionstring";
    private static string _serviceBustopic = "topic1";
    private static string _topic = "test_v1";

    static void Main(string[] args)
    {
        IList<string> items = new List<string>();
        int count; 
        IList<string> itemsRepeated = new List<string>();

        var subClient = SubscriptionClient.CreateFromConnectionString(_serviceBusConn, _serviceBustopic, "DevTest");

        subClient.OnMessage(m =>
        {
            Stream stream = m.GetBody<Stream>();
            StreamReader reader = new StreamReader(stream);
            string s = reader.ReadToEnd();

            Console.WriteLine(s);

            items.Add(s);

        });


        List<string> copy1= new List<string>(items);
        List<string> copy2 = new List<string>(items);


        foreach (var item in copy1)
        {
            count = 0;
            foreach (var itemtopic in copy2)
            {
                count++;
                Console.WriteLine("{0}{1}{2}", items, itemtopic, count);
                if (item.Equals(itemtopic))
                {
                    count++;
                }

                if (count > 1)
                {
                    Console.WriteLine(count);
                    itemsRepeated.Add(itemtopic);
                }
            }
        }
        foreach (var repeateditem in itemsRepeated)
        {
            SendMessage(repeateditem);
        }

    }

    static void SendMessage(string message)
        {
            var topicClient = TopicClient.CreateFromConnectionString(_serviceBusConn, _topic);
            var msg = new BrokeredMessage(message);
            topicClient.Send(msg);
        }
4

2 回答 2

0

您可以使用 ReceiveBatch(Int32) 方法批量接收来自主题订阅的消息。您可以使用命名空间管理器的 Getsubscription() 方法获取主题订阅中的消息数。

var namespaceManager = NamespaceManager.CreateFromConnectionString(connnectionString);
var subscriptionDescription = namespaceManager.GetSubscription(topicName, subscriptionName);
var totalMessageCount= subscriptionDescription .MessageCount;

现在您可以在循环中调用 ReceiveBatch(minimumMessageCount) 并在接收到的消息计数达到 totalMessageCount 时终止循环。

int receivedMessageCount = 0;
List<BrokeredMessage> MessageList = new List<BrokeredMessage>();
do{
var messageList = subClient.ReceiveBatch(100);
receivedMessageCount += messageList.Count;
MessageList.AddRange(messageList); 
}while(receivedMessageCount < totalMessageCount);

现在 MessageList 将包含主题订阅中的所有消息。您可以使用消息列表上的自定义逻辑来执行重复检测并将其转发到另一个订阅。

注意:要接收来自 ServiceBusTopic 的所有消息,您必须接收来自该主题内所有主题订阅的所有消息。

于 2019-01-03T04:01:59.877 回答
0
 if (count > 1)
                {
                    Console.WriteLine(count);
                    itemsRepeated.Add(itemtopic);
                }

您可以像这样使用 break 来打破循环

   if (count > 1)
                        {
                            Console.WriteLine(count);
                            itemsRepeated.Add(itemtopic);
                            break;
                        }
于 2019-01-03T04:32:49.200 回答