1

我一直在尝试在 C# 中实现分布式深度优先搜索。我已经成功到了一定程度,但出现了同步错误。我无法纠正错误。我想要做的是让每个节点使用任务并行数据流相互通信,从而在 DFS 中实现并行性。下面是我的代码:

public class DFS
{
static List<string> traversedList = new List<string>();

static List<string> parentList = new List<string>();
static Thread[] thread_array;
static BufferBlock<Object> buffer1 = new BufferBlock<Object>();

public static void Main(string[] args)
{

    int N = 100;
    int M = N * 4;
    int P = N * 16;

    Stopwatch stopwatch = new Stopwatch();
    stopwatch.Start();

    List<string> global_list = new List<string>();

    StreamReader file = new StreamReader(args[args.Length - 2]);


    string text = file.ReadToEnd();

    string[] lines = text.Split('\n');



    string[][] array1 = new string[lines.Length][];

    for (int i = 0; i < lines.Length; i++)
    {
        lines[i] = lines[i].Trim();
        string[] words = lines[i].Split(' ');

        array1[i] = new string[words.Length];

        for (int j = 0; j < words.Length; j++)
        {
            array1[i][j] = words[j];
        }
    }

    StreamWriter sr = new StreamWriter("E:\\Newtext1.txt");

    for (int i = 0; i < array1.Length; i++)
    {
        for (int j = 0; j < array1[i].Length; j++)
        {
            if (j != 0)
            {
                sr.Write(array1[i][0] + ":" + array1[i][j]);
                Console.WriteLine(array1[i][0] + ":" + array1[i][j]);
                sr.Write(sr.NewLine);
            }
        }

    }
    int start_no = Convert.ToInt32(args[args.Length - 1]);
    thread_array = new Thread[lines.Length];
    string first_message = "root";
    buffer1.Post(first_message);
    buffer1.Post(array1);
    buffer1.Post(start_no);
    buffer1.Post(1);

    for (int t = 1; t < lines.Length; t++)
    {
        Console.WriteLine("thread" + t);
        thread_array[t] = new Thread(new ThreadStart(thread_run));
        thread_array[t].Name = t.ToString();
        lock (thread_array[t])
        {
            Console.WriteLine("working");
            thread_array[t].Start();
            thread_array[t].Join();
        }

    }
    stopwatch.Stop();

    Console.WriteLine(stopwatch.Elapsed);
    Console.ReadLine();
}

private static void dfs(string[][] array, int point)
{
    for (int z = 1; z < array[point].Length; z++)
    {
        if ((!traversedList.Contains(array[point][z])))
        {
            traversedList.Add(array[point][z]);
            parentList.Add(point.ToString());
            dfs(array, int.Parse(array[point][z]));
        }

    }
    return;


}
public static void thread_run()
{
    try
    {
        string parent;
        string[][] array1;
        int point;
        int id;
        parent = (string)buffer1.Receive();
        array1 = (string[][])buffer1.Receive();
        point = (int)buffer1.Receive();
        id = (int)buffer1.Receive();
        object value;
        Console.WriteLine("times");

        if (Thread.CurrentThread.Name.Equals(point.ToString()))
        {
            if (!traversedList.Contains(point.ToString()))
            {
                Console.WriteLine("Node:" + point + " Parent:" + parent + " Id:" + id);
                traversedList.Add(point.ToString());
                parent = point.ToString();
                for (int x = 1; x < array1[point].Length; x++)
                {
                    Console.WriteLine("times");
                    if (buffer1.TryReceive(out value))
                    {
                        array1 = (string[][])value;
                    }
                    if (buffer1.TryReceive(out value))
                    {
                        id = (int)buffer1.Receive();
                    }
                    id++;
                    buffer1.Post(parent);
                    buffer1.Post(array1);
                    buffer1.Post(x);
                    buffer1.Post(id);
                    Console.WriteLine("times");
                    Monitor.PulseAll(Thread.CurrentThread);
                }

                //return;
            }
            else
            {
                buffer1.Post(parent);
                buffer1.Post(array1);
                buffer1.Post(point);
                buffer1.Post(id);
                Console.WriteLine("working 1");
                Monitor.PulseAll(Thread.CurrentThread);
            }
        }
        else
        {
            Console.WriteLine("working 2");
            Monitor.Wait(Thread.CurrentThread);
        }
        //Console.WriteLine(parent);
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
    }

}

}

在此处输入图像描述

4

3 回答 3

3

您的代码存在各种问题。

不正确使用锁定和从多个线程“接触” traversedList 是最明显的问题。

更重要的是,您的代码并没有真正使用 Dataflow,它以类似于 ConcurrentQueue 或任何其他并发集合的方式使用 BufferBlock。数据流的重点是使用ActionBlocks而不是线程来简化处理。默认情况下,操作块将仅使用单个线程进行处理,但您可以通过DataflowBlockOptions类指定任意数量的线程。

ActionBlock 有自己的输入和输出缓冲区,因此您不必为了缓冲而添加额外的 BufferBlock。

将多个相关值传递给一个块是另一个问题,因为它可能导致错误并使代码混乱。创建一个数据结构来保存所有值不需要任何成本。

假设您使用此类来保存处理消息:

    public class PointMessage
    {
        public string Message { get; set; }
        public string[][] Lines{get;set;}
        public int Point { get; set; }
        public int ID { get; set; }
    }

您可以创建一个 ActionBlock 来处理这些消息,如下所示:

static ActionBlock<PointMessage> _block;
...
var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = ExecutionDataflowBlockOptions.Unbounded };
_block=new ActionBlock<PointMessage>(msg=>ProcessMessage(msg),options);

