4

与我的其他问题相关,除了现在我尝试异步希望它能解决问题。它没有。

我正在尝试创建一个简单的 SOCKS5 服务器。我将浏览器(firefox)设置为将此程序用作 SOCKS5。这个想法是一个程序连接到代理服务器,给它所需的信息,服务器只是简单地从一个连接读取/写入数据到另一个。这个只是简单地做到这一点,不记录也不过滤任何东西。它非常简单,但是由于 CPU 问题以及在您点击几页后连接到站点需要几秒钟的事实,这使得它完全无法使用。这到底是怎么吃掉这么多CPU的?为什么连接网站需要很长时间?异步和同步都受此影响

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Net.Sockets;
using System.Timers;
using System.IO;
using System.Net;
using System.Threading;

namespace ProxyTest
{
    class Program
    {
        static ManualResetEvent tcpClientConnected =new ManualResetEvent(false);
        static void Main(string[] args)
        {
            var s2 = new TcpListener(9998);
            s2.Start();
            Task.Run(() =>
            {
                while (true)
                {
                    tcpClientConnected.Reset();
                    s2.BeginAcceptTcpClient(Blah, s2);
                    tcpClientConnected.WaitOne();
                }
            });
            while (true)
                System.Threading.Thread.Sleep(10000000);
        }

        static void Blah(IAsyncResult ar)
        {
            try
            {
                Console.WriteLine("Connection");
                TcpListener listener = (TcpListener)ar.AsyncState;
                using (var socketin = listener.EndAcceptTcpClient(ar))
                {
                    tcpClientConnected.Set();
                    var ns1 = socketin.GetStream();
                    var r1 = new BinaryReader(ns1);
                    var w1 = new BinaryWriter(ns1);

                    if (false)
                    {
                        var s3 = new TcpClient();
                        s3.Connect("127.0.0.1", 9150);
                        var ns3 = s3.GetStream();
                        var r3 = new BinaryReader(ns3);
                        var w3 = new BinaryWriter(ns3);
                        while (true)
                        {
                            while (ns1.DataAvailable)
                            {
                                var b = ns1.ReadByte();
                                w3.Write((byte)b);
                                //Console.WriteLine("1: {0}", b);
                            }
                            while (ns3.DataAvailable)
                            {
                                var b = ns3.ReadByte();
                                w1.Write((byte)b);
                                Console.WriteLine("2: {0}", b);
                            }
                        }
                    }

                    {
                        if (!(r1.ReadByte() == 5 && r1.ReadByte() == 1))
                            return;
                        var c = r1.ReadByte();
                        for (int i = 0; i < c; ++i)
                            r1.ReadByte();
                        w1.Write((byte)5);
                        w1.Write((byte)0);
                    }
                    {
                        if (!(r1.ReadByte() == 5 && r1.ReadByte() == 1))
                            return;
                        if (r1.ReadByte() != 0)
                            return;
                    }
                    byte[] ipAddr = null;
                    string hostname = null;
                    var type = r1.ReadByte();
                    switch (type)
                    {
                        case 1:
                            ipAddr = r1.ReadBytes(4);
                            break;
                        case 3:
                            hostname = Encoding.ASCII.GetString(r1.ReadBytes(r1.ReadByte()));
                            break;
                        case 4:
                            throw new Exception();
                    }
                    var nhport = r1.ReadInt16();
                    var port = IPAddress.NetworkToHostOrder(nhport);

                    var socketout = new TcpClient();
                    if (hostname != null)
                        socketout.Connect(hostname, port);
                    else
                        socketout.Connect(new IPAddress(ipAddr), port);

                    w1.Write((byte)5);
                    w1.Write((byte)0);
                    w1.Write((byte)0);
                    w1.Write(type);
                    switch (type)
                    {
                        case 1:
                            w1.Write(ipAddr);
                            break;
                        case 2:
                            w1.Write((byte)hostname.Length);
                            w1.Write(Encoding.ASCII.GetBytes(hostname), 0, hostname.Length);
                            break;
                    }
                    w1.Write(nhport);

                    var buf1 = new byte[4096];
                    var buf2 = new byte[4096];
                    var ns2 = socketout.GetStream();
                    var r2 = new BinaryReader(ns2);
                    var w2 = new BinaryWriter(ns2);
                    Task.Run(() =>
                    {
                        var re = new ManualResetEvent(false);
                        while (true)
                        {
                            re.Reset();
                            ns1.BeginRead(buf1, 0, buf1.Length, ReadCallback, new A() { buf = buf1, thisSocket = socketin, otherSocket = socketout, thisStream = ns1, otherStream = ns2, re=re });
                            re.WaitOne();
                        }
                    });
                    Task.Run(() =>
                    {
                        var re = new ManualResetEvent(false);
                        while (true)
                        {
                            re.Reset();
                            ns2.BeginRead(buf2, 0, buf2.Length, ReadCallback, new A() { buf = buf2, thisSocket = socketout, otherSocket = socketin, thisStream = ns2, otherStream = ns1, re = re });
                            re.WaitOne();
                        }
                    });
                    while (true)
                    {
                        if (socketin.Connected == false)
                            return;
                        Thread.Sleep(100);
                    }
                }
            }
            catch { }
        }
        class A { public byte[] buf; public TcpClient thisSocket, otherSocket; public NetworkStream thisStream, otherStream; public ManualResetEvent re;};
        static void ReadCallback(IAsyncResult ar)
        {
            try
            {
                var a = (A)ar.AsyncState;
                var ns1 = a.thisStream;
                var len = ns1.EndRead(ar);
                a.otherStream.Write(a.buf, 0, len);
                a.re.Set();
            }
            catch
            {
            }
        }
    }
}
4

