1

到目前为止,在过去的 3 个月中,我仍然对 SignalR 在 JIT(即时)级别的工作方式一无所知。我正在尝试构建一个集线器,及时向客户端发送数据,然后客户端将接收数据并与之一起工作。

编辑:如果你不知道我所说的 JIT 发送和接收是什么意思,

我的意思是当有新数据可用时,服务器能够发送连接的套接字客户端数据。只有当服务器关闭/出现问题或客户端与套接字断开连接时,套接字连接才会关闭。所以简而言之,不管怎样,当服务器产生新数据时,它总是会一个接一个地将数据发送给连接客户端。

所以这就是我错过/困惑的:

  1. SubscribeToAll(查看下面的 TickerHub.cs)方法是当我有新数据要通知客户并向客户发出哔哔声时我调用的地方还是它在哪里?
  2. 我知道异步 WriteToChannel 是如何工作的。基本上,它会逐项向客户端发送一个集合。关键问题是,如何将整个函数转换为 JIT?我在哪里处理订阅此集线器的客户列表?

目前,TickerHub.cs 不断检索数据集(名为 CurrencyPairs),然后无限期地将其广播给客户端。我有一个后台服务,可以 24/7 同步和更新 CurrencyPairs。我只需要 SignalR 专家的帮助来解释/展示如何从后台服务调用集线器,然后允许集线器将新数据广播到连接的客户端。

TickerHub.cs

public class TickerHub : Hub, ITickerHubClient
{
    private IEnumerable<CurrencyPair> _currencyPairs;
    private readonly ICurrencyPairService _cpService;

    public TickerHub(ICurrencyPairService cpService)
    {
        _cpService = cpService;
    }

    public async Task<NozomiResult<CurrencyPair>> Tickers(IEnumerable<CurrencyPair> currencyPairs = null)
    {
        var nozRes = new NozomiResult<CurrencyPair>()
        {
            Success = true,
            ResultType = NozomiResultType.Success,
            Data = currencyPairs
        };

        return nozRes;
    }

    // We can use this to return a payload
    public async Task<ChannelReader<NozomiResult<CurrencyPair>>> SubscribeToAll()
    {
        // Initialize an unbounded channel
        // 
        // Unbounded Channels have no boundaries, allowing the server/client to transmit
        // limitless amounts of payload. Bounded channels have limits and will tend to 
        // drop the clients after awhile.
        var channel = Channel.CreateUnbounded<NozomiResult<CurrencyPair>>();

        _ = WriteToChannel(channel.Writer); // Write all Currency Pairs to the channel

        // Return the reader
        return channel.Reader;

        // This is a nested method, allowing us to write repeated methods
        // with the same semantic conventions while maintaining conformity.
        async Task WriteToChannel(ChannelWriter<NozomiResult<CurrencyPair>> writer)
        {
            // Pull in the latest data
            _currencyPairs = _cpService.GetAllActive();

            // Iterate them currency pairs
            foreach (var cPair in _currencyPairs)
            {
                // Write one by one, and the client receives them one by one as well
                await writer.WriteAsync(new NozomiResult<CurrencyPair>()
                {
                    Success = (cPair != null),
                    ResultType = (cPair != null) ? NozomiResultType.Success : NozomiResultType.Failed,
                    Data = new[] {cPair}
                });
            }

            // Beep the client, telling them you're done
            writer.Complete();
        }
    }
}

如果您想了解我的客户端代码是否无法正常工作,请查看这里

using Microsoft.AspNetCore.SignalR.Client;
using Newtonsoft.Json;
using Nozomi.Client.Data.Interfaces;
using Nozomi.Data;
using Nozomi.Data.CurrencyModels;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Nozomi.Client
{
    public class NozomiClient
    {
        private CancellationToken _tickerStreamCancellationToken;
        private string ServerPath;
        private HubConnection _hubConnection;

        public NozomiClient(string serverPath)
        {
            ServerPath = serverPath;
            _hubConnection = new HubConnectionBuilder()
                .WithUrl(serverPath)
                .Build();
        }

        public async Task InitializeAsync()
        {
            await _hubConnection.StartAsync();
        }

        public async Task StreamTickers()
        {
            // Setup the channel for streaming
            var streamTickerChannel = await _hubConnection.StreamAsChannelAsync<NozomiResult<CurrencyPair>>("SubscribeToAll", CancellationToken.None);

            // Setup the asynchronous data stream
            // https://docs.microsoft.com/en-us/aspnet/core/signalr/streaming?view=aspnetcore-2.1#net-client
            //while (await streamTickerChannel.WaitToReadAsync())
            //{
            //    while (streamTickerChannel.TryRead(out var cp))
            //    {
            //        Console.WriteLine(JsonConvert.SerializeObject(cp));
            //    }
            //}

            _hubConnection.On<CurrencyPair>("SubscribeToAll", cp =>
            {
                Console.WriteLine(cp);
            });

            while (!_tickerStreamCancellationToken.IsCancellationRequested)
            {
                if (await streamTickerChannel.WaitToReadAsync())
                {
                    while (streamTickerChannel.TryRead(out var cp))
                    {
                        Console.WriteLine(JsonConvert.SerializeObject(cp));
                    }
                }

                Console.WriteLine("Processing");
                Thread.Sleep(1000);
            }
        }

        public ICurrencyPair CurrencyPairs { get; }

        public ISource Sources { get; }
    }
}
4

0 回答 0