0

我正在处理套接字编程 tcp/ip 中的 c# concurrent-queue 和多线程

首先,我已经完成了套接字编程本身。这意味着,我已经完成了关于客户端、服务器和通信本身的编码

基本结构是流水线的(生产者-消费者问题),现在我正在做位转换

以下是关于我的代码的简短摘要

client-socket -> server-socket -> concurrent_queue_1(byte[65536],Thread_1 处理这个) -> concurrent_queue_2(double[40,3500], Thread_2 处理这个) -> display-data 或其他工作(它可以是gpu-work)

*(double[40,3500]可以改成其他大小)

到目前为止,我已经将 put_data 放入 queue1(Thread1) 并仅将所有 (Thread2) 出列,它的速度约为 700Mbps

我使用两个 concurrent_queue 的原因是,我希望在后台处理通信和类型转换工作,而不管有关控制事物的主要过程。

这是关于我自己的带有阻塞的 concurrent_queue 的代码

public class BlockingConcurrentQueue<T> : IDisposable
    {
        private readonly ConcurrentQueue<T> _internalQueue;
        private AutoResetEvent _autoResetEvent;
        private long _consumed;
        private long _isAddingCompleted = 0;
        private long _produced;
        private long _sleeping;

        public BlockingConcurrentQueue()
        {
            _internalQueue = new ConcurrentQueue<T>();
            _produced = 0;
            _consumed = 0;
            _sleeping = 0;
            _autoResetEvent = new AutoResetEvent(false);
        }
        public bool IsAddingCompleted
        {
            get
            {
                return Interlocked.Read(ref _isAddingCompleted) == 1;
            }
        }
        public bool IsCompleted
        {
            get
            {
                if (Interlocked.Read(ref _isAddingCompleted) == 1 && _internalQueue.IsEmpty)
                    return true;
                else
                    return false;
            }
        }
        public void CompleteAdding()
        {
            Interlocked.Exchange(ref _isAddingCompleted, 1);
        }
        public void Dispose()
        {
            _autoResetEvent.Dispose();
        }
        public void Enqueue(T item)
        {
            _internalQueue.Enqueue(item);

            if (Interlocked.Read(ref _isAddingCompleted) == 1)
                throw new InvalidOperationException("Adding Completed.");

            Interlocked.Increment(ref _produced);
            if (Interlocked.Read(ref _sleeping) == 1)
            {
                Interlocked.Exchange(ref _sleeping, 0);
                _autoResetEvent.Set();
            }
        }
        public bool TryDequeue(out T result)
        {
            if (Interlocked.Read(ref _consumed) == Interlocked.Read(ref _produced))
            {
                Interlocked.Exchange(ref _sleeping, 1);
                _autoResetEvent.WaitOne();
            }
            if (_internalQueue.TryDequeue(out result))
            {
                Interlocked.Increment(ref _consumed);
                return true;
            }
            return false;
        }
    }

我的问题在这里

正如我上面提到的,concurrent_queue1 的类型是 byte[65536] 并且 65536 bytes = 8192 double data。(40 * 3500=8192 * 17.08984375) 我想将多个 8192 双数据合并成双 [40,3500] 的形式(大小可以更改)并使用 Thread2 排队到 concurrent_queue2

使用简单的方法(使用许多复杂的 for 循环)很容易做到这一点,但速度很慢,它会复制所有数据并暴露给上层或层。

我正在搜索自动以匹配大小排队的方法,例如 foreach 循环自动以行主要方式遍历二维数组,尚未找到

有没有什么快速的方法可以将 1D 字节数组合并为 2D 双数组形式并将其入队?

谢谢你的帮助!

4

1 回答 1

0

我试图理解你的转换规则,所以我写了这个转换代码。用于Parallel加快计算速度。

int maxSize = 65536;
byte[] dim1Array = new byte[maxSize];
for (int i = 0; i < maxSize; ++i)
{
    dim1Array[i] = byte.Parse((i % 256).ToString());
}

int dim2Row = 40;
int dim2Column = 3500;
int byteToDoubleRatio = 8;
int toDoubleSize = maxSize / byteToDoubleRatio;
double[,] dim2Array = new double[dim2Row, dim2Column];
Parallel.For(0, toDoubleSize, i =>
{
    int row = i / dim2Column;
    int col = i % dim2Column;
    int originByteIndex = row * dim2Column * byteToDoubleRatio + col * byteToDoubleRatio;
    dim2Array[row, col] = BitConverter.ToDouble(
        dim1Array,
        originByteIndex);
});
于 2020-07-27T06:09:50.807 回答