5 回答 5

6

警告:我不得不稍微调整一下,因为我没有使用 4.5。

Task.Run() --> new Thread().Start()

您正在使用太多线程。简单地尝试在 stackoverflow 中加载这个问题会导致产生 30 多个线程,这重现了使用 Task.Run() 看到的行为。

随着您的代码减少到每个连接一个线程,我的 CPU 使用率徘徊在 0% 左右。一切都加载得很快。

using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.ComponentModel;
using System.Data;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Net.Sockets;
using System.Timers;
using System.IO;
using System.Net;
using System.Threading;

namespace SOCKS5
{
    static class Program
    {
        static void Main()
        {
            var s2 = new TcpListener(9998);
            s2.Start();

            while (true)
            {
                if (s2.Pending())
                {
                    Thread test = new Thread(() =>
                    {
                        using (TcpClient client = s2.AcceptTcpClient())
                        {
                            Blah(client);
                        }
                    });

                    test.Start();
                }

                Thread.Sleep(10);
            }
        }

        static void Blah(TcpClient listener)
        {
            try
            {
                Console.WriteLine("Connection");
                //TcpListener listener = (TcpListener)ar.AsyncState;


                //tcpClientConnected.Set();
                var ns1 = listener.GetStream();
                var r1 = new BinaryReader(ns1);
                var w1 = new BinaryWriter(ns1);

                if (false)
                {
                    var s3 = new TcpClient();
                    s3.Connect("127.0.0.1", 9150);
                    var ns3 = s3.GetStream();
                    var r3 = new BinaryReader(ns3);
                    var w3 = new BinaryWriter(ns3);
                    while (true)
                    {
                        while (ns1.DataAvailable)
                        {
                            var b = ns1.ReadByte();
                            w3.Write((byte)b);
                            //Console.WriteLine("1: {0}", b);
                        }
                        while (ns3.DataAvailable)
                        {
                            var b = ns3.ReadByte();
                            w1.Write((byte)b);
                            Console.WriteLine("2: {0}", b);
                        }
                    }
                }

                {
                    if (!(r1.ReadByte() == 5 && r1.ReadByte() == 1))
                        return;
                    var c = r1.ReadByte();
                    for (int i = 0; i < c; ++i)
                        r1.ReadByte();
                    w1.Write((byte)5);
                    w1.Write((byte)0);
                }
                {
                    if (!(r1.ReadByte() == 5 && r1.ReadByte() == 1))
                        return;
                    if (r1.ReadByte() != 0)
                        return;
                }
                byte[] ipAddr = null;
                string hostname = null;
                var type = r1.ReadByte();
                switch (type)
                {
                    case 1:
                        ipAddr = r1.ReadBytes(4);
                        break;
                    case 3:
                        hostname = Encoding.ASCII.GetString(r1.ReadBytes(r1.ReadByte()));
                        break;
                    case 4:
                        throw new Exception();
                }
                var nhport = r1.ReadInt16();
                var port = IPAddress.NetworkToHostOrder(nhport);

                var socketout = new TcpClient();
                if (hostname != null)
                    socketout.Connect(hostname, port);
                else
                    socketout.Connect(new IPAddress(ipAddr), port);

                w1.Write((byte)5);
                w1.Write((byte)0);
                w1.Write((byte)0);
                w1.Write(type);
                switch (type)
                {
                    case 1:
                        w1.Write(ipAddr);
                        break;
                    case 2:
                        w1.Write((byte)hostname.Length);
                        w1.Write(Encoding.ASCII.GetBytes(hostname), 0, hostname.Length);
                        break;
                }
                w1.Write(nhport);

                var buf1 = new byte[4096];
                var buf2 = new byte[4096];
                var ns2 = socketout.GetStream();

                DateTime last = DateTime.Now;

                while ((DateTime.Now - last).TotalMinutes < 5.0)
                {
                    if (ns1.DataAvailable)
                    {
                        int size = ns1.Read(buf1, 0, buf1.Length);
                        ns2.Write(buf1, 0, size);
                        last = DateTime.Now;
                    }
                    if (ns2.DataAvailable)
                    {
                        int size = ns2.Read(buf2, 0, buf2.Length);
                        ns1.Write(buf2, 0, size);
                        last = DateTime.Now;
                    }

                    Thread.Sleep(10);
                }
            }
            catch { }
            finally
            {
                try
                {
                    listener.Close();
                }
                catch (Exception) { }
            }
        }
    }
}

