2

I'm using the GDAX API Websocket Stream to try and create a copy of the full LEVEL3 orderbook.

I've got a very simple implementation using WebSocketSharp, and I'm basically doing something like this:

// Socket bound to the GDAX websocket feed endpoint.
private readonly WebSocket _webSocket = new WebSocket("wss://ws-feed.gdax.com");

// Attach the handler before connecting, so it is already registered when the connection opens.
_webSocket.OnMessage += WebSocket_OnMessage;
_webSocket.Connect();
// Send the serialized BeginSubscriptionMessage (presumably the subscribe request; its definition is not shown here).
_webSocket.Send(JsonConvert.SerializeObject(new BeginSubscriptionMessage()));

/// <summary>
/// Handles a message from the feed: reads its "type" field, deserializes the payload
/// into the matching message class and enqueues it on the queue for that type.
/// Message types without a queue are accepted and ignored.
/// </summary>
/// <param name="sender">The object that raised the event (not used).</param>
/// <param name="e">Event data; <c>e.Data</c> is the JSON text of the message.</param>
private void WebSocket_OnMessage(object sender, MessageEventArgs e)
{
    // JsonConvert.DeserializeObject throws on a null string and returns null for an
    // empty one, so frames without text are skipped instead of failing below.
    if (string.IsNullOrEmpty(e.Data))
    {
        return;
    }

    var message = JsonConvert.DeserializeObject<BaseMessage>(e.Data);
    if (message == null)
    {
        // The payload was the JSON literal null; there is no type to dispatch on.
        return;
    }

    switch (message.Type)
    {
        case "match": // A trade occurred between two orders.
            MatchMessage matchMessage = JsonConvert.DeserializeObject<MatchMessage>(e.Data);
            _receivedMatchQueue.Enqueue(matchMessage);
            break;
        case "received": // A valid order has been received and is now active. Emitted for every valid order as soon as the matching engine receives it, whether it fills immediately or not.
            ReceivedMessage receivedMessage = JsonConvert.DeserializeObject<ReceivedMessage>(e.Data);
            _receivedMessageQueue.Enqueue(receivedMessage);
            break;
        case "open": // The order is now open on the order book. Only sent for orders that are not fully filled immediately; remaining_size indicates how much of the order is unfilled and going on the book.
            OpenMessage openMessage = JsonConvert.DeserializeObject<OpenMessage>(e.Data);
            _receivedOpenQueue.Enqueue(openMessage);
            break;
        case "done": // The order is no longer on the order book. Sent for all orders for which there was a received message; results from an order being canceled or filled.
            DoneMessage doneMessage = JsonConvert.DeserializeObject<DoneMessage>(e.Data);
            _receivedDoneQueue.Enqueue(doneMessage);
            break;
        case "change": // An existing order has been changed.
            ChangeMessage changeMessage = JsonConvert.DeserializeObject<ChangeMessage>(e.Data);
            _receivedChangeQueue.Enqueue(changeMessage);
            break;
        case "activate": // A stop order was placed; recognized but not processed.
            break;
        case "subscriptions": // Recognized but not processed.
            break;
        case "ticker":
            TickerMessage tickerMessage = JsonConvert.DeserializeObject<TickerMessage>(e.Data);
            _receivedTickerQueue.Enqueue(tickerMessage);
            break;
        case "l2update": // Recognized but not processed.
            break;
    }
}

The problem I am running into is that when I look at the sequence numbers as received through both the RECEIVED and OPEN messages I can see they are not sequential which (based on the following information) suggests that messages are being skipped.

Basically you end up with something like this

Open Message SequenceId: 5359746354
Open Message SequenceId: 5359746358
Open Message SequenceId: 5359746361
Open Message SequenceId: 5359746363
Open Message SequenceId: 5359746365
Open Message SequenceId: 5359746370
Open Message SequenceId: 5359746372

I have tried testing this on Azure, just to make sure that it wasn't a bandwidth limitation on my end and the results were largely similar.

So given this, how is it possible to build a complete 'real-time' orderbook using the 'full' websocket stream if messages are dropped? Can I just safely ignore them? Or do I just somehow clear orphaned values?

Any advice from anyone having done something similar would be extremely appreciated.

