我们创建了一个 .NET Core Windows 服务,用于接收来自 Salesforce 的平台事件(每当 Salesforce 中的特定对象被创建或更新时,我们都需要获取相应的信息)。我们使用 CometD/Bayeux 客户端订阅 Salesforce 平台事件。
最初一切正常:只要 Salesforce 对象发生更改,我们就能收到数据。但在空闲几个小时(大约 1-2 小时)后,就不再收到任何数据。此时检查 Bayeux 客户端状态,显示为已连接,但订阅并未生效。重新启动服务后,又能恢复正常工作。我们使用以下代码进行连接和订阅。请问有人能帮忙解决这个问题吗?
/// <summary>
/// Connects the Bayeux client and subscribes it to every configured external
/// channel, but only when the client currently reports that it is not connected.
/// </summary>
/// <remarks>
/// When the client already reports a connection this method does nothing.
/// Any exception raised while obtaining the session, connecting or subscribing
/// is logged and not rethrown.
/// </remarks>
public void CheckAndSubscribe()
{
    // Guard clause: a client that reports "connected" is left untouched.
    if (_bayeuxClient.IsConnected())
    {
        return;
    }

    _logger.LogInformation("Bayeux client not connected. trying to connect...");

    try
    {
        // A session is requested on every attempt and its URL / session id are handed to the client.
        SalesforceSession session = _sfSessionAdapter.GetSalesforceSession();
        _bayeuxClient.Connect(session.Url, session.SessionId);

        List<string> channelNames = _syncSalesforceConfiguration.BayeuxClientConfiguration.ExternalChannels;
        channelNames.ForEach(channelName => _bayeuxClient.Subscribe(channelName, _messageListener));

        _logger.LogInformation("Bayeux client connected and channels subscribed...");
    }
    catch (Exception failure)
    {
        // Failures are recorded only; the caller is never interrupted.
        _logger.LogException(failure);
    }
}
/// <summary>
/// Thin adapter around a CometD <see cref="BayeuxClient"/> that builds the client
/// from the configured streaming endpoint, performs the handshake, and subscribes
/// listeners to channels.
/// </summary>
public class BayeuxClientAdapter : IBayeuxClientAdapter
{
    /// <summary>Value passed as the transport's timeout option, in milliseconds.</summary>
    private const int ReadTimeoutMilliseconds = 120 * 1000;

    /// <summary>Value passed as the transport's maximum-network-delay option, in milliseconds.</summary>
    private const int MaxNetworkDelayMilliseconds = 120000;

    /// <summary>How long to wait for the client to reach a requested state, in milliseconds.</summary>
    private const int StateWaitMilliseconds = 1000;

    private readonly SyncSalesforceConfiguration _syncSalesforceConfiguration;

    private BayeuxClient _bayeuxClient = null;

    // True once Handshake() has been issued on the current client instance.
    // Cleared whenever a new client is built or the adapter is disconnected.
    private bool _handshakeRequested;

    /// <summary>
    /// Creates the adapter and captures the configuration value.
    /// </summary>
    /// <param name="syncSalesforceConfiguration">Options wrapper whose <c>Value</c> supplies the Bayeux client configuration.</param>
    public BayeuxClientAdapter(IOptions<SyncSalesforceConfiguration> syncSalesforceConfiguration)
    {
        _syncSalesforceConfiguration = syncSalesforceConfiguration.Value;
    }

    /// <summary>
    /// Reports whether a client exists and its <c>Connected</c> property is true.
    /// </summary>
    /// <returns><c>false</c> when no client has been created yet; otherwise the client's <c>Connected</c> value.</returns>
    public bool IsConnected()
    {
        return _bayeuxClient?.Connected ?? false;
    }

    /// <summary>
    /// Builds a new long-polling Bayeux client for the given instance and token.
    /// The handshake itself is issued by the first <see cref="Subscribe"/> call.
    /// </summary>
    /// <param name="instanceUrl">Instance URL; only its scheme and host are used to build the endpoint.</param>
    /// <param name="authToken">Token sent in the <c>Authorization</c> header as <c>OAuth {token}</c>.</param>
    public void Connect(string instanceUrl, string authToken)
    {
        string streamingEndpointURI = _syncSalesforceConfiguration.BayeuxClientConfiguration.StreamingEndpointUri;

        IDictionary<string, object> options = new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase)
        {
            { ClientTransport.TIMEOUT_OPTION, ReadTimeoutMilliseconds },
            { ClientTransport.MAX_NETWORK_DELAY_OPTION, MaxNetworkDelayMilliseconds }
        };
        var headers = new NameValueCollection { { HttpRequestHeader.Authorization.ToString(), $"OAuth {authToken}" } };
        var clientTransport = new LongPollingTransport(options, headers);

        var serverUri = new Uri(instanceUrl);
        string endpoint = $"{serverUri.Scheme}://{serverUri.Host}{streamingEndpointURI}";

        // Stop the client that is about to be replaced. Without this the old instance is
        // simply dropped while it may still be polling or retrying in the background.
        // NOTE(review): assumes Disconnect() is safe to call in any client state - confirm
        // against the CometD library version in use.
        _bayeuxClient?.Disconnect();

        _bayeuxClient = new BayeuxClient(endpoint, new[] { clientTransport });
        _handshakeRequested = false;
    }

    /// <summary>
    /// Resets subscriptions and disconnects the client when it is currently connected,
    /// waiting briefly for the disconnected state.
    /// </summary>
    public void DisConnect()
    {
        if (IsConnected())
        {
            _bayeuxClient?.ResetSubscriptions();
            _bayeuxClient?.Disconnect();
            _bayeuxClient?.WaitFor(StateWaitMilliseconds, new[] { BayeuxClient.State.DISCONNECTED });
        }

        // Allow a later Subscribe() on the same client to handshake again.
        _handshakeRequested = false;
    }

    /// <summary>
    /// Subscribes <paramref name="listener"/> to <paramref name="channel"/>, issuing the
    /// handshake first if it has not yet been issued on the current client.
    /// </summary>
    /// <param name="channel">Name of the channel to subscribe to.</param>
    /// <param name="listener">Listener registered on the channel.</param>
    /// <exception cref="InvalidOperationException"><see cref="Connect"/> has not been called yet.</exception>
    public void Subscribe(string channel, IMessageListener listener)
    {
        if (_bayeuxClient == null)
        {
            throw new InvalidOperationException($"{nameof(Connect)} must be called before {nameof(Subscribe)}.");
        }

        // Handshake only once per client instance. Handshaking again for every channel
        // restarts the Bayeux session each time; per the CometD client documentation a
        // new handshake resets existing subscriptions, so only the last channel would
        // stay subscribed. NOTE(review): confirm against the library version in use.
        if (!_handshakeRequested)
        {
            _bayeuxClient.Handshake();
            _handshakeRequested = true;
        }

        // Give the client a moment to reach CONNECTED. As before, the subscription is
        // attempted even when the wait elapses first.
        _bayeuxClient.WaitFor(StateWaitMilliseconds, new[] { BayeuxClient.State.CONNECTED });

        var sfChannel = _bayeuxClient.GetChannel(channel);
        sfChannel.Subscribe(listener);
    }
}