编辑:

这最终变得有点有趣。

在通过这个路由 Firefox 流量几个小时后,一些观察结果。

从未注意到确定何时关闭连接的常规模式。让线程在空闲 5 分钟后终止(无 rx/tx)会使线程计数相当低。这是一个非常安全的界限,允许 gmail 聊天等服务继续运行。

出于某种原因,程序偶尔不会收到来自浏览器的请求,这会报告超时。程序中没有关于错过请求的通知,什么都没有。仅在浏览 stackoverflow 时才注意到。还没有想通那个。

于 2013-07-24T16:03:46.093 回答
5

这里发生了一些事情!

异步调用都称为同步风格。如,启动操作的线程调用 WaitOne - 这基本上只是使其等同于同步调用,没有什么不同。

睡眠循环很糟糕。 sleep(1) 循环将快速响应但使用一些 CPU,sleep(1000) 循环将响应缓慢但使用较少 CPU。在睡眠循环中拥有十几个线程并不会占用太多 CPU,但如果线程数不断增加,CPU 使用率将变得非常重要。最好的方法是使用异步调用而不是轮询。

很多任务循环运行。如果没有保证的退出路径,这些会导致线程数猛增。

如果要将数据从套接字 A 转发到套接字 B,则需要在任一套接字关闭时采取行动:停止转发,确保待处理的写入完成并关闭套接字。

当前的实现不能正确确保两个转发任务在关闭时都关闭,如果任务在设置事件之前发生异常,则启动任务然后阻塞手动重置事件的技术可能会失败。这两种情况都会使任务无限运行。

检查Socket.Connected似乎是一件显而易见的事情,但实际上这只是最后一次 IO 操作是否遇到断开连接的缓存。我更喜欢对“零接收”采取行动,这是您断开连接的第一个通知。

