1

我正在做 c# 项目。我想以某种同步方式通过 websocketsharp 库发送 API 请求。

我一直在尝试按照以下方式进行操作:

  1. 在我们发送任何 WS 请求之前,我们创建具有唯一 ID 的新 SynchronousRequest() 对象,并将新创建的对象添加到某种等待列表中

  2. 我们在响应中发送 WS 请求,将唯一 ID 添加到有效负载 - 服务器将返回相同的 ID。

  3. 我们开始等待事件发出信号(一旦我们收到响应就会发出信号)

在响应处理程序上:

  1. 一旦 WS 响应到达,我尝试通过唯一 ID 匹配上下文
  2. 一旦匹配成功,我们就发出响应已收到的信号,并将响应负载添加到 synchronousRequest() 对象

问题是第 3 步,一旦我在整个 websocket 客户端挂起的事件上使用 WaitOne() 并且不会收到进一步的响应 - 导致完全死锁。

我怎样才能在单独的线程中进行某种 WaitOne() 调用,或者我的问题可能存在完全更好的解决方案,所以整个客户端不会挂起并且我们匹配上下文?

public class SynchronousRequest
{
    public long requestId;
    public ManualResetEvent resetEvent = new ManualResetEvent(false);
    public dynamic response;

    public SynchronousRequest()
    {
        var random = new Random();
        requestId = random.Next();
    }

}


public class APIWebSocket: BaseAPIWebSocket
{


    private List<SynchronousRequest> waitingSyncRequests = new List<SynchronousRequest>();


    public APIWebSocket()
    {
        ws = new WebSocket("wss://www.someserver.com");

        registerConnectionEvents(); //Registers onOpen(), onMessage() handlers and similar
    }



    public void SendSyncTest()
    {
        var sr = new SynchronousRequest();
        waitingSyncRequests.Add(sr);

        //some data to send
        var msg = new
        {
            jsonrpc = "2.0",
            method = "public/ticker",
            id = sr.requestId, //Response should contain the same ID
            @params = new
            {
                instrument_name = "ETH"
            }
        };

        ws.Send(JsonConvert.SerializeObject(msg));

        //Below WaitOne() causes the entire websocket connection/thread to block
        // No further messages will be received by HandleMessage() once we call WaitOne()

        sr.resetEvent.WaitOne(); //Wait until we receive notification that response has been received

        //do some processing on response here... 

       //Synchronous request completed, remove it from list
       waitingSyncRequests.Remove(sr);
    }




    protected override void OnReceivedMessage(System.Object sender, WebSocketSharp.MessageEventArgs e)
    {
        dynamic message = JsonConvert.DeserializeObject(e.Data);

        if (message.id != null )
        {
            //Find a resetEvent for given message.id
            var matchingSyncRequest = waitingSyncRequests.First(r => r.requestId == message.id);
            if (matchingSyncRequest != null)
            {
                matchingSyncRequest.response = message;
                matchingSyncRequest.resetEvent.Set(); //Notify that response has been received
            }
        }

    }

}
4

1 回答 1

0

据我了解,您需要参加await一些将在未来某个时间设置的活动。你可以考虑使用TaskCompletionSource.

  • 1.每当您发送需要其结果的消息时,您可以创建一个TaskCompletionSource(tcs) ,将其添加到 aDictionary并通过套接字发送消息。

  • 2.你await tcs.Task还是tcs.Task.Result那个tcs

  • 3.在另一个Thread/Task您可以处理您的传入responses(在您的情况下,您有收到的消息处理程序。每当您收到目标type或 的响应时id,您可以tcs根据id从字典中 获取它。在Task.SetResult(response)那一刻来电者(您正在等待的人被解锁)。

     public class Message
     {
       public string ID{get;set;}
     }
     public class Response
     {
         public string Id{get;set;}
     }
    
     public void MyClass
     {
        private ConcurrentDictionary<string,TaskCompletionSource<Response>>map=new ConcurrentDictionary<string,TaskCompletionSource<Response>>();
        private Websocket socket;
        public void SomeEventTrigger()
        {
            var msg=new Message{ Id="somespecialID" };
            var tcs=new TaskCompletionSource<Response>();
            ws.Send(JsonConvert.SerializeObject(msg));
            if(!this.map.TryAdd(msg.Id,tcs))
            {
               return;
            }
            var result=tcs.Result; //this gets blocked until you use `tcs.SetResult`- >  that would happen in your OnReceivedMessage  
            this.map.TryRemove(msg.Id,out TaskCompletionSource<Response>resp);
    
        }
        protected override void OnReceivedMessage(System.Object sender, WebSocketSharp.MessageEventArgs e)
        {
             Response message = JsonConvert.DeserializeObject<Response>(e.Data);
    
             if (message.id != null )
             {
    
                 if(this.map.TryGetValue(message.id),out TaskCompletionSource<Response> tcs)
                 {
                    tcs.SetResult(message); //this unblocks the method that wrote the message to the socket   (above)
                 }
    
    
             }
          }
       }
    

PS 您需要确保您使用 Task.SetResult 正确的方法tcs以便正确调用SomeEventTrigger方法停止等待。

于 2020-07-16T11:52:57.073 回答