1

我们已经创建了 .Net 核心 Windows 服务,用于使用来自 Salesforce 的平台事件(每当在 Salesforce 中创建/更新的特定对象想要获取信息时)。我们正在使用 Cometd/bayeux 客户端订阅 Salesforce 平台事件。

最初一切正常,只要 Salesforce 对象发生更改,我们就会获取数据,但在空闲几个小时(大约 1-2 小时)后,没有数据获取。检查 Bayeux 客户端状态,它显示为已连接,但订阅未发生。当我们重新启动服务时,它开始工作。使用以下代码进行连接和订阅。任何人都可以帮助解决这个问题。

 public void CheckAndSubscribe()
    {
        if (!_bayeuxClient.IsConnected())
        {
            _logger.LogInformation("Bayeux client not connected. trying to connect...");
            try
            {
                SalesforceSession salesforceSessionData = _sfSessionAdapter.GetSalesforceSession();
                _bayeuxClient.Connect(salesforceSessionData.Url, salesforceSessionData.SessionId);

                List<string> sfChannels = _syncSalesforceConfiguration.BayeuxClientConfiguration.ExternalChannels;

                foreach (string channel in sfChannels)
                {
                    _bayeuxClient.Subscribe(channel, _messageListener);
                }
                _logger.LogInformation("Bayeux client connected and channels subscribed...");
            }
            catch (Exception ex)
            {
                _logger.LogException(ex);
            }

        }
    }
 public class BayeuxClientAdapter : IBayeuxClientAdapter
{
    BayeuxClient _bayeuxClient = null;
    private readonly SyncSalesforceConfiguration _syncSalesforceConfiguration;
    public BayeuxClientAdapter(IOptions<SyncSalesforceConfiguration> syncSalesforceConfiguration)
    {
        _syncSalesforceConfiguration = syncSalesforceConfiguration.Value;
    }
    public bool IsConnected()
    {
        return _bayeuxClient?.Connected ?? false;
    }

    public void Connect(string instanceUrl, string authToken)
    {
        int readTimeOut = 120 * 1000;
        string streamingEndpointURI = _syncSalesforceConfiguration.BayeuxClientConfiguration.StreamingEndpointUri;

        IDictionary<string, object> options = new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase)
        {
            { ClientTransport.TIMEOUT_OPTION, readTimeOut },
            { ClientTransport.MAX_NETWORK_DELAY_OPTION, 120000 }
        };

        var headers = new NameValueCollection { { HttpRequestHeader.Authorization.ToString(), $"OAuth {authToken}" } };

        var clientTransport = new LongPollingTransport(options, headers);

        var serverUri = new Uri(instanceUrl);
        String endpoint = String.Format("{0}://{1}{2}", serverUri.Scheme, serverUri.Host, streamingEndpointURI);

        _bayeuxClient = new BayeuxClient(endpoint, new[] { clientTransport });
    }

    public void DisConnect()
    {
        if (IsConnected())
        {
            _bayeuxClient?.ResetSubscriptions();
            _bayeuxClient?.Disconnect();
            _bayeuxClient?.WaitFor(1000, new[] { BayeuxClient.State.DISCONNECTED });
        }
    }

    public void Subscribe(string channel, IMessageListener listener)
    {
        _bayeuxClient.Handshake();
        _bayeuxClient.WaitFor(1000, new[] { BayeuxClient.State.CONNECTED });

        var sfChannel = _bayeuxClient.GetChannel(channel);
        sfChannel.Subscribe(listener);
    }
}
4

1 回答 1

1

只要通道上没有活动,服务器就会在特定时间后关闭连接。

在那段时间,客户端收到403(未知客户端)状态码,客户端必须在 110 秒内再次握手。

默认情况下,CometD 会在没有任何用户交互的情况下尝试重新连接,如果客户端未在预期时间内重新连接,则服务器会删除客户端的 CometD 会话。

一旦连接重新连接,ComedD 将删除所有频道订阅,我们必须再次订阅频道才能接收事件。

为此,我们必须使用meta/Handshake回调来重新订阅频道。

于 2020-09-11T12:19:01.027 回答