我通过 NuGet 使用 PowerThreading 构建了原始同步例程的快速异步版本(这是在框架 4.5 之前执行异步例程的一种方式)。这适用TcpListener于零 CPU 使用率和非常少的线程数。

这可以在 vanilla c# 中使用 async/await 完成......我只是不知道如何:)

using System;
using System.Collections.Generic;
using System.Text;

namespace AeProxy
{
    using System.IO;
    using System.Net;
    using System.Net.Sockets;
    using System.Threading;
    // Need to install Wintellect.Threading via NuGet for this:
    using Wintellect.Threading.AsyncProgModel;

    class Program
    {
        static void Main(string[] args)
        {
            var ae = new AsyncEnumerator() {SyncContext = null};
            var mainOp = ae.BeginExecute(ListenerFiber(ae), null, null);
            // block until main server is finished
            ae.EndExecute(mainOp);
        }

        static IEnumerator<int> ListenerFiber(AsyncEnumerator ae)
        {
            var listeningServer = new TcpListener(IPAddress.Loopback, 9998);
            listeningServer.Start();
            while (!ae.IsCanceled())
            {
                listeningServer.BeginAcceptTcpClient(ae.End(0, listeningServer.EndAcceptTcpClient), null);
                yield return 1;
                if (ae.IsCanceled()) yield break;
                var clientSocket = listeningServer.EndAcceptTcpClient(ae.DequeueAsyncResult());
                var clientAe = new AsyncEnumerator() { SyncContext = null };
                clientAe.BeginExecute(
                    ClientFiber(clientAe, clientSocket),
                    ar =>
                        {
                            try
                            {
                                clientAe.EndExecute(ar);
                            }
                            catch { }
                    }, null);
            }
        }

        static long clients = 0;

        static IEnumerator<int> ClientFiber(AsyncEnumerator ae, TcpClient clientSocket)
        {
            Console.WriteLine("ClientFibers ++{0}", Interlocked.Increment(ref clients));
            try
            {
                // original code to do handshaking and connect to remote host
                var ns1 = clientSocket.GetStream();
                var r1 = new BinaryReader(ns1);
                var w1 = new BinaryWriter(ns1);

                if (!(r1.ReadByte() == 5 && r1.ReadByte() == 1)) yield break;
                var c = r1.ReadByte();
                for (int i = 0; i < c; ++i) r1.ReadByte();
                w1.Write((byte)5);
                w1.Write((byte)0);

                if (!(r1.ReadByte() == 5 && r1.ReadByte() == 1)) yield break;
                if (r1.ReadByte() != 0) yield break;

                byte[] ipAddr = null;
                string hostname = null;
                var type = r1.ReadByte();
                switch (type)
                {
                    case 1:
                        ipAddr = r1.ReadBytes(4);
                        break;
                    case 3:
                        hostname = Encoding.ASCII.GetString(r1.ReadBytes(r1.ReadByte()));
                        break;
                    case 4:
                        throw new Exception();
                }
                var nhport = r1.ReadInt16();
                var port = IPAddress.NetworkToHostOrder(nhport);
                var socketout = new TcpClient();
                if (hostname != null) socketout.Connect(hostname, port);
                else socketout.Connect(new IPAddress(ipAddr), port);
                w1.Write((byte)5);
                w1.Write((byte)0);
                w1.Write((byte)0);
                w1.Write(type);
                switch (type)
                {
                    case 1:
                        w1.Write(ipAddr);
                        break;
                    case 3:
                        w1.Write((byte)hostname.Length);
                        w1.Write(Encoding.ASCII.GetBytes(hostname), 0, hostname.Length);
                        break;
                }
                w1.Write(nhport);
                using (var ns2 = socketout.GetStream())
                {
                    var forwardAe = new AsyncEnumerator() { SyncContext = null };
                    forwardAe.BeginExecute(
                        ForwardingFiber(forwardAe, ns1, ns2), ae.EndVoid(0, forwardAe.EndExecute), null);
                    yield return 1;
                    if (ae.IsCanceled()) yield break;
                    forwardAe.EndExecute(ae.DequeueAsyncResult());
                }
            }
            finally
            {
                Console.WriteLine("ClientFibers --{0}", Interlocked.Decrement(ref clients));
            }
        }

