0

我遇到了客户端无法在第二次启动时注册回调通道的问题。我尝试了各种方法,似乎找到了在关闭服务时删除 Close() 通道(包装器中的 ResetProxy() 调用)调用的解决方案。但事实证明这会导致另一个问题:服务器崩溃并出现错误“线程试图读取或写入它没有适当访问权限的虚拟地址”. 此外,还有一些与网络相关的问题也会导致相同的行为。解决方案始终是重新启动服务器,这不是修复回调通道注册的正确选项。谁能建议可能是什么问题?我做了很多测试,这似乎与所有客户端的突然停止(多个服务的全局停止)和服务数量有关。结果,即使是新客户端也无法在服务器重新启动之前注册到服务器。这听起来像服务器正在阻止关闭的频道。

合同:

[ServiceContract(SessionMode = SessionMode.Required, CallbackContract = typeof(IAgentCallback))]
public interface IAgentManager
{
    [OperationContract]
    string RegisterAgent(string hostName);

    [OperationContract]
    bool Ping();
}

客户端创建回调代码(截断):

public class AgentCallbackDaemon
{
    private void CreateManagerProxy()
    {
        Reset();
        var isUnbound = true;
        while (isUnbound)
        {
            try
            {
                ManagerProxy = new ProxyWrapper<AgentManagerProxy, IAgentManager>(CreateProxy);
                ManagerProxy.ChannelStateChanged += HandleProxyConnectionStateChanged;
                isUnbound = false;
            }
            catch (AddressAlreadyInUseException)
            {
                sLog.ErrorFormat("Port is already reserved, binding failed.");
            }
            catch (Exception error)
            {
                sLog.ErrorFormat($"No proxy due to {error}");
                throw;
            }
        }
    }

    private AgentManagerProxy CreateProxy()
        => mCallbackChannelPortRange.IsCallbackPortRageSet() ? GetProxyWithPortRange() : GetProxyDefault();

    private AgentManagerProxy GetProxyDefault()
        => new AgentManagerProxy(mAgentCallback, mManagerUri, GetServiceName());

    private AgentManagerProxy GetProxyWithPortRange()
    {
        var minPort = mCallbackChannelPortRange.MinimumCallbackPortNumber;
        var maxPort = mCallbackChannelPortRange.MaximumCallbackPortNumber;
        return new AgentManagerProxy(mAgentCallback, mManagerUri, GetServiceName(), minPort, maxPort);
    }
}

客户端回调代码(截断):

public class AgentManagerProxy : DuplexClientBase<IAgentManager>, IAgentManager
{
    public const string SERVICE_NAME = "AgentManager";

    public AgentManagerProxy(IAgentCallback callback, string serviceAddress, string connectionId,
        ushort minPort, ushort maxPort)
        : base(callback, BindingAbstractFactory.DuplexServiceClientBindingFactory(connectionId,
        minPort, maxPort), BindingUtility.CreateEndpoint(serviceAddress, SERVICE_NAME))
    {
    }

    public string RegisterAgent(string hostName)
    => Channel.RegisterAgent(hostName);


    public bool Ping() => return Channel.Ping();
}


public static class BindingAbstractFactory
{
    public static Binding DuplexServiceClientBindingFactory(string connectionId, ushort minPort, ushort maxPort)
    => CreateDuplexServiceClientBinding(connectionId, minPort, maxPort);


    private static Binding CreateDuplexServiceClientBinding(string connectionId, ushort minPort, ushort maxPort)
    {
        var binding = CreateDuplexServiceHostBinding();
        if (binding is WSDualHttpBinding)
        {
            lock (sLock)
            {
                try
                {
                    ((WSDualHttpBinding)binding).ClientBaseAddress =
                        CreateClientBaseAddress(connectionId, minPort, maxPort);
                }
                catch (Exception error)
                {
                    sLog.ErrorFormat("Unexpected exception: {0}", error);
                    throw error;
                }
                finally
                {
                    Monitor.PulseAll(sLock);
                }
            }
        }

        return binding;
    }

    private static Binding CreateDuplexServiceHostBinding()
    {
        var binding = new WSDualHttpBinding
        {
            ReceiveTimeout = TimeSpan.MaxValue,
            MaxReceivedMessageSize = int.MaxValue,
            ReaderQuotas =
            {
                // Set the maximum sizes to allow big message sizes.
                MaxArrayLength = int.MaxValue,
                MaxBytesPerRead = int.MaxValue,
                MaxDepth = int.MaxValue,
                MaxNameTableCharCount = int.MaxValue,
                MaxStringContentLength = int.MaxValue
            },
            Security =
            {
                // Disable security on control services.
                // TODO: Once security concept has been clarified, update this to reflect security rules.
                Mode = WSDualHttpSecurityMode.None,
                Message = { ClientCredentialType = MessageCredentialType.None }
                //// binding.Security.Mode = SecurityMode.None;
                //// binding.Security.Transport.ClientCredentialType = HttpClientCredentialType.None;
            }
        };

        return binding;
    }

    private static Uri CreateClientBaseAddress(string connectionId, ushort minPort, ushort maxPort)
    {
        var fullDnsName = Dns.GetHostEntry(Dns.GetHostName()).HostName;
        int portNumber;
        if (maxPort == ushort.MaxValue)
            portNumber = LocalPorts.GetRandomAvailablePort(connectionId, minPort, maxPort);
        else
            portNumber = LocalPorts.GetNextAvailablePort(connectionId, minPort, maxPort);
        return new Uri($@"http://{fullDnsName}:{portNumber}");
    }
}

所有这些都包装到代理包装器中,带有重置功能,在服务停止或任何错误(包括网络)时调用:

