0

我正在构建一个蜡烛记录器(Binance Crypto),对 1 分钟的蜡烛很感兴趣,包括用于市场研究目的的内部蜡烛数据(但最终我可以使用相同的代码来实际关注市场上发生的事情)

为了避免最终的延迟/EF/SQL 性能等。我决定使用两个线程来完成这个。

一个从 Binance 接收订阅的(异步)令牌并将它们放入 ConcurrentQueue,而另一个则不断尝试出列并将数据保存在 MSSQL 中

我的问题是关于第二个线程,一个 while(true) 循环。使用 EF 将这些信息单独输入(有时在 300 毫秒内 300 条信息,有时更少)时,将 200 + 信息/秒保存到 SQL 的最佳方法是什么:

每次我想保存时都应该打开 SQL con 吗?(表现)。实现这一目标的最佳方法是什么?

-- 已编辑 -- 有一次我在队列中获得了 600k+,所以我在插入 SQL 时遇到了问题 从 Linq 更改为 SQL 到 EF

这是我的实际代码:

//Initialize 
        public void getCoinsMoves()
        {
            Thread THTransferDatatoSQL = new Thread(TransferDatatoSQL);
            THTransferDatatoSQL.Name = "THTransferDatatoSQL";
            THTransferDatatoSQL.SetApartmentState(ApartmentState.STA);
            THTransferDatatoSQL.IsBackground = true;
            THTransferDatatoSQL.Start();

            List<string> SymbolsMap;
            using(DBBINANCEEntities lSQLBINANCE = new DBBINANCEEntities())
            {
                SymbolsMap = lSQLBINANCE.TB_SYMBOLS_MAP.Select(h => h.SYMBOL).ToList();
            }

            socketClient.Spot.SubscribeToKlineUpdatesAsync(SymbolsMap, Binance.Net.Enums.KlineInterval.OneMinute, h =>
            {
                RecordCandles(h);
            });
        }

//Enqueue Data
        public void RecordCandles(Binance.Net.Interfaces.IBinanceStreamKlineData Candle)
        {
            FRACTIONED_CANDLES.Enqueue(new TB_FRACTIONED_CANDLES_DATA()
            {
                BASE_VOLUME = Candle.Data.BaseVolume,
                CLOSE_TIME = Candle.Data.CloseTime.AddHours(-3),
                MONEY_VOLUME = Candle.Data.QuoteVolume,
                PCLOSE = Candle.Data.Close,
                PHIGH = Candle.Data.High,
                PLOW = Candle.Data.Low,
                POPEN = Candle.Data.Open,
                SYMBOL = Candle.Symbol,
                TAKER_BUY_BASE_VOLUME = Candle.Data.TakerBuyBaseVolume,
                TAKER_BUY_MONEY_VOLUME = Candle.Data.TakerBuyQuoteVolume,
                TRADES = Candle.Data.TradeCount,
                IS_LAST_CANDLE = Candle.Data.Final
            });
        }

//Transfer Data to SQL
        public void TransferDatatoSQL()
        {
            while (true)
            {
                TB_FRACTIONED_CANDLES_DATA NewData;
                if (FRACTIONED_CANDLES.TryDequeue(out NewData))
                {
                    using (DBBINANCEEntities LSQLBINANCE = new DBBINANCEEntities())
                    {
                        LSQLBINANCE.TB_FRACTIONED_CANDLES_DATA.Add(NewData);
                        if (NewData.IS_LAST_CANDLE)
                            LSQLBINANCE.TB_CANDLES_DATA.Add(new TB_CANDLES_DATA()
                            {
                                BASE_VOLUME = NewData.BASE_VOLUME,
                                CLOSE_TIME = NewData.CLOSE_TIME,
                                IS_LAST_CANDLE = NewData.IS_LAST_CANDLE,
                                MONEY_VOLUME = NewData.MONEY_VOLUME,
                                PCLOSE = NewData.PCLOSE,
                                PHIGH = NewData.PHIGH,
                                PLOW = NewData.PLOW,
                                POPEN = NewData.POPEN,
                                SYMBOL = NewData.SYMBOL,
                                TAKER_BUY_BASE_VOLUME = NewData.TAKER_BUY_BASE_VOLUME,
                                TAKER_BUY_MONEY_VOLUME = NewData.TAKER_BUY_MONEY_VOLUME,
                                TRADES = NewData.TRADES
                            });
                        LSQLBINANCE.SaveChanges();
                    }
                }
                Thread.Sleep(1);
            }            
        }

Thx 进阶

拉斐尔

4

1 回答 1

0

我在您的代码中看到一个错误,每次插入后您都在休眠一个后台线程,如果有更多数据,请不要休眠。代替:

if (FRACTIONED_CANDLES.TryDequeue(out NewData))
{
    using (DBBINANCEEntities LSQLBINANCE = new DBBINANCEEntities())
    {
        LSQLBINANCE.TB_FRACTIONED_CANDLES_DATA.Add(NewData);
        if (NewData.IS_LAST_CANDLE)
            LSQLBINANCE.TB_CANDLES_DATA.Add(new TB_CANDLES_DATA()
            {
                BASE_VOLUME = NewData.BASE_VOLUME,
                CLOSE_TIME = NewData.CLOSE_TIME,
                IS_LAST_CANDLE = NewData.IS_LAST_CANDLE,
                MONEY_VOLUME = NewData.MONEY_VOLUME,
                PCLOSE = NewData.PCLOSE,
                PHIGH = NewData.PHIGH,
                PLOW = NewData.PLOW,
                POPEN = NewData.POPEN,
                SYMBOL = NewData.SYMBOL,
                TAKER_BUY_BASE_VOLUME = NewData.TAKER_BUY_BASE_VOLUME,
                TAKER_BUY_MONEY_VOLUME = NewData.TAKER_BUY_MONEY_VOLUME,
                TRADES = NewData.TRADES
            });
        LSQLBINANCE.SaveChanges();
    }
}
Thread.Sleep(1);

将最后一行更改为:

else
    Thread.Sleep(1);

这可能会解决您的问题。

于 2021-01-24T21:08:59.430 回答