4

1 回答 1

2

消息很可能并没有被丢弃,您只是误解了这些序列号所代表的“序列”。

如 api 文档中所述

大多数 feed 消息都包含一个序列号。序列号是针对每个产品单独递增的整数,每条新消息的序列号都恰好比前一条消息大 1。

因此,每个频道针对每个产品(如 ETH-USD)都有各自独立的序列号,而不是按消息类型(如 "open" 或 "received")分别编号。假设您订阅了 "full" 频道,产品为 ETH-USD 和 ETH-EUR,那么您应该会看到这样的序列:

receive `ETH-EUR` X
open `ETH-EUR` X + 1
receive `ETH-USD` Y
done `ETH-EUR` X + 2
open `ETH-USD` Y + 1

对于 full 频道,消息类型有:received、open、done、match、change、activate(注意 ticker 消息属于另一个频道,因此有独立的序列号)。因此,为确保没有跳过任何消息,您需要跟踪所有这些消息类型,并针对每个产品(如果您订阅了多个产品)确认新消息的序列号恰好比该产品上一条消息的序列号大 1。

证明代码:

/// <summary>
/// Demo program: subscribes to the "full" channel for ETH-USD and breaks into the
/// debugger if a sequence number arrives that skips past the previous one for the
/// same product.
/// </summary>
class Program {
    private static readonly WebSocket _webSocket = new WebSocket("wss://ws-feed.gdax.com");

    // Dedicated lock object: locking on typeof(Program) would share the lock with any
    // other code that can reach the type.
    private static readonly object _sequenceLock = new object();

    // Last sequence number seen, keyed by product id, so that subscribing to more than
    // one product does not report false gaps.
    private static readonly Dictionary<string, long> _lastSequenceByProduct =
        new Dictionary<string, long>();

    // Message types whose sequence numbers are checked.
    private static readonly HashSet<string> _expectedTypes = new HashSet<string>(
        new[] { "received", "open", "done", "match", "change", "activate" });

    static void Main(string[] args) {
        var subMsg = "{\"type\": \"subscribe\",\"product_ids\": [\"ETH-USD\"],\"channels\": [\"full\"]}";
        // Register the handler before connecting so it is in place when the connection opens.
        _webSocket.OnMessage += WebSocket_OnMessage;
        _webSocket.Connect();
        _webSocket.Send(subMsg);
        // Keep the process alive until a key is pressed.
        Console.ReadKey();
    }

    /// <summary>
    /// Checks each tracked message against the last sequence number recorded for its
    /// product and then records the new sequence number.
    /// </summary>
    /// <param name="sender">The object that raised the event (not used).</param>
    /// <param name="e">Event data; <c>e.Data</c> is the JSON text of the message.</param>
    private static void WebSocket_OnMessage(object sender, MessageEventArgs e) {
        var message = JsonConvert.DeserializeObject<BaseMessage>(e.Data);
        // Skip payloads that did not deserialize and message types that are not tracked.
        if (message == null || !_expectedTypes.Contains(message.Type)) {
            return;
        }

        // Dictionary keys cannot be null; group messages without a product id together.
        var productId = message.ProductId ?? string.Empty;

        lock (_sequenceLock) {
            long lastSequence;
            // The first message for a product only establishes the baseline.
            if (_lastSequenceByProduct.TryGetValue(productId, out lastSequence)
                && message.Sequence > lastSequence + 1) {
                Debugger.Break(); // a gap: at least one message for this product was skipped
            }
            _lastSequenceByProduct[productId] = message.Sequence;
        }
    }
}

/// <summary>
/// The fields read from every feed message before it is dispatched by type.
/// </summary>
public class BaseMessage
{
    /// <summary>Value of the JSON "type" field, e.g. "open" or "received".</summary>
    [JsonProperty("type")] public string Type { get; set; }

    /// <summary>Value of the JSON "product_id" field, e.g. "ETH-USD".</summary>
    [JsonProperty("product_id")] public string ProductId { get; set; }

    /// <summary>Value of the JSON "sequence" field.</summary>
    [JsonProperty("sequence")] public long Sequence { get; set; }
}
于 2018-03-15T07:36:50.520 回答