        private enum Operation { OutboundWrite, OutboundRead, InboundRead, InboundWrite } 

        const int bufsize = 4096;

        static IEnumerator<int> ForwardingFiber(AsyncEnumerator ae, NetworkStream inputStream, NetworkStream outputStream)
        {
            while (!ae.IsCanceled())
            {
                byte[] outputRead = new byte[bufsize], outputWrite = new byte[bufsize];
                byte[] inputRead = new byte[bufsize], inputWrite = new byte[bufsize];
                // start off output and input reads.
                // NB ObjectDisposedExceptions can be raised here when a socket is closed while an async read is in progress.
                outputStream.BeginRead(outputRead, 0, bufsize, ae.End(1, ar => outputStream.EndRead(ar)), Operation.OutboundRead);
                inputStream.BeginRead(inputRead, 0, bufsize, ae.End(1, ar => inputStream.EndRead(ar)), Operation.InboundRead);
                var pendingops = 2;
                while (!ae.IsCanceled())
                {
                    // wait for the next operation to complete, the state object passed to each async
                    // call can be used to find out what completed.
                    if (pendingops == 0) yield break;
                    yield return 1;
                    if (!ae.IsCanceled())
                    {
                        int byteCount;
                        var latestEvent = ae.DequeueAsyncResult();
                        var currentOp = (Operation)latestEvent.AsyncState;
                        if (currentOp == Operation.InboundRead)
                        {
                            byteCount = inputStream.EndRead(latestEvent);
                            if (byteCount == 0)
                            {
                                pendingops--;
                                outputStream.Close();
                                continue;
                            }
                            Array.Copy(inputRead, outputWrite, byteCount);
                            outputStream.BeginWrite(outputWrite, 0, byteCount, ae.EndVoid(1, outputStream.EndWrite), Operation.OutboundWrite);
                            inputStream.BeginRead(inputRead, 0, bufsize, ae.End(1, ar => inputStream.EndRead(ar)), Operation.InboundRead);
                        }
                        else if (currentOp == Operation.OutboundRead)
                        {
                            byteCount = outputStream.EndRead(latestEvent);
                            if (byteCount == 0)
                            {
                                pendingops--;
                                inputStream.Close();
                                continue;
                            }
                            Array.Copy(outputRead, inputWrite, byteCount);
                            inputStream.BeginWrite(inputWrite, 0, byteCount, ae.EndVoid(1, inputStream.EndWrite), Operation.InboundWrite);
                            outputStream.BeginRead(outputRead, 0, bufsize, ae.End(1, ar => outputStream.EndRead(ar)), Operation.OutboundRead);
                        }
                        else if (currentOp == Operation.InboundWrite)
                        {
                            inputStream.EndWrite(latestEvent);
                        }
                        else if (currentOp == Operation.OutboundWrite)
                        {
                            outputStream.EndWrite(latestEvent);
                        }
                    }
                }
            }
        }
    }
}
于 2013-07-30T14:22:10.517 回答
0

你应该看看 Overlapped I/O。每个连接一个线程可能工作正常,但总的来说它很糟糕。

于 2013-07-25T06:42:31.677 回答
0

在这一行...

 while (true)
                    System.Threading.Thread.Sleep(10000000);

用一个简单的替换它不会更好:

Console.ReadKey();

是我看到的唯一的 CPU 消耗。

此外,作为建议,您应该限制传入连接的数量并使用线程池模式(在队列或其他东西中)。

于 2013-07-23T22:34:58.890 回答
0

您应该使用异步版本的 TcpClient 方法而不是生成线程。

于 2013-07-29T05:01:41.863 回答