1

在我的应用程序中,我有来自我的数据库的人员列表。对于每个人,我必须调用 5 个(目前)服务来搜索一些信息。如果服务返回信息,我将其添加到那个人(特定人的订单列表)
因为服务独立工作,我想我可以尝试并行运行它们。我已经这样创建了我的代码:

using System;
using System.Collections.Generic;
using System.Threading;

namespace Testy
{
    internal class Program
    {
        internal class Person
        {
            public int Id { get; set; }
            public string Name { get; set; }
            public List<string> Orders { get; private set; }

            public Person()
            {
                // thanks for tip @juharr
                Orders = new List<string>();
            }

            public void AddOrder(string order)
            {
                lock (Orders) //access across threads
                {
                    Orders.Add(order);
                }
            }
        }

        internal class Service
        {
            public int Id { get; private set; }

            public Service(int id)
            {
                Id = id;
            }

            //I get error when I use IList instead of List
            public void Search(ref List<Person> list) 
            {
                foreach (Person p in list)
                {
                    lock (p) //should I lock Person here? and like this???
                    {
                        Search(p);
                    }
                }
            }
            private void Search(Person p)
            {
                Thread.Sleep(50);
                p.AddOrder(string.Format("test order from {0,2}",
                                      Thread.CurrentThread.ManagedThreadId));
                Thread.Sleep(100);
            }
        }

        private static void Main()
        {
            //here I load my services from external dll's
            var services = new List<Service>();
            for (int i = 1; i <= 5; i++)
            {
                services.Add(new Service(i));
            }

            //sample data load from db    
            var persons = new List<Person>();

            for (int i = 1; i <= 10; i++)
            {
                persons.Add(
                    new Person {Id = i, 
                    Name = string.Format("Test {0}", i)});
            }

            Console.WriteLine("Number of services: {0}", services.Count);
            Console.WriteLine("Number of persons: {0}", persons.Count);

            ManualResetEvent resetEvent = new ManualResetEvent(false);
            int toProcess = services.Count;

            foreach (Service service in services)
            {
                new Thread(() =>
                    {
                        service.Search(ref persons);
                        if (Interlocked.Decrement(ref toProcess) == 0)
                            resetEvent.Set();
                    }
                    ).Start();
            }

            // Wait for workers.
            resetEvent.WaitOne();

            foreach (Person p in persons)
            {
                Console.WriteLine("{0,2} Person name: {1}",p.Id,p.Name);
                if (null != p.Orders)
                {
                    Console.WriteLine("    Orders:");
                    foreach (string order in p.Orders)
                    {
                        Console.WriteLine("    Order: {0}", order);
                    }
                }
                else
                {
                    Console.WriteLine("    No orders!");
                }
            }
            Console.ReadLine();
        }
    }
}

我的代码有两个问题:

  1. 当我运行我的应用程序时,我应该得到 10 人的列表,每个人有 5 个订单,但有时(3-5 次运行的)第一人我只得到 4 个订单。我怎样才能防止这种行为?
    解决了!感谢@juharr
  2. 如何从我的线程报告进度?我想得到的是我的程序类中的一个函数,每次从服务中添加订单时都会调用它——我需要它来显示每个报告的某种进展。
    我正在尝试此处描述的解决方案:https ://stackoverflow.com/a/3874184/965722 ,但我想知道是否有更简单的方法。理想情况下,我想将委托添加到Service类并将所有线程代码放在那里。
    我应该如何将事件和委托添加到Service类以及如何在 Main 方法中订阅它?

我正在使用 .NET 3.5

我添加了此代码以便能够获取进度报告:

using System;
using System.Collections.Generic;
using System.Threading;

namespace Testy
{
    internal class Program
    {
        public class ServiceEventArgs : EventArgs
        {
            public ServiceEventArgs(int sId, int progress)
            {
                SId = sId;
                Progress = progress;
            }

            public int SId { get; private set; }
            public int Progress { get; private set; }
        }

        internal class Person
        {
            private static readonly object ordersLock = new object();

            public int Id { get; set; }
            public string Name { get; set; }
            public List<string> Orders { get; private set; }

