2

我构建了一个 Windows 服务,它使用 ClientWebSocket 实时订阅大约 10,000 个股票代码。如果只订阅 1,000 个代码,我能收到所有数据点(每秒接收几百条消息);一旦订阅 2,000 个代码,我似乎就收不到全部应有的数据;订阅 10,000 个代码(每秒接收数千条消息)时情况更糟。我运行了对比报告,看起来丢失了多达 60% 的数据包。我已经就这个问题与 Polygon(实时数据的提供方)沟通过,他们声称他们的 Socket 是一条"消防水管"(firehose),所有该发出的数据都会发出,而且他们的其他客户都没有反馈过类似问题。所以唯一合乎逻辑的推断是:问题出在我的代码上,或者存在某种限制。也许是 Receive 方法中的 Task 部分?也许 Windows 有最大 Task 数量限制,而我超过了它。

我还在具有 10gb 连接的高性能专用服务器上对此进行了测试,因此它似乎不是连接或硬件限制。

我也尝试过绕过我的 BlockingCollection 缓存,问题仍然存在。

望各位大侠有见解,谢谢!

这是我的代码:

        // Most recent trade state per ticker symbol; written by AddOrUpdateTrade.
        public static ConcurrentDictionary<string, TradeObj> TradeFeed = new ConcurrentDictionary<string, TradeObj>();
        // Most recent quote per ticker symbol; written by AddOrUpdateQuote.
        public static ConcurrentDictionary<string, QuoteObj> QuoteFeed = new ConcurrentDictionary<string, QuoteObj>();
        // Most recent aggregate per ticker symbol; written by AddOrUpdateAgg.
        public static ConcurrentDictionary<string, AggObj> AggFeed = new ConcurrentDictionary<string, AggObj>();
        // Raw WebSocket messages queued by Receive and drained by ConsumePackets.
        // No bounded capacity is passed, so the queue grows if consumers fall behind.
 public static BlockingCollection<byte[]> packets = new BlockingCollection<byte[]>();

        /// <summary>
        /// Starts the Polygon subscription loop and the packet consumers, then
        /// blocks the calling thread until a key is pressed.
        /// </summary>
        /// <param name="args">Command-line arguments; not used.</param>
        private static void Start(string[] args)
        {
            try
            {
                // StartSub loops forever, so it cannot be awaited here. Attach a
                // fault continuation so that a failure escaping it is reported
                // instead of vanishing together with the discarded task.
                Polygon.StartSub().ContinueWith(
                    t => Console.WriteLine(t.Exception),
                    CancellationToken.None,
                    TaskContinuationOptions.OnlyOnFaulted,
                    TaskScheduler.Default);

                int consumerCount = 2;

                for (int i = 0; i < consumerCount; i++)
                {
                    // Each consumer blocks in GetConsumingEnumerable for the life
                    // of the process. LongRunning hints the scheduler to give it
                    // a dedicated thread rather than tying up a pool worker, and
                    // the explicit scheduler avoids inheriting TaskScheduler.Current.
                    Task.Factory.StartNew(
                        Polygon.ConsumePackets,
                        CancellationToken.None,
                        TaskCreationOptions.LongRunning,
                        TaskScheduler.Default);
                }
            }
            catch (Exception e)
            {
                Console.WriteLine(e.Message);
            }

            Console.ReadKey();
        }

        /// <summary>
        /// Connects to the Polygon stocks WebSocket, authenticates, subscribes to the
        /// T/A/Q channels for every ticker in <c>FeedCache.Tickers</c> (in batches of
        /// 500 tickers per subscribe message) and then hands the socket to
        /// <c>Receive</c>. Whenever the session ends, by exception or because
        /// <c>Receive</c> returns, the socket is disposed and a new session is
        /// started after a short pause. The loop never exits.
        /// </summary>
        /// <returns>A task that does not complete under normal operation.</returns>
        public static async Task StartSub()
        {
            const int BatchSize = 500;

            // Pause between sessions so a persistent failure (endpoint down, bad
            // credentials, no network) does not turn this loop into a busy spin.
            var reconnectDelay = TimeSpan.FromSeconds(1);

            do
            {
                using (var socket = new ClientWebSocket())
                {
                    try
                    {
                        var connection = "wss://socket.polygon.io/stocks";

                        await socket.ConnectAsync(new Uri(connection), CancellationToken.None);

                        Console.WriteLine("Websocket opened to Polygon.");
                        await Send(socket, "{\"action\":\"auth\",\"params\":\"" + ConfigurationManager.AppSettings["PolygonAPIToken"] + "\"}");

                        // Materialize once: the ticker source is then enumerated a
                        // single time instead of once per batch via Count()/Skip().
                        var tickers = FeedCache.Tickers.ToList();

                        for (int offset = 0; offset < tickers.Count; offset += BatchSize)
                        {
                            var batch = tickers.GetRange(offset, Math.Min(BatchSize, tickers.Count - offset));

                            // Same channel list as before: all trades, then all
                            // aggregates, then all quotes for this batch.
                            string channels =
                                "T." + string.Join(",T.", batch) +
                                ",A." + string.Join(",A.", batch) +
                                ",Q." + string.Join(",Q.", batch);

                            await Send(socket, "{\"action\":\"subscribe\",\"params\":\"" + channels + "\"}");
                        }

                        await Receive(socket);
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine($"ERROR - {ex.Message}");
                        Console.WriteLine(ex.ToString());
                    }
                }

                await Task.Delay(reconnectDelay);
            } while (true);
        }

        /// <summary>
        /// Encodes <paramref name="data"/> as UTF-8 and sends it over
        /// <paramref name="socket"/> as one complete text message.
        /// </summary>
        /// <param name="socket">The socket to send on.</param>
        /// <param name="data">The text payload to send.</param>
        static async Task Send(ClientWebSocket socket, string data)
        {
            byte[] payload = Encoding.UTF8.GetBytes(data);

            await socket.SendAsync(
                new ArraySegment<byte>(payload),
                WebSocketMessageType.Text,
                true, // endOfMessage: the payload is sent as a single frame
                CancellationToken.None);
        }


        /// <summary>
        /// Reads messages from <paramref name="socket"/> until the server sends a
        /// Close frame. Each complete message is reassembled from its frames and
        /// queued on <c>packets</c> as a byte array.
        /// </summary>
        /// <param name="socket">A connected socket to read from.</param>
        /// <returns>
        /// A task that completes after a Close frame has been received and the
        /// close handshake has been answered. Exceptions from the socket propagate
        /// to the caller.
        /// </returns>
        static async Task Receive(ClientWebSocket socket)
        {
            // One scratch buffer for the whole session. Every frame is copied into
            // the per-message MemoryStream before the next read, so reuse is safe.
            var buffer = new ArraySegment<byte>(new byte[2000]);

            while (true)
            {
                using (var ms = new MemoryStream())
                {
                    WebSocketReceiveResult result;

                    do
                    {
                        result = await socket.ReceiveAsync(buffer, CancellationToken.None);
                        ms.Write(buffer.Array, buffer.Offset, result.Count);
                    } while (!result.EndOfMessage);

                    if (result.MessageType == WebSocketMessageType.Close)
                    {
                        await socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closed in server by the client", CancellationToken.None);
                        Console.WriteLine("Socket disconnecting, trying to reconnect.");

                        // Return instead of calling StartSub() from here. StartSub is
                        // awaiting this method inside its own reconnect loop, so
                        // returning lets it dispose this socket and open a new one.
                        // Calling StartSub() here nested one never-ending session
                        // inside another on every disconnect and left the closed
                        // socket undisposed.
                        return;
                    }

                    packets.Add(ms.ToArray());
                }
            }
        }

        /// <summary>
        /// Drains <c>packets</c>, parses each queued message as a JSON array and
        /// applies every event in it to the matching feed dictionary. Blocks while
        /// the queue is empty and returns only once the collection is marked
        /// complete for adding.
        /// </summary>
        /// <remarks>
        /// The method is synchronous: its only former await was a read from an
        /// in-memory stream, and as <c>async void</c> any exception escaping after
        /// that await could not be observed by the caller.
        /// </remarks>
        public static void ConsumePackets()
        {
            foreach (var buffer in packets.GetConsumingEnumerable())
            {
                JArray events;

                try
                {
                    using (var ms = new MemoryStream(buffer))
                    using (var reader = new StreamReader(ms, Encoding.UTF8))
                    {
                        events = JArray.Parse(reader.ReadToEnd());
                    }
                }
                catch (Exception e)
                {
                    Console.WriteLine(e.ToString());
                    continue;
                }

                // A message is a JSON array and may carry more than one event.
                // Handling only element 0 silently discards the rest, which is
                // where the missing data went once message volume (and therefore
                // batching) increased.
                foreach (JToken evt in events)
                {
                    try
                    {
                        DispatchEvent(evt);
                    }
                    catch (Exception e)
                    {
                        // Isolate failures per event so one malformed entry does
                        // not drop the remaining events of the same message.
                        Console.WriteLine(e.ToString());
                    }
                }
            }
        }

        // Routes one event to the trade/quote/aggregate feed based on its "ev" field;
        // events with any other "ev" value are ignored.
        private static void DispatchEvent(JToken evt)
        {
            string kind = (string)evt["ev"];
            string ticker = (string)evt["sym"];

            // The AddOrUpdate* helpers read data[0], so each event is handed over
            // in its own one-element array.
            switch (kind)
            {
                case "T":
                    AddOrUpdateTrade(ticker, new JArray(evt));
                    break;
                case "Q":
                    AddOrUpdateQuote(ticker, new JArray(evt));
                    break;
                case "A":
                    AddOrUpdateAgg(ticker, new JArray(evt));
                    break;
            }
        }

        /// <summary>
        /// Records a trade for <paramref name="ticker"/> in <c>TradeFeed</c>, taking
        /// the price from the "p" field of the first element of
        /// <paramref name="data"/>. The first trade for a ticker stores a count of 1;
        /// later trades increment the count and set the direction to "D" when the
        /// new price is below the previous one and to "U" otherwise.
        /// </summary>
        /// <param name="ticker">Dictionary key for the entry.</param>
        /// <param name="data">JSON array whose first element holds the trade.</param>
        public static void AddOrUpdateTrade(string ticker, JArray data)
        {
            var firstTrade = new TradeObj
            {
                LastPrice = (double)data[0]["p"],
                TradeCount = 1
            };

            TradeFeed.AddOrUpdate(ticker, firstTrade, (symbol, previous) =>
            {
                var price = (double)data[0]["p"];
                var direction = price < previous.LastPrice ? "D" : "U";

                return new TradeObj
                {
                    LastPrice = price,
                    TradeCount = previous.TradeCount + 1,
                    PriceDirection = direction
                };
            });
        }

        /// <summary>
        /// Stores the aggregate found in the first element of
        /// <paramref name="data"/> as the current <c>AggFeed</c> entry for
        /// <paramref name="ticker"/>, replacing any previous entry.
        /// </summary>
        /// <param name="ticker">Dictionary key for the entry.</param>
        /// <param name="data">JSON array whose first element holds the aggregate.</param>
        public static void AddOrUpdateAgg(string ticker, JArray data)
        {
            var bar = data[0];

            // The new entry never depends on the previous one, so a plain indexer
            // assignment (add-or-replace) is enough. This builds the object once
            // instead of once for the add value and again in an update callback.
            AggFeed[ticker] = new AggObj
            {
                TickVolume = (long)bar["v"],
                VolumeShare = (long)bar["av"],
                OpenPrice = (double)bar["op"],
                TickAverage = (double)bar["a"],
                VWAP = (double)bar["vw"],
                TickClosePrice = (double)bar["c"],
                TickHighPrice = (double)bar["h"],
                TickLowPrice = (double)bar["l"],
                TickOpenPrice = (double)bar["o"]
            };
        }

        /// <summary>
        /// Stores the quote found in the first element of <paramref name="data"/>
        /// as the current <c>QuoteFeed</c> entry for <paramref name="ticker"/>,
        /// replacing any previous entry.
        /// </summary>
        /// <param name="ticker">Dictionary key for the entry.</param>
        /// <param name="data">JSON array whose first element holds the quote.</param>
        public static void AddOrUpdateQuote(string ticker, JArray data)
        {
            var quote = data[0];

            // The new entry never depends on the previous one, so a plain indexer
            // assignment (add-or-replace) is enough. This builds the object once
            // instead of once for the add value and again in an update callback.
            QuoteFeed[ticker] = new QuoteObj
            {
                BidPrice = (double)quote["bp"],
                BidSize = (double)quote["bs"],
                AskPrice = (double)quote["ap"],
                AskSize = (double)quote["as"]
            };
        }

4

0 回答 0