1

我已经创建了一个 WCF 发布者订阅者服务,它每天处理 70 到 80 个连接如果连接中断(由于 Internet 连接失败、系统关闭等),服务本身将在连接恢复时尝试自行建立连接。

有时回来我注意到我的服务抛出异常(操作超时)

This request operation sent to net.tcp://localhost:2001/sub did not receive a reply within the configured timeout (00:00:59.9799877). The time allotted to this operation may have been a portion of a longer timeout. This may be because the service is still processing the operation or because the service was unable to send a reply message. Please consider increasing the operation timeout (by casting the channel/proxy to IContextChannel and setting the OperationTimeout property) and ensure that the service is able to connect to the client

然后在其中一些异常之后 RM 目标超时异常出现,这会导致与服务进一步通信的线路发生故障

创建可靠会话的请求已被 RM 目的地拒绝。服务器 'net.tcp://localhost:2001/Sub' 太忙,无法处理此请求。稍后再试。无法打开通道

在此期间,已经订阅的客户端成功响应了 pub sub 请求

没有新频道订阅该服务

我在服务中使用 net Tcp 绑定

客户代码

 public class Subscriber : IPublishing
{
   
    static int Counter = 0;
    ISubscription _proxy;
    string _endpoint = string.Empty;
    
    public Subscriber()
    {
        _endpoint = "net.tcp://localhost:2001/sub";
        MakeProxy(_endpoint, this);
    }
    void MakeProxy(string EndpoindAddress, object callbackinstance)
    {
        try
        {
            NetTcpBinding netTcpbinding = new NetTcpBinding(SecurityMode.None);
            EndpointAddress endpointAddress = new EndpointAddress(EndpoindAddress);
            InstanceContext context = new InstanceContext(callbackinstance);

            // Added for Reliable communication
            netTcpbinding.ReliableSession.Enabled = true;
            netTcpbinding.ReliableSession.Ordered = true;
            netTcpbinding.MaxBufferSize = 2147483647;
            netTcpbinding.MaxBufferPoolSize = 2147483647;
            netTcpbinding.MaxReceivedMessageSize = 2147483647;


            netTcpbinding.ReliableSession.InactivityTimeout = TimeSpan.MaxValue;
            netTcpbinding.ReceiveTimeout = TimeSpan.MaxValue;
            DuplexChannelFactory<ISubscription> channelFactory = new DuplexChannelFactory<ISubscription>(new InstanceContext(this), netTcpbinding, endpointAddress);
          
            _proxy = channelFactory.CreateChannel();



            Counter++;
        }
        catch (Exception ex)
        {
         //
        }
    }
  

    public void Subscribe()
    {
        try
        {
           _proxy.Subscribe("500");
           Counter = 0;
        }
        catch (Exception ex)
        {
            ((IContextChannel)_proxy).Abort();
           
        }
    }

   
    public void Publish(Message e, String ID)
    {
       
    }

    public void _Subscribe()
    {
        try
        {
            //OnUnSubscribe();
            Subscribe();
        }
        catch (Exception ex)
        {
        
        }
    }
}

服务器代码

[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
class Subscription : ISubscription
{
    #region ISubscription Members

    public void Subscribe(string ID)
    {
        String s = DateTime.Now.ToString(
        IPublishing subscriber = OperationContext.Current.GetCallbackChannel<IPublishing>();
        Filter.AddSubscriber(KioskID, subscriber);
    }

    public void UnSubscribe(string ID)
    {
        IPublishing subscriber = OperationContext.Current.GetCallbackChannel<IPublishing>();
        Filter.RemoveSubscriber(KioskID, subscriber);
    }

    #endregion
}



class Filter
    {
        static Dictionary<string, List<IPublishing>> _subscribersList = new Dictionary<string, List<IPublishing>>();

        static public Dictionary<string, List<IPublishing>> SubscribersList
        {
            get
            {
                lock (typeof(Filter))
                {
                    return _subscribersList;
                }
            }

        }

        static public List<IPublishing> GetSubscribers(String topicName)
        {
            lock (typeof(Filter))
            {
                if (SubscribersList.ContainsKey(topicName))
                {
                    return SubscribersList[topicName];
                }
                else
                    return null;
            }
        }

        static public void AddSubscriber(String topicName, IPublishing subscriberCallbackReference)
        {
            DebugLog.WriteLog("C:\\DevPubSubServiceLogs", topicName + "came for Sub");
            lock (typeof(Filter))
            {
                DebugLog.WriteLog("C:\\DevPubSubServiceLogs",topicName +  "inside lock");
                if (SubscribersList.ContainsKey(topicName))
                {
                   // Removing any stray subscribers for  same topic name 
                   // because only 1 subscriber for 1 topic name should exist
                    SubscribersList.Remove(topicName);
                }
               
                List<IPublishing> newSubscribersList = new List<IPublishing>();
                newSubscribersList.Add(subscriberCallbackReference);
                SubscribersList.Add(topicName, newSubscribersList);

              
            }
            DebugLog.WriteLog("C:\\DevPubSubServiceLogs", topicName + "subscribed");

        }

        static public void RemoveSubscriber(String topicName, IPublishing subscriberCallbackReference)
        {
            lock (typeof(Filter))
            {
                if (SubscribersList.ContainsKey(topicName))
                {
                    //if (SubscribersList[topicName].Contains(subscriberCallbackReference))
                    //{
                    //    SubscribersList[topicName].Remove(subscriberCallbackReference);
                    //}
                    SubscribersList.Remove(topicName);
                }
            }
        }

    }

合同

[ServiceContract]
public interface IPublishing
{
    [OperationContract(IsOneWay = true)]
    void Publish(Message e, string ID);

  
}

[ServiceContract(CallbackContract = typeof(IPublishing))]
public interface ISubscription
{
    [OperationContract]
    void Subscribe(string topicName);

    [OperationContract]
    void UnSubscribe(string topicName);
}

请协助..

4

0 回答 0