public class ProxyWrapper<TPROXY, TSERVICE> where TPROXY : ClientBase<TSERVICE> where TSERVICE : class
{
    public delegate TPROXY CreateProxy();
    private readonly object mProxyLock = new object();</summary>
    private readonly CreateProxy mCreateProxy;

    private TPROXY mProxy;

    public ProxyWrapper(CreateProxy createProxyCallback)
    {
        mLog.Info.Write($"Creating Proxy for '{typeof(TPROXY).FullName}'");
        mCreateProxy = createProxyCallback;
        BuildProxy();
    }

    public event EventHandler<CommunicationState> ChannelStateChanged;

    public TPROXY Proxy
    {
        get
        {
            lock (mProxyLock)
            {
                if (mProxy == null)
                    BuildProxy();

                return mProxy;
            }
        }
    }

    public void ResetProxy()
    {
        mLog.Info.Write("Call ResetProxy()");
        lock (mProxyLock)
        {
            try
            {
                if (mProxy == null)
                    return;

                UnSubscribeFromChannelEvents();
                CloseProxy();
            }
            catch (Exception ex)
            {
                // Catch all exceptions, and ignore them.
            }
            finally
            {
                mProxy = null;
            }
        }
    }

    private void RaiseChannelStateChanged(object sender, EventArgs e)
        => ChannelStateChanged?.Invoke(this, mProxy.InnerChannel.State);

    private void RaiseChannelEnteredFaultyState()
        => ChannelStateChanged?.Invoke(this, CommunicationState.Faulted);

    private void BuildProxy()
    {
        lock (mProxyLock)
        {
            if (mProxy != null)
                ResetProxy();

            mProxy = mCreateProxy();
            SubscribeToChannelEvents();
        }
    }

    private void SubscribeToChannelEvents()
    {
        if (mProxy?.InnerChannel == null)
            return;

        mProxy.InnerChannel.Faulted += OnInnerChannelFaulted;
        mProxy.InnerChannel.Closed += RaiseChannelStateChanged;
        mProxy.InnerChannel.Closing += RaiseChannelStateChanged;
        mProxy.InnerChannel.Opened += RaiseChannelStateChanged;
        mProxy.InnerChannel.Opening += RaiseChannelStateChanged;
    }

    private void UnSubscribeFromChannelEvents()
    {
        if (mProxy?.InnerChannel == null)
            return;

        mProxy.InnerChannel.Faulted -= OnInnerChannelFaulted;
        mProxy.InnerChannel.Closed -= RaiseChannelStateChanged;
        mProxy.InnerChannel.Closing -= RaiseChannelStateChanged;
        mProxy.InnerChannel.Opened -= RaiseChannelStateChanged;
        mProxy.InnerChannel.Opening -= RaiseChannelStateChanged;
    }

    private void OnInnerChannelFaulted(object sender, EventArgs e)
    {
        ResetProxy();
        RaiseChannelEnteredFaultyState();
    }

    private void CloseProxy()
    {
        try
        {
            mProxy.Close();
        }
        catch (Exception ex)
        {
            try
            {
                mProxy.Abort();
            }
            catch (Exception abortException) {  // ignored  }
        }

        try
        {
            mProxy.ChannelFactory.Close();
        }
        catch (Exception ex)
        {
            try
            {
                mProxy.ChannelFactory.Abort();
            }
            catch (Exception abortException) {  // ignored  }
        }
    }
}

服务器只维护回调列表。

  • 为“AgentManagerProxy”创建代理
  • 调用 BuildProxy()
  • mProxy.Open() [状态=已创建]
  • 调用 SubscribeToChannelEvents()
  • …………………………………………………………………………
  • 服务停止...
  • 调用 ResetProxy()
  • 调用 UnSubscribeFromChannelEvents()
  • 调用 CloseProxy(): mProxy.Close()
  • 调用 CloseProxy(): mProxy.ChannelFactory.Close()

为客户端添加的日志记录在第一次启动时显示:

第二次启动服务失败:

  • 为“AgentManagerProxy”创建代理
  • 调用 BuildProxy()
  • mProxy.Open() [状态=已创建]

错误: 由于 System.TimeoutException 没有代理:请求通道在 00:00:00 之后尝试发送超时。增加传递给 Request 调用的超时值或增加 Binding 上的 SendTimeout 值。分配给此操作的时间可能是较长超时的一部分。---> System.TimeoutException: 对' http://xxxxx:9003/AgentManager的 HTTP 请求' 已超过分配的超时时间 00:00:00。分配给此操作的时间可能是较长超时的一部分。在 System.ServiceModel.Channels.HttpChannelUtilities.SetRequestTimeout(HttpWebRequest 请求,TimeSpan 超时)在 System.ServiceModel.Channels.HttpChannelFactory`1.HttpRequestChannel.HttpChannelRequest.SendRequest(消息消息,TimeSpan 超时)在 System.ServiceModel.Channels.RequestChannel.Request (消息消息,TimeSpan 超时) --- 内部异常堆栈跟踪结束 ---

4

1 回答 1

0

您的客户端数量可能超过 Maxconcurrentinstances 的值。Maxconcurrentinstances 是一个正整数,用于限制一次可以通过 ServiceHost 执行的 instancecontext 对象的数量。当槽数低于限制时,创建其他实例的请求将排队并完成。默认值为maxconcurrentsessions和maxconcurrentcalls之和。需要在每个客户端完成后关闭通道。否则客户端排队时间长了会出现异常。

try
{
   if (client.State != System.ServiceModel.CommunicationState.Faulted)
   {
      client.Close();
   }
}
catch (Exception ex)
{
   client.Abort();
}

关闭方法可能会失败。如果 close 方法失败,则需要 abort 方法。

于 2020-06-10T08:28:47.110 回答