            public Person()
            {
                Orders = new List<string>();
            }

            public void AddOrder(string order)
            {
                lock (ordersLock) //access across threads
                {
                    Orders.Add(order);
                }
            }
        }

        internal class Service
        {
            public event EventHandler<ServiceEventArgs> ReportProgress;

            public int Id { get; private set; }
            public string Name { get; private set; }

            private int counter;

            public Service(int id, string name)
            {
                Id = id;
                Name = name;
            }

            public void Search(List<Person> list) //I get error when I use IList instead of List
            {
                counter = 0;
                foreach (Person p in list)
                {
                    counter++;
                    Search(p);
                    Thread.Sleep(3000);
                }
            }

            private void Search(Person p)
            {
                p.AddOrder(string.Format("Order from {0,2}", Thread.CurrentThread.ManagedThreadId));

                EventHandler<ServiceEventArgs> handler = ReportProgress;
                if (handler != null)
                {
                    var e = new ServiceEventArgs(Id, counter);
                    handler(this, e);
                }
            }
        }

        private static void Main()
        {
            const int count = 5;
            var services = new List<Service>();
            for (int i = 1; i <= count; i++)
            {
                services.Add(new Service(i, "Service " + i));
            }

            var persons = new List<Person>();

            for (int i = 1; i <= 10; i++)
            {
                persons.Add(new Person {Id = i, Name = string.Format("Test {0}", i)});
            }

            Console.WriteLine("Number of services: {0}", services.Count);
            Console.WriteLine("Number of persons: {0}", persons.Count);
            Console.WriteLine("Press ENTER to start...");
            Console.ReadLine();

            ManualResetEvent resetEvent = new ManualResetEvent(false);
            int toProcess = services.Count;

            foreach (Service service in services)
            {
                new Thread(() =>
                    {
                        service.ReportProgress += service_ReportProgress;
                        service.Search(persons);
                        if (Interlocked.Decrement(ref toProcess) == 0)
                            resetEvent.Set();
                    }
                    ).Start();
            }

            // Wait for workers.
            resetEvent.WaitOne();

            foreach (Person p in persons)
            {
                if (p.Orders.Count != count)
                    Console.WriteLine("{0,2} Person name: {1}, orders: {2}", p.Id, p.Name, p.Orders.Count);
            }
            Console.WriteLine("END :)");
            Console.ReadLine();
        }

        private static void service_ReportProgress(object sender, ServiceEventArgs e)
        {
            Console.CursorLeft = 0;
            Console.CursorTop = e.SId;
            Console.WriteLine("Id: {0,2}, Name: {1,2} - Progress: {2,2}", e.SId, ((Service) sender).Name, e.Progress);
        }
    }
}

我为服务类添加了自定义 EventArgs,事件。在此配置中,我应该运行 5 个服务,但其中只有 3 个报告进度。
我想象如果我有 5 个服务,我应该有 5 个事件(5 行显示进度)。
这可能是因为线程,但我不知道如何解决这个问题。

示例输出现在如下所示:

Number of services: 5
Number of persons: 10
Press ENTER to start...
Id:  3, Name: Service 3 - Progress: 10
Id:  4, Name: Service 4 - Progress: 10
Id:  5, Name: Service 5 - Progress: 19
END :)

它应该如下所示:

Number of services: 5
Number of persons: 10
Press ENTER to start...
Id:  1, Name: Service 1 - Progress: 10
Id:  2, Name: Service 2 - Progress: 10
Id:  3, Name: Service 3 - Progress: 10
Id:  4, Name: Service 4 - Progress: 10
Id:  5, Name: Service 5 - Progress: 10
END :)

上次编辑
我已将所有线程创建移至单独的类,ServiceManager现在我的代码如下所示:

using System;
using System.Collections.Generic;
using System.Threading;

namespace Testy
{
    internal class Program
    {
        public class ServiceEventArgs : EventArgs
        {
            public ServiceEventArgs(int sId, int progress)
            {
                SId = sId;
                Progress = progress;
            }

            public int SId { get; private set; } // service id
            public int Progress { get; private set; }
        }

