4

我试图开发一个使用异步方法调用的方法管道。流水线的逻辑如下

  1. 集合中有 n 个数据必须输入到管道中的 m 个方法中
  2. 枚举 T 的集合
  3. 将第一个元素提供给第一个方法
  4. 获取输出,异步提供给第二个方法
  5. 同时,将集合的第二个元素喂给第一个方法
  6. 第一种方法完成后,将结果馈送到第二种方法(如果第二种方法仍在运行,则将结果放入其队列并开始执行第一种方法的第三个元素)
  7. 当第二个方法完成执行时,从队列中取出第一个元素并执行等等(每个方法都应该异步运行,没有人应该等待下一个完成)
  8. 在第 m 个方法处,执行完数据后,将结果存储到一个列表中
  9. 在第 m 个方法完成第 n 个元素后,将结果列表(n 个结果)返回到第一级。

我想出了如下代码,但它没有按预期工作,结果永远不会返回,而且它没有按应有的顺序执行。

static class Program
    {
        static void Main(string[] args)
        {
            var list = new List<int> { 1, 2, 3, 4 };
            var result = list.ForEachPipeline(Add, Square, Add, Square);
            foreach (var element in result)
            {
                Console.WriteLine(element);
                Console.WriteLine("---------------------");
            }
            Console.ReadLine();
        }

        private static int Add(int j)
        {
            return j + 1;
        }

        private static int Square(int j)
        {
            return j * j;
        }

        internal static void AddNotify<T>(this List<T> list, T item)
        {
            Console.WriteLine("Adding {0} to the list", item);
            list.Add(item);
        }    
    }

    internal class Function<T>
    {
        private readonly Func<T, T> _func;

        private readonly List<T> _result = new List<T>();
        private readonly Queue<T> DataQueue = new Queue<T>();
        private bool _isBusy;
        static readonly object Sync = new object();
        readonly ManualResetEvent _waitHandle = new ManualResetEvent(false);

        internal Function(Func<T, T> func)
        {
            _func = func;
        }

        internal Function<T> Next { get; set; }
        internal Function<T> Start { get; set; }
        internal int Count;

        internal IEnumerable<T> Execute(IEnumerable<T> source)
        {
            var isSingle = true;
            foreach (var element in source) {
                var result = _func(element);
                if (Next != null)
                {
                    Next.ExecuteAsync(result, _waitHandle);
                    isSingle = false;
                }
                else
                    _result.AddNotify(result);
            }
            if (!isSingle)
                _waitHandle.WaitOne();
            return _result;
        }


        internal void ExecuteAsync(T element, ManualResetEvent resetEvent)
        {
            lock(Sync)
            {
                if(_isBusy)
                {
                    DataQueue.Enqueue(element);
                    return;
                }
                _isBusy = true;

                _func.BeginInvoke(element, CallBack, resetEvent);
            }           
        }

        internal void CallBack(IAsyncResult result)
        {
            bool set = false;
            var worker = (Func<T, T>) ((AsyncResult) result).AsyncDelegate;
            var resultElement = worker.EndInvoke(result);
            var resetEvent = result.AsyncState as ManualResetEvent;

            lock(Sync)
            {
                _isBusy = false;
                if(Next != null)
                    Next.ExecuteAsync(resultElement, resetEvent);
                else
                    Start._result.AddNotify(resultElement);

                if(DataQueue.Count > 1)
                {
                    var element = DataQueue.Dequeue();
                    ExecuteAsync(element, resetEvent);
                }
                if(Start._result.Count == Count)
                    set = true;
            }
            if(set)
              resetEvent.Set();
        }
    }

    public static class Pipe
    {
        public static IEnumerable<T> ForEachPipeline<T>(this IEnumerable<T> source, params Func<T, T>[] pipes)
        {
            Function<T> start = null, previous = null;
            foreach (var function in pipes.Select(pipe => new Function<T>(pipe){ Count = source.Count()}))
            {
                if (start == null)
                {
                    start = previous = function;
                    start.Start = function;
                    continue;
                }
                function.Start = start;
                previous.Next = function;
                previous = function;
            }
            return start != null ? start.Execute(source) : null;
        }
    }

你们能帮我做这件事吗?如果这种设计不适合实际的方法管道,请随时提出不同的建议。

编辑:我必须严格遵守.Net 3.5。

4

3 回答 3

1

采取管道方法的任何特殊原因?IMO,为每个输入启动一个单独的线程,所有函数一个接一个地链接起来,这样编写起来更简单,执行起来也更快。例如,

function T ExecPipe<T>(IEnumerable<Func<T, T>> pipe, T input)
{
  T value = input;
  foreach(var f in pipe)
  {
    value = f(value);
  }
  return value;
}

var pipe = new List<Func<int, int>>() { Add, Square, Add, Square };
var list = new List<int> { 1, 2, 3, 4 };
foreach(var value in list)
{
  ThreadPool.QueueUserWorkItem(o => ExecPipe(pipe, (int)o), value);
}

现在,谈到您的代码,我相信要使用 M 阶段进行准确的管道实现,您必须拥有恰好 M 个线程,因为每个阶段都可以并行执行 - 现在,一些线程可能是空闲的,因为 i/p 尚未到达它们。我不确定您的代码是否正在启动任何线程,以及特定时间的线程数是多少。

于 2010-11-15T10:20:31.177 回答
1

我没有立即在您的代码中发现问题,但您可能有点过于复杂了。这可能是一种更简单的方法来做你想做的事。

public static class Pipe 
{
   public static IEnumerable<T> Execute<T>(
      this IEnumerable<T> input, params Func<T, T>[] functions)
   {
      // each worker will put its result in this array
      var results = new T[input.Count()];

      // launch workers and return a WaitHandle for each one
      var waitHandles = input.Select(
         (element, index) =>
         {
            var waitHandle = new ManualResetEvent(false);
            ThreadPool.QueueUserWorkItem(
               delegate
               {
                  T result = element;
                  foreach (var function in functions)
                  {
                     result = function(result);
                  }
                  results[index] = result;
                  waitHandle.Set();
               });
            return waitHandle;
         });

      // wait for each worker to finish
      foreach (var waitHandle in waitHandles)
      {
          waitHandle.WaitOne();
      }
      return results;
   }
}

这不会像您自己的尝试那样为管道的每个阶段创建锁。我省略了它,因为它似乎没有用。但是,您可以通过包装如下函数轻松添加它:

var wrappedFunctions = functions.Select(x => AddStageLock(x));

这是哪里AddStageLock

private static Func<T,T> AddStageLock<T>(Func<T,T> function)
{
   object stageLock = new object();
   Func<T, T> wrappedFunction =
      x =>
      {
         lock (stageLock)
         {
            return function(x);
         }
      };
   return wrappedFunction;
}

编辑:实现Execute可能会比单线程执行慢,除非要为每个单独元素完成的工作使创建等待句柄和在线程池上调度任务的开销相形见绌,要真正从多线程中受益,你需要限制开销;.NET 4 中的 PLINQ 通过对数据进行分区来做到这一点。

于 2010-11-15T11:06:41.817 回答
0

为什么不为每次迭代中断一个线程并将结果聚合到锁定资源中。你只需要做。可以为此使用 PLinq。我认为您可能将方法误认为资源。如果一个方法正在处理一个包含共享资源的关键块,您只需要锁定一个方法。通过选择一个资源并从那里进入一个新线程,您无需管理您的第二种方法。

IE: Method X 调用 Method1 然后将值传递给 Method2 Foreach 项目在 arr Async(MethodX(item));

于 2010-11-15T10:22:53.197 回答