8

我在我的 Windows 应用程序中使用 websocket sharp dll 从 GDAX 服务器获取消息。到目前为止一切正常 - 消息即将到来,我正在处理它们。我被卡住的地方是消息停止出现的时候。至少我在 WebSocket.OnMessage 事件(https://github.com/sta/websocket-sharp)中没有找到任何可以帮助我跟踪消息何时停止的东西(我也尝试了 emitonping)

现在我收到的消息有一个消息类型“心跳”,每秒发送一次。我想添加一个单独的计时器控件来检查心跳消息是否每秒发送一次,如果它停止发送,那么我将需要再次重新连接服务器。但是由于消息停止发送时没有任何反应,我该如何跟踪它,我应该在哪里放置计时器代码来检查心跳消息何时停止发送?

我希望我能解释一下我受到打击的情况。如果有人渴望帮助我并需要更多输入,请告诉我。

更新

    private void _3_Load(object sender, EventArgs e)
    {          
        ConnectAndGetWebsocketFeedMessages();           
    }

    public delegate void WSOpen(string text);
    public delegate void WSMessage(string message);
    public delegate void WSError(string text);
    public delegate void WSClose(string text);

    private static string _endPoint = "wss://ws-feed.gdax.com";
    WebSocket ws = new WebSocket(_endPoint);

    private bool IsConnected { get; set; }
    private string ProductId { get; set; }

    string productId = "LTC-EUR";
    ConcurrentQueue<string> concurrentQueue = new ConcurrentQueue<string>();

    public void SetWebSocketSharpEvents()
    {
        ws.Log.Level = LogLevel.Trace;

        ws.OnOpen += (sender, e) =>
        {
            IsConnected = true;
            OnWSOpen("Connection Status :: Connected *********");
        };
        ws.EmitOnPing = true;
        ws.OnMessage += (sender, e) =>
        {
            if (e.IsPing)
            {
                OnWSMessage("ping received");
            }
            else
            {                    
                OnWSMessage(e.Data);
            }
        };

        ws.OnError += (sender, e) =>
        {
            IsConnected = false;
            OnWSError(e.Message); //An exception has occurred during an OnMessage event. An error has occurred in closing the connection.
            if (ws.IsAlive)
                ws.Close();
        };

        ws.OnClose += (sender, e) =>
        {
            IsConnected = false;
            OnWSClose("Close");
        };

        ws.ConnectAsync();
    }

    private void ConnectAndGetWebsocketFeedMessages()
    {            
        SetWebSocketSharpEvents();
    }

    private void SubscribeProduct(string sProductID)
    {
        if (IsConnected)
        {
            ProductId = sProductID;
            string data = "{\"type\": \"subscribe\", \"product_ids\": [\"" + sProductID + "\"]}";
            ws.Send(data);
            ws.Send("{\"type\": \"heartbeat\", \"on\": true}");
        }
    }

    void OnWSOpen(string text)
    {
        SubscribeProduct(productId);
        timer1.Interval = 1000;
        timer1.Tick += timer1_Tick;
        timer1.Start();
    }

    DateTime lastHeartbeatTime = DateTime.MinValue;
    bool isTimerStart = false;
    void OnWSMessage(string message)
    {
        concurrentQueue.Enqueue(message);
        SaveHeartbeatMessageTime(message);
        ProcessMessage(message);
    }

    private void SaveHeartbeatMessageTime(string jsonString)
    {
        var jToken = JToken.Parse(jsonString);

        var typeToken = jToken["type"];

        var type = typeToken.ToString();

        if (type == "heartbeat")
        {
            lastHeartbeatTime = DateTime.Now;
            this.Invoke(new MethodInvoker(delegate()
            {
                lbllastheartbeat.Text = lastHeartbeatTime.ToLongTimeString();
            }));               
        }
    }

    private void ProcessMessage(string message) {  }

    void OnWSError(string text) { }

    void OnWSClose(string text) { }

    bool isMessagesReceived = false;

    private void timer1_Tick(object sender, EventArgs e) // it stops working as soon as lbllastheartbeat gets some value
    {
        DateTime currentTime = DateTime.Now;
        TimeSpan duration = currentTime.Subtract(lastHeartbeatTime);
        this.Invoke(new MethodInvoker(delegate()
        {
            lblNow.Text = currentTime.ToLongTimeString();
        }));
        if (Int16.Parse(duration.ToString("ss")) > 1)
        {
            // reconnect here
        }
    }

编辑 我正在使用 Windows 窗体计时器控件,它继续调用 timer1_Tick 方法并且不调用 OnWSMessage 方法。如何确保两者并行运行,如果任何消息丢失或消息停止发送,则它会重新连接?

Edit2 下面提供的解决方案建议在 onMessage 事件中添加计时器功能,但如果我没有收到消息会发生什么?如果未收到消息,则代码不执行任何操作。我采用了一个全局变量,每当收到消息时,它都会在该变量中添加时间。现在我想运行一个单独的计时器控件,它将检查该变量中是否有任何内容,如果它的值(即秒差)大于 1,那么继续检查其他内容。

有没有人可以调查一下并请提供建议。

Update2:我仍然想使用 windows.timer 控件而不是 threading.timer 来执行此操作。我在我的 Windows 应用程序中使用了两个标签,lbllastheartbeat(显示收到心跳消息的时间)和 lblNow(显示调用计时器的当前时间)。

要求 - 我的计时器将检查是否丢失了任何心跳消息,这是通过“lastHeartbeatTime”变量完成的,该变量存储接收心跳消息的时间。

如果有人可以查看我的代码并提出我做错了什么或哪里做错了,我将不胜感激。

4

2 回答 2

5

答案已经给出 - 您需要启动计时器,该计时器将在您收到消息时的超时时间后触发,并在每次收到消息时重置该计时器。但似乎你想要代码示例,所以这里是(带注释):

System.Threading.Timer _timeoutTimer;
private readonly object _timeoutTimerLock = new object();
private void ResetTimeoutTimer() {
    // if you are sure you will never access this from multiple threads at the same time - remove lock
    lock (_timeoutTimerLock) {
        // initialize or reset the timer to fire once, after 2 seconds
        if (_timeoutTimer == null)
            _timeoutTimer = new System.Threading.Timer(ReconnectAfterTimeout, null, TimeSpan.FromSeconds(2), Timeout.InfiniteTimeSpan);
        else
            _timeoutTimer.Change(TimeSpan.FromSeconds(2), Timeout.InfiniteTimeSpan);
    }
}

private void StopTimeoutTimer() {
    // if you are sure you will never access this from multiple threads at the same time - remove lock
    lock (_timeoutTimerLock) {
        if (_timeoutTimer != null)
            _timeoutTimer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
    }
}

private void ReconnectAfterTimeout(object state) {
    // reconnect here
}

public void SetWebSocketSharpEvents() {
    ws.Log.Level = LogLevel.Trace;

    ws.OnOpen += (sender, e) => {
        // start timer here so that if you don't get first message after 2 seconds - reconnect
        ResetTimeoutTimer();
        IsConnected = true;
        OnWSOpen("Connection Status :: Connected *********");
    };
    ws.EmitOnPing = true;
    ws.OnMessage += (sender, e) => {
        // and here
        ResetTimeoutTimer();
        if (e.IsPing) {
            OnWSMessage("ping received");
        }
        else {
            OnWSMessage(e.Data);
        }
    };

    ws.OnError += (sender, e) => {
        // stop it here
        StopTimeoutTimer();
        IsConnected = false;
        OnWSError(e.Message); //An exception has occurred during an OnMessage event. An error has occurred in closing the connection.


  if (ws.IsAlive)
                ws.Close();
        };

        ws.OnClose += (sender, e) => {
            // and here
            StopTimeoutTimer();
            IsConnected = false;
            OnWSClose("Close");
        };

        ws.ConnectAsync();
    }
于 2017-11-09T07:41:24.760 回答
2

从你的问题我理解的是,你的消息每秒钟发送一次,但问题是只有当它停止时你想知道并再次运行它,如果是这样,你只需应用计时器并检查每秒钟是否有消息一秒钟或更长时间后未发送(检查 sentMessage() 方法设置一个布尔值,如果消息发送它应该给出 true 否则 false),而不是给出重新连接服务器的命令。

于 2017-11-07T05:13:42.273 回答