我创建了一个 WCF 发布/订阅(Publisher-Subscriber)服务,它每天处理 70 到 80 个连接。如果连接中断(由于 Internet 连接失败、系统关闭等),服务本身会在连接恢复时尝试自行重新建立连接。
前段时间,我注意到我的服务会抛出如下异常(操作超时):
This request operation sent to net.tcp://localhost:2001/sub did not receive a reply within the configured timeout (00:00:59.9799877). The time allotted to this operation may have been a portion of a longer timeout. This may be because the service is still processing the operation or because the service was unable to send a reply message. Please consider increasing the operation timeout (by casting the channel/proxy to IContextChannel and setting the OperationTimeout property) and ensure that the service is able to connect to the client
在出现若干次上述异常之后,又会出现 RM 目标(RM Destination)异常,导致之后与服务通信所用的通道进入故障(Faulted)状态:
创建可靠会话的请求已被 RM 目的地拒绝。服务器 'net.tcp://localhost:2001/Sub' 太忙,无法处理此请求。稍后再试。无法打开通道。
在此期间,已经订阅的客户端仍能成功响应 pub/sub 请求,
但没有任何新的通道(channel)能够订阅该服务。
我在服务中使用的是 NetTcpBinding(net.tcp 绑定)。
客户端代码:
/// <summary>
/// Client-side subscriber. Owns a duplex proxy to the subscription service and
/// acts as the callback object (<see cref="IPublishing"/>) that the service
/// invokes when it publishes.
/// </summary>
public class Subscriber : IPublishing
{
    // Incremented every time a proxy is created; reset to 0 after a successful Subscribe call.
    static int Counter = 0;

    // Current service proxy; null when no usable proxy exists.
    ISubscription _proxy;

    // Factory that produced _proxy. Kept so it can be released together with the
    // proxy instead of being leaked on every reconnect.
    DuplexChannelFactory<ISubscription> _channelFactory;

    string _endpoint = string.Empty;

    /// <summary>
    /// Creates the subscriber and builds the initial proxy for the fixed service endpoint.
    /// </summary>
    public Subscriber()
    {
        _endpoint = "net.tcp://localhost:2001/sub";
        MakeProxy(_endpoint, this);
    }

    /// <summary>
    /// Builds a new duplex proxy for the given endpoint, replacing (and releasing)
    /// any previous one. On failure the error is traced and <c>_proxy</c> is left null.
    /// </summary>
    /// <param name="EndpoindAddress">Address of the subscription service.</param>
    /// <param name="callbackinstance">Object that receives the service's callbacks.</param>
    void MakeProxy(string EndpoindAddress, object callbackinstance)
    {
        // Release the previous channel/factory first so repeated calls do not leak them.
        DiscardProxy();

        DuplexChannelFactory<ISubscription> channelFactory = null;
        try
        {
            NetTcpBinding netTcpbinding = new NetTcpBinding(SecurityMode.None);

            // Added for reliable communication.
            netTcpbinding.ReliableSession.Enabled = true;
            netTcpbinding.ReliableSession.Ordered = true;
            netTcpbinding.MaxBufferSize = int.MaxValue;
            netTcpbinding.MaxBufferPoolSize = int.MaxValue;
            netTcpbinding.MaxReceivedMessageSize = int.MaxValue;
            // NOTE(review): with both timeouts at MaxValue this side never gives up
            // on a silent peer; confirm that is intended.
            netTcpbinding.ReliableSession.InactivityTimeout = TimeSpan.MaxValue;
            netTcpbinding.ReceiveTimeout = TimeSpan.MaxValue;

            EndpointAddress endpointAddress = new EndpointAddress(EndpoindAddress);

            // Use the supplied callback object (previously a second context around
            // 'this' was created and this one was left unused).
            InstanceContext context = new InstanceContext(callbackinstance);

            channelFactory = new DuplexChannelFactory<ISubscription>(context, netTcpbinding, endpointAddress);
            _proxy = channelFactory.CreateChannel();
            _channelFactory = channelFactory;
            Counter++;
        }
        catch (Exception ex)
        {
            // Best effort, as before, but no longer silent.
            System.Diagnostics.Trace.TraceError("Creating proxy for {0} failed: {1}", EndpoindAddress, ex);
            _proxy = null;
            if (channelFactory != null)
            {
                channelFactory.Abort();
            }
        }
    }