        internal class Person
        {
            private static readonly object ordersLock = new object();

            public int Id { get; set; }
            public string Name { get; set; }
            public List<string> Orders { get; private set; }

            public Person()
            {
                Orders = new List<string>();
            }

            public void AddOrder(string order)
            {
                lock (ordersLock) //access across threads
                {
                    Orders.Add(order);
                }
            }
        }

        internal class Service
        {
            public event EventHandler<ServiceEventArgs> ReportProgress;

            public int Id { get; private set; }
            public string Name { get; private set; }

            public Service(int id, string name)
            {
                Id = id;
                Name = name;
            }

            public void Search(List<Person> list)
            {
                int counter = 0;
                foreach (Person p in list)
                {
                    counter++;
                    Search(p);
                    var e = new ServiceEventArgs(Id, counter);
                    OnReportProgress(e);
                }
            }

            private void Search(Person p)
            {
                p.AddOrder(string.Format("Order from {0,2}", Thread.CurrentThread.ManagedThreadId));
                Thread.Sleep(50*Id);
            }

            protected virtual void OnReportProgress(ServiceEventArgs e)
            {
                var handler = ReportProgress;
                if (handler != null)
                {
                    handler(this, e);
                }
            }
        }

        internal static class ServiceManager
        {
            private static IList<Service> _services;

            public static IList<Service> Services
            {
                get
                {
                    if (null == _services)
                        Reload();
                    return _services;
                }
            }

            public static void RunAll(List<Person> persons)
            {
                ManualResetEvent resetEvent = new ManualResetEvent(false);
                int toProcess = _services.Count;

                foreach (Service service in _services)
                {
                    var local = service;
                    local.ReportProgress += ServiceReportProgress;
                    new Thread(() =>
                        {
                            local.Search(persons);
                            if (Interlocked.Decrement(ref toProcess) == 0)
                                resetEvent.Set();
                        }
                        ).Start();
                }
                // Wait for workers.
                resetEvent.WaitOne();
            }

            private static readonly object consoleLock = new object();

            private static void ServiceReportProgress(object sender, ServiceEventArgs e)
            {
                lock (consoleLock)
                {
                    Console.CursorTop = 1 + (e.SId - 1)*2;
                    int progress = (100*e.Progress)/100;
                    RenderConsoleProgress(progress, '■', ConsoleColor.Cyan, String.Format("{0} - {1,3}%", ((Service) sender).Name, progress));
                }
            }

            private static void ConsoleMessage(string message)
            {
                Console.CursorLeft = 0;
                int maxCharacterWidth = Console.WindowWidth - 1;
                if (message.Length > maxCharacterWidth)
                {
                    message = message.Substring(0, maxCharacterWidth - 3) + "...";
                }
                message = message + new string(' ', maxCharacterWidth - message.Length);
                Console.Write(message);
            }

            private static void RenderConsoleProgress(int percentage, char progressBarCharacter,
                                                      ConsoleColor color, string message)
            {
                ConsoleColor originalColor = Console.ForegroundColor;
                Console.ForegroundColor = color;
                Console.CursorLeft = 0;
                int width = Console.WindowWidth - 1;
                var newWidth = (int) ((width*percentage)/100d);
                string progBar = new string(progressBarCharacter, newWidth) + new string(' ', width - newWidth);
                Console.Write(progBar);
                if (!String.IsNullOrEmpty(message))
                {
                    Console.CursorTop++;
                    ConsoleMessage(message);
                    Console.CursorTop--;
                }
                Console.ForegroundColor = originalColor;
            }

            private static void Reload()
            {
                if (null == _services)
                    _services = new List<Service>();
                else
                    _services.Clear();

                for (int i = 1; i <= 5; i++)
                {
                    _services.Add(new Service(i, "Service " + i));
                }
            }
        }

