3

编辑和更新 -我现在在我的个人计算机上尝试了相同的代码,它工作得非常好。我能够使用相同的代码复制任何类型的文件而没有任何问题。当我在我的工作计算机上运行代码时,我遇到了这个问题。我只是不明白这如何以及为什么这取决于计算机。请让我知道我是否在这里遗漏了什么。

在 readTask 中,我按顺序读取文件并将字节添加到 BlockingCollection。在消费任务中,我正在读取 BlockingCollection 中出现的数据并将其写入文件。因为,默认情况下,BlockingCollection 是 ConcurrentQueue 的包装器,我希望从阻塞队列中读取的顺序与写入它的顺序相同。但是当我将目标文件与源文件进行比较时,它完全不同,有时我会看到重复。
我的源文件只是一个数字序列,每个数字都在新行上,如下所示。

1
2
3
4
5
6
7
8
9
10

在我的文件中,我有大约 5000 个数字让文件有足够的大小。这段代码有问题吗?或者这不是阻止收集应该起作用的方式。在此示例中,我正在写入文件,但实际上我需要将此文件推送到 Rest API,并且按顺序发送数据很重要。如果字节不能按顺序发送,则文件存储在服务器上时将损坏。

static void Main(string[] args)
    {
        BlockingCollection<byte[]> bc = new BlockingCollection<byte[]>(10);

        Task consumeTask = Task.Factory.StartNew(() =>
        {
            var fs = File.OpenWrite(@"C:\Temp\pass_new.txt");
            foreach (byte[] data in bc.GetConsumingEnumerable())
            {
                fs.Write(data, 0, data.Length);
            }
            fs.Close();
        });

        Task readTask = Task.Factory.StartNew(() =>
        {
            var fs = File.OpenRead(@"C:\Temp\pass.txt");
            var bufferSize = 4096;
            var buffer = new byte[bufferSize];
            int bytesRead = 0;
            while ((bytesRead = fs.Read(buffer, 0, buffer.Length)) != 0)
            {
                byte[] dataToWrite = buffer;
                if (bytesRead < bufferSize)
                {
                    dataToWrite = new byte[bytesRead];
                    Array.Copy(buffer, dataToWrite, bytesRead);
                }
                bc.Add(dataToWrite);
            }
            fs.Close();
        }).ContinueWith(ant => bc.CompleteAdding());

        consumeTask.Wait();

    }
4

2 回答 2

2

我相信这是因为您在bytesRead == bufferSize. 想象一下这两个序列(我在这里松散地使用术语“指针”来指代参考变量,我认为它更好地表达了这一点)。

首先,当您低于缓冲区大小时:

  1. buffer指出内存中大小为 4096 的新字节数组。
  2. fs.Read将 20 个字节写入对象buffer点。
  3. dataToWrite指出与缓冲区相同的对象。
  4. dataToWrite指出一个 20 字节大小的新字节数组。
  5. 您将 20 个字节从对象buffer点复制到对象dataToWrite点。
  6. dataToWrite您在阻塞集合中放置一个指向对象指向的指针。
  7. fs.Read将 30 个字节写入对象buffer点。

现在将其与满足缓冲区大小的情况进行比较。

  1. buffer指出内存中大小为 4096 的新字节数组。
  2. fs.Read将 4096 字节写入对象buffer指向的位置。
  3. dataToWrite指出与缓冲区相同的对象。
  4. dataToWrite您在阻塞集合中放置一个指向对象指向的指针。
  5. fs.Read将 30 个字节写入对象buffer点。

因为dataToWrite,buffer和您添加到阻塞集合中的项目都指向同一个对象,所以最后一个fs.Read将修改刚刚存储在集合中的字节数组。

删除您的 if 语句并始终分配一个新的dataToWrite,您的程序应该可以正常工作。

于 2014-03-13T13:57:27.087 回答
-1

我会尝试 Take() (而不是 .GetConsumingEnumerable())
GetConsumingEnumerable 应该仍然可以工作
即使它是默认值,我也会使用 ConcurrentConcurrentQueue

请参阅文档

BlockingCollection.Take 方法

删除项目的顺序取决于用于创建 BlockingCollection 实例的集合类型。创建 BlockingCollection 对象时,可以指定要使用的集合类型。例如,您可以为先进先出 (FIFO) 行为指定 ConcurrentConcurrentQueue 对象,或为后进先出 (LIFO) 行为指定 ConcurrentStack 对象。

至于重复。我用过很多次,但从来不知道 BlockingCollection 会产生重复项。我用它来提供一个数据库,其中重复会违反约束。

于 2014-03-12T21:52:30.267 回答