    /// <summary>
    /// Subscribes to topic "500". If the current proxy is missing or no longer
    /// usable (closed, aborted or faulted) a new one is created first. Failures
    /// are traced and the broken proxy is discarded so the next call can reconnect;
    /// no exception is propagated to the caller.
    /// </summary>
    public void Subscribe()
    {
        try
        {
            if (!IsProxyUsable())
            {
                MakeProxy(_endpoint, this);
            }

            if (_proxy == null)
            {
                // Proxy creation failed; MakeProxy has already traced the reason.
                return;
            }

            _proxy.Subscribe("500");
            Counter = 0;
        }
        catch (Exception ex)
        {
            System.Diagnostics.Trace.TraceError("Subscribe on {0} failed: {1}", _endpoint, ex);

            // A channel that threw is not reused: abort it (null-safe) and let the
            // next Subscribe call build a fresh one.
            DiscardProxy();
        }
    }

    /// <summary>
    /// Callback invoked by the service when it publishes. Intentionally empty here.
    /// </summary>
    public void Publish(Message e, String ID)
    {
    }

    /// <summary>
    /// Alias for <see cref="Subscribe"/>, which already handles its own failures.
    /// </summary>
    public void _Subscribe()
    {
        Subscribe();
    }

    // True when _proxy exists and its channel can still be used for a call.
    bool IsProxyUsable()
    {
        ICommunicationObject channel = _proxy as ICommunicationObject;
        if (channel == null)
        {
            return false;
        }

        CommunicationState state = channel.State;
        return state == CommunicationState.Created
            || state == CommunicationState.Opening
            || state == CommunicationState.Opened;
    }

    // Aborts and forgets the current proxy and its factory, if any.
    void DiscardProxy()
    {
        ICommunicationObject channel = _proxy as ICommunicationObject;
        _proxy = null;
        if (channel != null)
        {
            channel.Abort();
        }

        if (_channelFactory != null)
        {
            _channelFactory.Abort();
            _channelFactory = null;
        }
    }
}
服务器代码
/// <summary>
/// Singleton service implementation of <see cref="ISubscription"/>. Each call
/// resolves the caller's callback channel and hands it to <c>Filter</c>.
/// </summary>
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
class Subscription : ISubscription
{
    #region ISubscription Members

    /// <summary>
    /// Registers the calling client's callback channel under <paramref name="ID"/>.
    /// </summary>
    /// <param name="ID">Key under which the caller is registered.</param>
    public void Subscribe(string ID)
    {
        // The unterminated, unused "String s = DateTime.Now.ToString(" statement was
        // removed, and the undeclared KioskID replaced by the ID parameter.
        IPublishing subscriber = OperationContext.Current.GetCallbackChannel<IPublishing>();
        Filter.AddSubscriber(ID, subscriber);
    }

    /// <summary>
    /// Removes the registration stored under <paramref name="ID"/>.
    /// </summary>
    /// <param name="ID">Key whose registration is removed.</param>
    public void UnSubscribe(string ID)
    {
        IPublishing subscriber = OperationContext.Current.GetCallbackChannel<IPublishing>();
        Filter.RemoveSubscriber(ID, subscriber);
    }

    #endregion
}
/// <summary>
/// Process-wide registry that maps a topic name to the callback channels
/// subscribed to it. All access through the methods below is serialized on a
/// private lock.
/// </summary>
class Filter
{
    // Private gate: locking on typeof(Filter) would let unrelated code contend on
    // (or deadlock with) the same monitor.
    static readonly object _gate = new object();

