1

我需要订阅 Uniswap 交易对(Pair)合约的 Sync 事件,并获取该交易对的储备量(reserves)。以下是我目前的尝试:

/// <summary>
/// Typed view of the <c>Sync</c> event log; carries the two <c>uint112</c> reserve values
/// declared by the event.
/// </summary>
[Event("Sync")]
class PairSyncEventDTO : IEventDTO
{
    /// <summary>First event parameter, <c>reserve0</c>.</summary>
    [Parameter("uint112", "reserve0", 1)] // order 1 is the attribute's default; spelled out to mirror reserve1
    public virtual BigInteger Reserve0 { get; set; }

    /// <summary>Second event parameter, <c>reserve1</c>.</summary>
    [Parameter("uint112", "reserve1", 2)]
    public virtual BigInteger Reserve1 { get; set; }
}

/// <summary>
/// Looks up the Uniswap V2 WETH/DAI pair contract through the factory's <c>getPair</c> function,
/// subscribes to that pair's <c>Sync</c> event over a WebSocket connection and prints the
/// reserve ratio for every log received.
/// </summary>
/// <remarks>
/// The method keeps the streaming client open until a line is read from standard input.
/// Returning earlier would leave the <c>using</c> block and dispose the client, so the
/// subscription callback would never get a chance to run.
/// </remarks>
/// <returns>A task that completes after the subscription has been cancelled.</returns>
public async Task Start()
{
    // C# does not allow 'readonly' on locals; 'const' expresses the same intent and compiles.
    const string uniSwapFactoryAddress = "0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f";

    string uniSwapFactoryAbi = GetAbi(Resources.IUniswapV2Factory);
    string uniSwapPairAbi    = GetAbi(Resources.IUniswapV2Pair);

    var      web3                   = new Web3("https://mainnet.infura.io/v3/fff");
    Contract uniSwapFactoryContract = web3.Eth.GetContract(uniSwapFactoryAbi, uniSwapFactoryAddress);
    Function uniSwapGetPairFunction = uniSwapFactoryContract.GetFunction("getPair");

    string daiAddress          = "0x6b175474e89094c44da98b954eedeac495271d0f";
    string wethAddress         = "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2";
    string pairContractAddress = await uniSwapGetPairFunction.CallAsync<string>(wethAddress, daiAddress);

    Contract pairContract = web3.Eth.GetContract(uniSwapPairAbi, pairContractAddress);

    Event pairSyncEvent = pairContract.GetEvent("Sync");
    // Build the filter from the contract-bound Event rather than from its EventABI:
    // the EventABI overload carries no contract address, so it would match Sync logs
    // emitted by every contract instead of only this pair.
    NewFilterInput pairSyncFilter = pairSyncEvent.CreateFilterInput();

    using (var client = new StreamingWebSocketClient("wss://mainnet.infura.io/ws/v3/fff"))
    {
        var subscription = new EthLogsObservableSubscription(client);
        subscription.GetSubscriptionDataResponsesAsObservable().
                     Subscribe(log =>
                               {
                                   try
                                   {
                                       EventLog<PairSyncEventDTO> decoded = Event<PairSyncEventDTO>.DecodeEvent(log);
                                       if (decoded != null)
                                       {
                                           decimal reserve0 = Web3.Convert.FromWei(decoded.Event.Reserve0);
                                           decimal reserve1 = Web3.Convert.FromWei(decoded.Event.Reserve1);
                                           Console.WriteLine($@"Price={reserve0 / reserve1}");
                                       }
                                       else
                                       {
                                           Console.WriteLine(@"Found not standard transfer log");
                                       }
                                   }
                                   catch (Exception ex)
                                   {
                                       // Concatenate the message: the (format, arg) overload silently dropped it
                                       // because the format string had no {0} placeholder.
                                       Console.WriteLine(@"Log Address: " + log.Address + @" is not a standard transfer log: " + ex.Message);
                                   }
                               });

        await client.StartAsync();
        await subscription.SubscribeAsync(pairSyncFilter);

        // Keep the connection open while logs arrive; leaving the using block disposes the client.
        Console.ReadLine();

        await subscription.UnsubscribeAsync();
    }
}

/// <summary>
/// Extracts the <c>abi</c> member from a stored contract JSON document.
/// </summary>
/// <param name="storedContractJson">The contract JSON, decoded here as UTF-8 text.</param>
/// <returns>The string form of the <c>abi</c> token.</returns>
/// <exception cref="KeyNotFoundException">The JSON root object has no <c>abi</c> property.</exception>
string GetAbi(byte[] storedContractJson)
{
    JObject root = JObject.Parse(Encoding.UTF8.GetString(storedContractJson));

    if (root.TryGetValue("abi", out JToken abiToken))
    {
        return abiToken.ToString();
    }

    throw new KeyNotFoundException("abi object was not found in stored contract json");
}

订阅看起来是成功的,但程序从未进入 Subscribe 的 lambda。此外,如果我不带任何过滤器直接调用 await subscription.SubscribeAsync();,同样不会进入 Subscribe 的 lambda。而且在执行 SubscribeAsync 之后,进程的 CPU 占用会显著升高。

  1. 我究竟做错了什么?为什么 Subscribe 的 lambda 没有被调用?
  2. 为什么 CPU 占用会显著升高?