        private static void Main()
        {
            var services = ServiceManager.Services;
            int count = services.Count;

            var persons = new List<Person>();

            for (int i = 1; i <= 100; i++)
            {
                persons.Add(new Person {Id = i, Name = string.Format("Test {0}", i)});
            }

            Console.WriteLine("Services: {0}, Persons: {1}", services.Count, persons.Count);
            Console.WriteLine("Press ENTER to start...");
            Console.ReadLine();
            Console.Clear();
            Console.CursorVisible = false;

            ServiceManager.RunAll(persons);

            foreach (Person p in persons)
            {
                if (p.Orders.Count != count)
                    Console.WriteLine("{0,2} Person name: {1}, orders: {2}", p.Id, p.Name, p.Orders.Count);
            }
            Console.CursorTop = 12;
            Console.CursorLeft = 0;
            Console.WriteLine("END :)");
            Console.CursorVisible = true;
            Console.ReadLine();
        }
    }
}
4

2 回答 2

1

基本上你有一个创建订单的竞争条件。想象一下两个线程的以下执行。

线程 1 检查 Orders 是否为 null 并且它是。
线程 2 检查 Orders 是否为 null 并且它是。
线程 1 将 Orders 设置为一个新列表。
线程 1 获得锁。
线程 1 添加到订单列表。
线程 2 将 Order 设置为一个新列表。(您刚刚丢失了 Thread 1 添加的内容)

您需要在锁中包含订单的创建。

public void AddOrder(string order)
{
    lock (Orders) //access across threads
    {
        if (null == Orders)
            Orders = new List<string>();
        Orders.Add(order);
    }
}

或者你真的应该在 Person 构造函数中创建 Order 列表

public Person()
{
    Orders = new List<Order>();
}

此外,您应该真正创建一个单独的对象进行锁定。

private object ordersLock = new object();


public void AddOrder(string order)
{
    lock (ordersLock) //access across threads
    {
        Orders.Add(order);
    }
}

编辑:

在您创建线程的 foreach 中,您需要创建服务的本地副本以在 lambda 表达式中使用。这是因为 foreach 将更新服务变量,而线程最终可能会捕获错误的变量。所以像这样。

foreach (Service service in services)
{
    Service local = service;
    local.ReportProgress += service_ReportProgress;
    new Thread(() =>
        {
            local.Search(persons);
            if (Interlocked.Decrement(ref toProcess) == 0)
                resetEvent.Set();
        }
    ).Start();
}

请注意,订阅不需要在线程内。

Search或者,您可以在类的方法中移动线程的创建Service

此外,您可能希望在类中创建一个OnReportProgress方法,如下所示:Service

protected virtual void OnReportProgress(ServiceEventArgs e)
{
    EventHandler<ServiceEventArgs> handler = ReportProgress;
    if (handler != null)
    {
        handler(this, e);
    }
}

Search然后在你的方法中调用它。就我个人而言,我会在公共Search方法中调用它并使计数器成为局部变量,以允许Service在另一个列表中重用该对象。

最后,在写入控制台时,您需要在事件处理程序中添加一个额外的锁,以确保一个线程在另一个线程写入输出之前不会更改光标位置。

private static object consoleLock = new object();

private static void service_ReportProgress(object sender, ServiceEventArgs e)
{
    lock (consoleLock)
    {
        Console.CursorLeft = 0;
        Console.CursorTop = e.SId;
        Console.WriteLine("Id: {0}, Name: {1} - Progress: {2}", e.SId, ((Service)sender).Name, e.Progress);
    }
}

此外,您可能希望Console.Clear()在以下位置使用:

...
Console.WriteLine("Number of services: {0}", services.Count);
Console.WriteLine("Number of persons: {0}", persons.Count);
Console.WriteLine("Press ENTER to start...");
Console.Clear();
Console.ReadLine();
...

在写出结束语句之前,您需要更新光标位置。

Console.CursorTop = 6;
Console.WriteLine("END :)");
于 2013-01-28T13:26:44.797 回答
0

这可能无法完全回答您的问题(但我认为您可能有竞争条件),当您开始处理线程时,您需要在从不同线程更新对象时实现适当的同步。您应该确保在任何给定时间只有一个线程能够更新 person 类的实例。p.AddOrder( 应该具有确保只有一个线程在更新 Person 对象的互斥锁。

于 2013-01-28T13:27:17.307 回答