    static readonly Dictionary<string, List<IPublishing>> _subscribersList = new Dictionary<string, List<IPublishing>>();

    /// <summary>
    /// The live topic-to-subscribers dictionary. The lock only covers fetching the
    /// reference; anything the caller does with the dictionary afterwards is not
    /// synchronized.
    /// </summary>
    public static Dictionary<string, List<IPublishing>> SubscribersList
    {
        get
        {
            lock (_gate)
            {
                return _subscribersList;
            }
        }
    }

    /// <summary>
    /// Returns the subscriber list registered for <paramref name="topicName"/>,
    /// or null when the topic has no registration.
    /// </summary>
    public static List<IPublishing> GetSubscribers(String topicName)
    {
        lock (_gate)
        {
            List<IPublishing> subscribers;
            return _subscribersList.TryGetValue(topicName, out subscribers) ? subscribers : null;
        }
    }

    /// <summary>
    /// Registers <paramref name="subscriberCallbackReference"/> as the only
    /// subscriber of <paramref name="topicName"/>, replacing any existing entry.
    /// </summary>
    public static void AddSubscriber(String topicName, IPublishing subscriberCallbackReference)
    {
        DebugLog.WriteLog("C:\\DevPubSubServiceLogs", topicName + "came for Sub");
        lock (_gate)
        {
            // NOTE(review): this log call runs while the lock is held; if WriteLog is
            // slow, every other subscriber waits on it. Confirm that is acceptable.
            DebugLog.WriteLog("C:\\DevPubSubServiceLogs", topicName + "inside lock");

            // Only one subscriber may exist per topic name, so a fresh single-element
            // list overwrites whatever was stored before.
            List<IPublishing> newSubscribersList = new List<IPublishing>();
            newSubscribersList.Add(subscriberCallbackReference);
            _subscribersList[topicName] = newSubscribersList;
        }
        DebugLog.WriteLog("C:\\DevPubSubServiceLogs", topicName + "subscribed");
    }

    /// <summary>
    /// Removes the whole entry for <paramref name="topicName"/>, if present.
    /// <paramref name="subscriberCallbackReference"/> is not consulted.
    /// </summary>
    public static void RemoveSubscriber(String topicName, IPublishing subscriberCallbackReference)
    {
        lock (_gate)
        {
            // Remove is a no-op (returns false) when the key is absent.
            _subscribersList.Remove(topicName);
        }
    }
}
服务协定(Contracts):
/// <summary>
/// Contract for delivering a published message to a subscriber.
/// </summary>
[ServiceContract]
public interface IPublishing
{
/// <summary>
/// Delivers a published message. Declared one-way, so the sender does not
/// wait for a reply.
/// </summary>
/// <param name="e">The message being published.
/// NOTE(review): the Message type is not declared in this file; confirm which type is meant.</param>
/// <param name="ID">Identifier accompanying the message (presumably the topic or subscriber id; verify against the publisher).</param>
[OperationContract(IsOneWay = true)]
void Publish(Message e, string ID);
}
/// <summary>
/// Duplex service contract for managing subscriptions. Its callback contract
/// is <see cref="IPublishing"/>.
/// </summary>
[ServiceContract(CallbackContract = typeof(IPublishing))]
public interface ISubscription
{
/// <summary>
/// Subscribes the caller to the given topic. Not marked one-way, so the
/// caller waits for the operation to complete.
/// </summary>
/// <param name="topicName">Name of the topic to subscribe to.</param>
[OperationContract]
void Subscribe(string topicName);
/// <summary>
/// Cancels the subscription for the given topic. Not marked one-way, so the
/// caller waits for the operation to complete.
/// </summary>
/// <param name="topicName">Name of the topic to unsubscribe from.</param>
[OperationContract]
void UnSubscribe(string topicName);
}
请协助..