4

1 回答 1

1

我没有发现您的代码有重大问题;不过由于我手头没有这些 ABI,下面给出一个不依赖 ABI 的示例。“Sync”事件并不会持续触发,这可能就是问题所在。

using Nethereum.ABI.FunctionEncoding.Attributes;
using Nethereum.Contracts;
using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.Eth.Subscriptions;
using System;
using System.Collections.Generic;
using System.Numerics;
using System.Text;
using System.Threading.Tasks;
using Nethereum.RPC.Eth.DTOs;
using Nethereum.RPC.Web3;
using Newtonsoft.Json.Linq;

namespace Nethereum.WSLogStreamingUniswapSample
{
    /// <summary>
    /// Console sample: resolves the Uniswap V2 DAI/WETH pair address from the factory contract,
    /// then streams the pair's <c>Sync</c> event logs over WebSocket and prints the reserve ratio.
    /// </summary>
    class Program
    {
        // NOTE(review): project id as published with the sample; replace it with your own
        // and keep real keys out of source control.
        private const string InfuraProjectId = "7238211010344719ad14a89db874158c";

        /// <summary>
        /// Typed view of the <c>Sync</c> event log with its two <c>uint112</c> reserve values.
        /// </summary>
        [Event("Sync")]
        class PairSyncEventDTO : IEventDTO
        {
            [Parameter("uint112", "reserve0")]
            public virtual BigInteger Reserve0 { get; set; }

            [Parameter("uint112", "reserve1", 2)]
            public virtual BigInteger Reserve1 { get; set; }
        }

        /// <summary>Concrete message type used to query <c>getPair</c>.</summary>
        public partial class GetPairFunction : GetPairFunctionBase { }

        /// <summary>
        /// Function message for <c>getPair(address tokenA, address tokenB)</c>, declared to return an address.
        /// </summary>
        [Function("getPair", "address")]
        public class GetPairFunctionBase : FunctionMessage
        {
            [Parameter("address", "tokenA", 1)]
            public virtual string TokenA { get; set; }

            [Parameter("address", "tokenB", 2)]
            public virtual string TokenB { get; set; }
        }

        /// <summary>
        /// Entry point: queries the pair address, subscribes to its <c>Sync</c> logs and keeps
        /// the WebSocket client open until a line is read from standard input.
        /// </summary>
        public static async Task Main()
        {
            string uniSwapFactoryAddress = "0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f";

            var web3 = new Web3.Web3($"https://mainnet.infura.io/v3/{InfuraProjectId}");

            string daiAddress = "0x6b175474e89094c44da98b954eedeac495271d0f";
            string wethAddress = "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2";

            var pairContractAddress = await web3.Eth.GetContractQueryHandler<GetPairFunction>()
                .QueryAsync<string>(uniSwapFactoryAddress,
                    new GetPairFunction() { TokenA = daiAddress, TokenB = wethAddress });

            // The event is bound to the pair contract address, so the filter only matches this pair's logs.
            var filter = web3.Eth.GetEvent<PairSyncEventDTO>(pairContractAddress).CreateFilterInput();

            using (var client = new StreamingWebSocketClient($"wss://mainnet.infura.io/ws/v3/{InfuraProjectId}"))
            {
                var subscription = new EthLogsObservableSubscription(client);
                subscription.GetSubscriptionDataResponsesAsObservable().
                             Subscribe(log =>
                             {
                                 try
                                 {
                                     EventLog<PairSyncEventDTO> decoded = Event<PairSyncEventDTO>.DecodeEvent(log);
                                     if (decoded != null)
                                     {
                                         decimal reserve0 = Web3.Web3.Convert.FromWei(decoded.Event.Reserve0);
                                         decimal reserve1 = Web3.Web3.Convert.FromWei(decoded.Event.Reserve1);
                                         Console.WriteLine($@"Price={reserve0 / reserve1}");
                                     }
                                     else
                                     {
                                         Console.WriteLine(@"Found not standard transfer log");
                                     }
                                 }
                                 catch (Exception ex)
                                 {
                                     // Concatenate the message: the (format, arg) overload silently dropped it
                                     // because the format string had no {0} placeholder.
                                     Console.WriteLine(@"Log Address: " + log.Address + @" is not a standard transfer log: " + ex.Message);
                                 }
                             });

                await client.StartAsync();
                subscription.GetSubscribeResponseAsObservable().Subscribe(id => Console.WriteLine($"Subscribed with id: {id}"));
                await subscription.SubscribeAsync(filter);

                // Block here so the client stays open while logs arrive.
                Console.ReadLine();

                await subscription.UnsubscribeAsync();
            }
        }
    }
}

为了让与 Infura 的连接保持活动状态,您可能需要每隔一段时间对其执行一次 ping 操作,示例如下:

 // Keep-alive loop: requests the current block number every 30 seconds and prints it.
 while (true)
 {
     // A new handler is created for each request, as in the original sample.
     var handler = new EthBlockNumberObservableHandler(client);
     handler.GetResponseAsObservable().Subscribe(x => Console.WriteLine(x.Value));
     await handler.SendRequestAsync();
     // Task.Delay waits without blocking the thread; Thread.Sleep would block it inside async code.
     await Task.Delay(30000);
 }
于 2021-03-17T12:17:40.797 回答