我有一个应用程序,它通过与一台服务器(可观察的)和许多客户端(观察者)进行远程处理来使用 RX。我的问题是,当客户端(观察者)在不执行 ubsubscribe(dispose) 的情况下无法正确断开连接时,服务器上的 OnNext() 函数开始引发远程处理异常。
是否有任何机制可以取消订阅服务器端有问题的观察者?
客户端代码部分:
internal void SetRemoting(bool refreshInstance)
{
string channelName = "RemotingClientUI";
IDictionary dict = new Hashtable();
dict["port"] = 9988;
dict["name"] = channelName;
var bcp = new BinaryClientFormatterSinkProvider();
var channel = new TcpClientChannel(dict, bcp);
ChannelServices.RegisterChannel(channel, false);
_remoteServer = (IRemoteServerService) Activator.
GetObject(typeof (IRemoteServerService),
tcp://...");
}
private void SubscribeToRemoteEvents(bool unSubscrubeFirst)
{
_jobRowUpdate = _remoteServer.JobRowUpdate.Subscribe(UpdateJobQueueRow);
_packageRowUpdate = _remoteServer.PackageRowUpdate.
Subscribe(UpdatePackageQueueRow);
_miscUpdate = _remoteServer.MiscAction.Subscribe(MiscRemoteActions);
}
服务器代码部分:
public class RemoteServiceService
{
public RemoteServiceService()
{
JobRowUpdate = LoggerFactory.GetLogger(
LoggerType.RemoteService, this).JobRowUpdate.Remotable();
PackageRowUpdate = LoggerFactory.GetLogger(
LoggerType.RemoteService, this).PackageRowUpdate.Remotable();
MiscAction = LoggerFactory.GetLogger(
LoggerType.RemoteService, this).MiscActions.Remotable();
}
}
public class RemoteLoggerForService
{
private RemoteLoggerForService(IService service)
{
_jobRowUpdate = new Subject<IJobQueueRow>();
_packageRowUpdate = new Subject<IPackageQueueRow>();
_miscActions = new Subject<MiscRemoteObjects>();
_service = service;
}
#region Overrides of LoggerBase
public override void WriteToLog<T>(T stringFormatOrObject,
params object[] args)
{
lock (this)
try
{
lock (LockLogger)
{
if (stringFormatOrObject is IJobQueueRow &&
_jobRowUpdate != null)
{
_jobRowUpdate.OnNext(
stringFormatOrObject as IJobQueueRow);
}
if (stringFormatOrObject is IPackageQueueRow &&
_packageRowUpdate != null)
{
_packageRowUpdate.OnNext(
stringFormatOrObject as IPackageQueueRow);
}
if (stringFormatOrObject is MiscRemoteObjects &&
_miscActions != null)
{
_miscActions.OnNext(
stringFormatOrObject as MiscRemoteObjects);
}
}
}
catch(Exception ex)
{
LoggerFactory.GetLogger(LoggerType.File, null).
WriteToLog(
Utils.GetFullException("RemoteLoggerForService", ex));
}
}
#endregion
}