4

这是我在这里的第一个问题,我的名字是安娜!

我的问题:我的客户有几个个人设备(带 gps 的黑匣子)来定位人/车……大约有 12000 人/车使用该设备……它将他们的位置发送到指定的 IP/端口……我可以在那一边不要做任何事情......

我的工作?开发一个侦听器以捕获设备发送的所有数据并使用 .NET 将其加载到数据库中...

我的想法:使用线程的窗口服务(也许是 ThreadPool?)。所以该服务将捕获所有传入的消息,创建一个线程并放入数据库......

这是解决问题的最佳方法吗?我在这里阅读有关消息队列(MSMQ)的信息...您认为我应该使用它吗?

安娜

4

2 回答 2

3

位置/时间的数量以及位置信息的传输方式(是 TCP、UDP 等)将有助于确定最佳方法。

一些问题:

  • 12000 台设备多久发送一次消息?
  • 位置是如何传输的?UDP?TCP?
  • 你有什么可靠性要求?

从它的声音来看,拥有一个可以捕获请求并仅在内部维护一个队列以保存到数据库的服务应该可以正常工作。除非您可以更改传输端,否则我不相信 MSMQ 对您有用,即使那样,它可能有必要也可能没有必要。


编辑:鉴于下面的评论,我建议您有一个 TCP 侦听器将请求传递给线程池来处理。

我会看一下关于使用线程池设置 TCP 侦听服务器的教程。我看到的最大潜在问题是请求的数量——从你所说的来看,你将有大约 400 个请求/秒。如果没有一个非常好的系统,这将是一个挑战。线程池可能会比尝试执行自己的线程执行得更好,因为您希望避免必须不断创建新线程的开销。您肯定希望主处理循环中的延迟非常小(例如 Sleep(0),或者根本不睡眠),因为平均每 2.5 毫秒会有一个请求。单个睡眠往往会在 14-15 毫秒的最小值处进行时间切片,因此您可能不希望在循环中进行任何睡眠。

不过,我想说的是,您可能会发现这不太好用,因为原始连接数量可能有问题。如果有任何方法可以转换为正在发送的 UDP 数据包,它可能会为您带来更好的吞吐量(以牺牲一些可靠性为代价)。

于 2009-04-03T00:54:51.100 回答
2

下面的代码未经测试或仅应被视为指导。它使用线程池来处理所有事情。我将所有内容都放在同一个文件中,仅用于示例。您应该将所有内容分成几个文件。

重要的是不要让所有客户端直接保存到数据库,因为这会使线程池饿死。尝试使用“MaxThreads”常量来获得一个适用于您的数据库/服务器的值。

还要记住,我在下面根本不处理任何异常。例如,您需要在 BeginRead、EndRead 和 TcpListener 方法上处理 SocketException。

我尝试使用最少数量的线程同步锁,代码应该非常有效。瓶颈很可能是数据库。

using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Threading;

namespace FastListener
{
    /// <summary>
    /// Example position class, replace with a real definition
    /// </summary>
    public class Position
    {
        public int X { get; set; }
        public int Y { get; set; }
    }

    /// <summary>
    /// Needed to be able to pass socket/buffer info
    /// between asynchronous requests.
    /// </summary>
    public struct ClientContext
    {
        public Socket socket;
        public byte[] buffer;
    }

    class Program
    {
        /// <summary>
        /// Positions received from mobile clients but not yet saved
        /// into the database.
        /// </summary>
        private readonly Queue<Position> _positions = new Queue<Position>();

        /// <summary>
        /// Number of threads currently saving stuff to the database.
        /// </summary>
        private int _poolThreads;

        /// <summary>
        /// Maximum number of threads that can save info to the database.
        /// </summary>
        private const int MaxThreads = 10;

        static void Main(string[] args)
        {
            new Program().Start();
        }

        private void Start()
        {
            TcpListener listener = new TcpListener(IPAddress.Any, 1343);
            listener.Start(50);
            listener.BeginAcceptSocket(OnAccept, listener);
        }

        // Listener got a new connection
        private void OnAccept(IAsyncResult ar)
        {
            TcpListener listener = (TcpListener) ar.AsyncState;

            // It's very important to start listening ASAP
            // since you'll have a lot of incoming connections.
            listener.BeginAcceptSocket(OnAccept, listener);

            // I recommend that you create a buffer pool to improve performance
            byte[] buffer = new byte[400];

            // Now accept the socket.
            Socket socket = listener.EndAcceptSocket(ar);
            StartRead(new ClientContext {buffer = buffer, socket = socket});
        }

        private void StartRead(ClientContext context)
        {
            // start reading from the client.
            context.socket.BeginReceive(context.buffer, 0, 400, SocketFlags.None, OnReceive, context);
        }


        // Stuff from a client.
        private void OnReceive(IAsyncResult ar)
        {
            ClientContext context = (ClientContext) ar.AsyncState;

            int bytesRead = context.socket.EndReceive(ar);
            if (bytesRead == 0)
            {
                // Put the buffer back in the pool
                context.socket.Close();
                return;
            }

            // convert bytes to position.
            // i'll just fake that here.
            Position pos = new Position();

            // Either handle the request directly
            if (_poolThreads < MaxThreads)
                ThreadPool.QueueUserWorkItem(SaveToDatabase, pos);
            else
            {
                // Or enqueue it to let a already active
                // thread handle it when done with the previous position
                lock (_positions)
                    _positions.Enqueue(pos);
            }

            // Don't forget to read from the client again
            StartRead(context); 
        }

        // will save stuff to the database.
        private void SaveToDatabase(object state)
        {
            // Could use Interlocked.Increment, but not really vital if 
            // one more more extra threads are saving to the db.
            ++_poolThreads; 

            Position position = (Position) state;
            while (true)
            {
                // IMPLEMENT DB SAVE LOGIC HERE.


                // check if another position is in the queue.
                lock (_positions)
                {
                    if (_positions.Count > 0)
                        position = _positions.Dequeue();
                    else
                        break; // jump out of the loop
                }
            }

            --_poolThreads;
        }
    }
}
于 2009-04-04T12:25:16.827 回答