并像这样处理每条消息:

    private static void ProcessMessage(PointMessage arg)
    {
        if (...)
        {
            ...
            arg.ID++;
            _block.Post(arg);
        }
        else
        {
             ...
            _block.Post(arg);
        }
    }

如果您的函数返回一个值,您可以使用TransformBlock而不是 ActionBlock。

我不明白你的代码做了什么,所以我不会尝试使用 DataFlow 重写它。如果你把它清理一下,它会更容易提供帮助。

于 2012-06-05T08:12:36.510 回答
1

问题是线程需要拥有监视器才能调用等待。因此,您需要锁定 Monitor.PulseAll 以及 Monitor.Wait 以确保您不会再遇到此类错误。

如果您需要我向您解释锁定,请打开另一个问题,我会完整解释!:)

于 2012-06-05T04:41:06.477 回答
0

编辑:忽略我的帖子-改为阅读@PanagiotisKanavos 的帖子...

这不会编译,但会为您设置使用锁的正确方向:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Threading;

public class DFS
{
    static List<string> traversedList = new List<string>();

    static List<string> parentList = new List<string>();
    static Thread[] thread_array;
    //static BufferBlock<Object> buffer1 = new BufferBlock<Object>();

    public static void Main(string[] args)
    {

        int N = 100;
        int M = N * 4;
        int P = N * 16;

        Stopwatch stopwatch = new Stopwatch();
        stopwatch.Start();

        List<string> global_list = new List<string>();

        StreamReader file = new StreamReader(args[args.Length - 2]);


        string text = file.ReadToEnd();

        string[] lines = text.Split('\n');



        string[][] array1 = new string[lines.Length][];

        for (int i = 0; i < lines.Length; i++)
        {
            lines[i] = lines[i].Trim();
            string[] words = lines[i].Split(' ');

            array1[i] = new string[words.Length];

            for (int j = 0; j < words.Length; j++)
            {
                array1[i][j] = words[j];
            }
        }

        StreamWriter sr = new StreamWriter("E:\\Newtext1.txt");

        for (int i = 0; i < array1.Length; i++)
        {
            for (int j = 0; j < array1[i].Length; j++)
            {
                if (j != 0)
                {
                    sr.Write(array1[i][0] + ":" + array1[i][j]);
                    Console.WriteLine(array1[i][0] + ":" + array1[i][j]);
                    sr.Write(sr.NewLine);
                }
            }

        }
        int start_no = Convert.ToInt32(args[args.Length - 1]);
        thread_array = new Thread[lines.Length];
        string first_message = "root";
        //buffer1.Post(first_message);
        //buffer1.Post(array1);
        //buffer1.Post(start_no);
        //buffer1.Post(1);

        for (int t = 1; t < lines.Length; t++)
        {
            Console.WriteLine("thread" + t);
            thread_array[t] = new Thread(new ThreadStart(thread_run));
            thread_array[t].Name = t.ToString();
            lock (thread_array[t])
            {
                Console.WriteLine("working");
                thread_array[t].Start();
                thread_array[t].Join();
            }

        }
        stopwatch.Stop();

        Console.WriteLine(stopwatch.Elapsed);
        Console.ReadLine();
    }

    private static void dfs(string[][] array, int point)
    {
        for (int z = 1; z < array[point].Length; z++)
        {
            if ((!traversedList.Contains(array[point][z])))
            {
                traversedList.Add(array[point][z]);
                parentList.Add(point.ToString());
                dfs(array, int.Parse(array[point][z]));
            }

        }
        return;


    }

    bool busy;
    private readonly object syncLock = new object();

    public static void thread_run()
    {
        try
        {
            string parent;
            string[][] array1;
            int point;
            int id;
            //parent = (string)buffer1.Receive();
            //array1 = (string[][])buffer1.Receive();
            //point = (int)buffer1.Receive();
            //id = (int)buffer1.Receive();
            object value;
            Console.WriteLine("times");

            if (Thread.CurrentThread.Name.Equals("Point.ToString()"))
            {
                if (!traversedList.Contains("Point.ToString()"))
                {
                    for (int x = 1; x < 99999; x++)
                    {
                        Console.WriteLine("times");
                        //if (buffer1.TryReceive(out value))
                        //{
                        //    array1 = (string[][])value;
                        //}
                        //if (buffer1.TryReceive(out value))
                        //{
                        //    id = (int)buffer1.Receive();
                        //}
                        //id++;
                        //buffer1.Post(parent);
                        //buffer1.Post(array1);
                        //buffer1.Post(x);
                        //buffer1.Post(id);
                        Console.WriteLine("times");

                        lock (syncLock)
                        {
                            while (busy)
                            {
                                busy = false;
                                Monitor.PulseAll(Thread.CurrentThread);
                            }
                            busy = true; // we've got it!
                        }


                    }

                    //return;
                }
                else
                {
                    //buffer1.Post(parent);
                    //buffer1.Post(array1);
                    //buffer1.Post(point);
                    //buffer1.Post(id);
                    lock (syncLock)
                    {
                        while (busy)
                        {
                            busy = false;
                            Monitor.PulseAll(Thread.CurrentThread);
                        }
                        busy = true; // we've got it!
                    }
                }
            }
            else
            {
                Console.WriteLine("working 2");
                lock (syncLock)
                {
                    while (busy)
                    {
                        Monitor.Wait(Thread.CurrentThread);
                    }
                    busy = true; // we've got it!
                }

            }
            //Console.WriteLine(parent);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }

    }

}
于 2012-06-05T06:29:44.260 回答