1

我有一个应用程序,它通过与一台服务器(可观察的)和许多客户端(观察者)进行远程处理来使用 RX。我的问题是,当客户端(观察者)在不执行 ubsubscribe(dispose) 的情况下无法正确断开连接时,服务器上的 OnNext() 函数开始引发远程处理异常。

是否有任何机制可以取消订阅服务器端有问题的观察者?

客户端代码部分:

internal void SetRemoting(bool refreshInstance)
{
    string channelName = "RemotingClientUI";
    IDictionary dict = new Hashtable();
    dict["port"] = 9988;
    dict["name"] = channelName;

    var bcp = new BinaryClientFormatterSinkProvider();
    var channel = new TcpClientChannel(dict, bcp);
    ChannelServices.RegisterChannel(channel, false);

    _remoteServer = (IRemoteServerService) Activator.
        GetObject(typeof (IRemoteServerService),
            tcp://...");
}

private void SubscribeToRemoteEvents(bool unSubscrubeFirst)
{            
    _jobRowUpdate = _remoteServer.JobRowUpdate.Subscribe(UpdateJobQueueRow);
    _packageRowUpdate = _remoteServer.PackageRowUpdate.   
        Subscribe(UpdatePackageQueueRow);
    _miscUpdate = _remoteServer.MiscAction.Subscribe(MiscRemoteActions);
}

服务器代码部分:

public class RemoteServiceService
{
    public RemoteServiceService()
    {
        JobRowUpdate = LoggerFactory.GetLogger(
            LoggerType.RemoteService, this).JobRowUpdate.Remotable();
        PackageRowUpdate = LoggerFactory.GetLogger(
            LoggerType.RemoteService, this).PackageRowUpdate.Remotable();
        MiscAction = LoggerFactory.GetLogger(
            LoggerType.RemoteService, this).MiscActions.Remotable();
    }
}

public class RemoteLoggerForService
{
    private RemoteLoggerForService(IService service)
    {
        _jobRowUpdate = new Subject<IJobQueueRow>();
        _packageRowUpdate = new Subject<IPackageQueueRow>();
        _miscActions = new Subject<MiscRemoteObjects>();
        _service = service;
    }

    #region Overrides of LoggerBase

    public override void WriteToLog<T>(T stringFormatOrObject, 
        params object[] args)
    {
        lock (this)
        try
        {
            lock (LockLogger)
            {
                if (stringFormatOrObject is IJobQueueRow && 
                    _jobRowUpdate != null)
                {
                    _jobRowUpdate.OnNext(
                        stringFormatOrObject as IJobQueueRow);
                }

                if (stringFormatOrObject is IPackageQueueRow && 
                    _packageRowUpdate != null)
                {
                    _packageRowUpdate.OnNext(
                        stringFormatOrObject as IPackageQueueRow);
                }

                if (stringFormatOrObject is MiscRemoteObjects && 
                    _miscActions != null)
                {
                    _miscActions.OnNext(
                        stringFormatOrObject as MiscRemoteObjects);
                }
            }
        }
        catch(Exception ex)
        {
            LoggerFactory.GetLogger(LoggerType.File, null).
                WriteToLog(
                    Utils.GetFullException("RemoteLoggerForService", ex));
        }
    }

    #endregion
}
4

1 回答 1

0

我认为您正在尝试做的是吞下异常并让您的服务器继续运行。您可以通过捕获异常来做到这一点(就像您在此处所做的那样),然后您可以将其传递给OnError相关 Observable 的方法,以便订阅者可以选择如何做出反应。

于 2015-01-05T13:48:59.960 回答