3

我对队列的功能完全感到困惑。我正在尝试(并且失败)编写一个小型多线程应用程序来收集和显示 C# 中的数据。
在阅读了Albahari 的书并使用他描述的消费者/生产者模式之后,我得到了大部分工作,除了我的数据似乎在队列中被打乱了。在排队之前,我的对象中的字段具有以下值

时间戳 = 6
数据[] ={4936, 9845, 24125, 44861}

出队后的数据看起来像

时间戳 = 6
数据 [] = {64791、19466、47772、65405}

我不明白为什么数据字段中的值在出队后被更改?我很困惑,所以我想我会把它扔在那里,看看是否有人能指出我正确的方向来解决这个问题,或者让我指向不同的方向继续前进。


相关代码


用于数据存储的自定义对象

相关对象和字段。sensorData 类是一个单独的类,用于存储我的计算。

public class sensorData
{
    public const int bufSize = 4;
    public UInt16[] data = new UInt16[4];
    public double TimeStamp = 0; 
    public int timeIndex = 0;
}

以下字段用于设置入队和出队线程之间的队列和信号。

EventWaitHandle wh = new AutoResetEvent(false);
Queue<sensorData> dataQ = new Queue<sensorData>();
object locker = new object();

入队方法/线程

这是我的工作线程,它计算四个正弦曲线并将结果排队等待处理。我还将结果写入一个文件,这样我就知道它计算了什么。

private void calculateAndEnqueueData(BackgroundWorker worker, DoWorkEventArgs e)
{
    int j = 0;
    double time = 0;
    double dist;
    UInt16[] intDist = new UInt16[sensorData.bufSize];
    UInt16 uint16Dist;

    // Frequencies of the four Sine curves
    double[] myFrequency = { 1, 2, 5, 10 };

    // Creates the output file.
    StreamWriter sw2 = File.CreateText("c:\\tmp\\QueuedDataTest.txt"); 

    // Main loop to calculate my Sine curves
    while (!worker.CancellationPending)
    {
        // Calculate four Sine curves
        for (int i = 0; i < collectedData.numberOfChannels; i++)
        {
            dist = Math.Abs(Math.Sin(2.0 * Math.PI * myFrequency[i] * time);
            uint16Dist = (UInt16)dist;
            intDist[i] = uint16Dist;
        }

        // Bundle the results and Enqueue them
        sensorData dat = new sensorData();
        dat.data = intDist;
        dat.TimeStamp = time;
        dat.timeIndex = j;

        lock (locker) dataQ.Enqueue(dat);
        wh.Set

        // Output results to file.
        sw2.Write(j.ToString() + ", ");
        foreach (UInt16 dd in dat.intData)
        {
            sw2.Write(dd.ToString() + ", ");
        }
        sw2.WriteLine();

        // Increments time and index.
        j++;
        time += 1 / collectedData.signalFrequency;

        Thread.Sleep(2);
    }
    // Clean up
    sw2.Close();
    lock (locker) dataQ.Enqueue(null);
    wh.Set();
    sw2.Close();
}

输出文件QueuedDataTest.txt中的示例行

6、4936、9845、24125、44861、

出队数据方法

此方法从队列中取出元素并将它们写入文件。直到在队列中找到一个空元素,此时作业完成。

    private void dequeueDataMethod()
    {
        StreamWriter sw = File.CreateText("C:\\tmp\\DequeueDataTest.txt");

        while (true)
        {
            sensorData data = null;

            // Dequeue available element if any are there.
            lock (locker)
                if (dataQ.Count > 0)
                {
                    data = dataQ.Dequeue();
                    if (data == null)
                    {
                        sw.Close();
                        return;
                    }
                }

            // Check to see if an element was dequeued. If it was write it to file.
            if (data != null)
            {
                sw.Write(data.timeIndex.ToString() + ", ");
                foreach (UInt16 dd in data.data)
                    sw.Write(dd.ToString() + ", ");
                sw.WriteLine();
            }
            else
            {
                wh.WaitOne();
            }
        }

数据出列并写入DequeueDataTest.txt后的输出结果

6、64791、19466、47772、65405、


更新1:

当前代码中锁的位置。


我已经编辑了代码以锁定将数据写入文件。所以我有锁的代码块如下。

CalculateAndEnqueueData()方法中,我有

lock (locker) dataQ.Enqueue(dat);
wh.Set

lock(locker)
{
  sw2.Write(j.ToString() + ", ");
  foreach (UInt16 dd in dat.intData)
  {
     sw2.Write(dd.ToString() + ", ");
  }
  sw2.WriteLine();
}

dequeueDataMethod()我有两个带锁的区域第一个在这里

lock(locker) 
    if (dataQ.Count > 0)
    {
       data = dataQ.Dequeue();
       if (data == null)
       {
           sw.Close();
           return;
        }
    }

我假设为if块中的代码锁定了储物柜。第二个是我在这里写入文件的地方

lock (locker)
{
    sw.Write(data.timeIndex.ToString() + ", ");
    foreach (UInt16 dd in data.intData)
        sw.Write(dd.ToString() + ", ");
    sw.WriteLine();
    if (icnt > 10)
    {
        sw.Close();
        return;
    }
    this.label1.Text = dataQ.Count.ToString();
}

这就是他们的全部。


4

2 回答 2

9

问题是由于您正在写入的 StreamWriter 上没有同步。顺序不是顺序的。

于 2009-01-26T00:52:14.883 回答
1

是因为你UInt16[] intDist一遍又一遍地写入同一个数组吗?您不应该为每个sensorData对象使用单独的数组吗?(顺便说一句,sensorData.intData假设sensorData.data在您的示例代码中?)

澄清:

中只intDist创建了一个数组calculateAndEnqueueData(),所以不同sensorData的实例都共享同一个数组——如果添加+写入和删除+写入发生在锁步中,这是可以的;否则,某些数据点可能会丢失/重复。

建议:

直接填充sensorData实例,不使用intDist数组,在calculateAndEnqueueData()

    // create new sensorData instance
    sensorData dat = new sensorData();
    dat.TimeStamp = time;
    dat.timeIndex = j;

    // Calculate four Sine curves
    for (int i = 0; i < collectedData.numberOfChannels; i++)
    {
        dat.data[i] = (UInt16) Math.Abs(Math.Sin(2.0 * Math.PI * myFrequency[i] * time);
    }

    // enqueue
    lock (locker) dataQ.Enqueue(dat);
于 2009-01-26T01:15:19.737 回答