4

我编写了一个服务,使用PollingDuplexHttpBinding它有一个使用它的 Silverllight 客户端。我的服务基本上有给定数量的客户端向服务发送他们的数据(经常是每秒,并且数据非常大,每次调用大约 5KB),并监听其他客户端向服务发送的新数据被路由到他们,非常类似于聊天室架构。

我注意到的问题是,当客户端通过 Internet 连接到服务时,几分钟后服务的响应变得缓慢并且回复变得滞后。我得出的结论是,当服务主机的上传容量达到时(互联网上传速度,服务器上只有大约 15KB/s),其他客户端发送的消息会在有可用带宽时进行相应的缓冲和处理。我想知道如何才能限制服务用于存储从客户端接收到的消息的缓冲区的占用?我的客户获得所有数据并不是那么重要,而是他们获得其他人发送的最新数据,因此实时连接是我正在寻找的,以保证交付为代价。

简而言之,我希望能够在服务填满时清理我的队列/缓冲区,或者达到某个上限并开始用收到的呼叫再次填充它以消除延迟。我该怎么做呢?MaxBufferSize我需要在服务端和客户端减少该属性吗?或者我是否需要在我的服务中编写此功能?有任何想法吗?

谢谢。

编辑:

这是我的服务架构:

//the service
[ServiceContract(Namespace = "", CallbackContract = typeof(INewsNotification))]
[AspNetCompatibilityRequirements(RequirementsMode = AspNetCompatibilityRequirementsMode.Allowed)]
[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Multiple, InstanceContextMode = InstanceContextMode.Single)]
public class NewsService
{


private static Dictionary<IChatNotification, string> clients = new Dictionary<IChatNotification, string>();
private ReaderWriterLockSlim subscribersLock = new ReaderWriterLockSlim();

[OperationContract(IsOneWay = true)]
public void PublishNotifications(byte[] data)
{
            try
            {
                subscribersLock.EnterReadLock();
                List<INewsNotification> removeList = new List<INewsNotification>();
                lock (clients)
                {
                    foreach (var subscriber in clients)
                    {
                        if (OperationContext.Current.GetCallbackChannel<IChatNotification>() == subscriber.Key)
                        {
                            continue;
                        }
                        try
                        {
                            subscriber.Key.BeginOnNotificationSend(data, GetCurrentUser(), onNotifyCompletedNotificationSend, subscriber.Key);

                        }
                        catch (CommunicationObjectAbortedException)
                        {
                            removeList.Add(subscriber.Key);
                        }
                        catch (CommunicationException)
                        {
                            removeList.Add(subscriber.Key);
                        }
                        catch (ObjectDisposedException)
                        {
                            removeList.Add(subscriber.Key);
                        }

                    }
                }

                foreach (var item in removeList)
                {
                    clients.Remove(item);
                }
            }
            finally
            {
                subscribersLock.ExitReadLock();
            }
        }

}


//the callback contract
[ServiceContract]
public interface INewsNotification
{
       [OperationContract(IsOneWay = true, AsyncPattern = true)]
       IAsyncResult BeginOnNotificationSend(byte[] data, string username, AsyncCallback callback, object asyncState);
       void EndOnNotificationSend(IAsyncResult result);
}

服务配置:

  <system.serviceModel>
    <extensions>
      <bindingExtensions>
        <add name="pollingDuplex" type="System.ServiceModel.Configuration.PollingDuplexHttpBindingCollectionElement, System.ServiceModel.PollingDuplex, Version=4.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35" />
      </bindingExtensions>
    </extensions>
    <behaviors>
      <serviceBehaviors>
        <behavior name="">

          <serviceMetadata httpGetEnabled="true" />
          <serviceThrottling maxConcurrentSessions="2147483647" />
          <serviceDebug includeExceptionDetailInFaults="true" />
        </behavior>
      </serviceBehaviors>
    </behaviors>
    <bindings>
      <pollingDuplex>

        <binding name="myPollingDuplex" duplexMode="SingleMessagePerPoll" 
                 maxOutputDelay="00:00:00" inactivityTimeout="02:00:00" 
                 serverPollTimeout="00:55:00" sendTimeout="02:00:00"  openTimeout="02:00:00" 
                  maxBufferSize="10000"  maxReceivedMessageSize="10000" maxBufferPoolSize="1000"/>

      </pollingDuplex>
    </bindings>
    <serviceHostingEnvironment aspNetCompatibilityEnabled="true" multipleSiteBindingsEnabled="true" />
    <services>
      <service name="NewsNotificationService.Web.NewsService">
        <endpoint address="" binding="pollingDuplex" bindingConfiguration="myPollingDuplex" contract="NewsNotificationService.Web.NewsService" />
        <endpoint address="mex" binding="mexHttpBinding" contract="IMetadataExchange" />
      </service>
    </services>
  </system.serviceModel>
    <system.webServer>
        <directoryBrowse enabled="true" />
    </system.webServer>
</configuration>

客户端通常会在 500 毫秒到 1000 毫秒之间调用服务,如下所示:

_client.PublishNotificationAsync(byte[] data);

并且回调将通知客户端其他客户端发送的通知:

void client_NotifyNewsReceived(object sender, NewsServiceProxy.OnNewsSendReceivedEventArgs e)
        {
                e.Usernamer//WHich client published the data
                e.data//contents of the notification
        }

回顾一下,当客户端数量增加,并且服务主机通过互联网上传速度受到限制时,服务发送给订阅者的消息会在某处缓冲并在队列中处理,这就是导致问题的原因,我不知道这些消息在哪里缓冲。在 LAN 中,该服务运行良好,因为服务器的上传速度等于其下载速度(对于 100KB/s 的传入呼叫,它发出 100KB/s 的通知)。这些消息在哪里缓冲?我怎样才能清除这个缓冲区?

我做了一些实验来尝试查看消息是否在服务中缓冲,我尝试在客户端上调用此方法,但它始终返回 0,即使一个客户端仍在接收其他人发送的通知 4- 5 分钟前:

[OperationContract(IsOneWay = false)]
public int GetQueuedMessages()
{

            return OperationContext.Current.OutgoingMessageHeaders.Count();
}
4

2 回答 2

4

我对你的情况做了一些数学计算。

  • 消息大小 = 100KB
  • 上传通道 = 15KB/s
  • 下载通道 = 100KB/s
  • 客户端每秒调用服务 1-2 次

客户端通常会在 500 毫秒到 1000 毫秒之间调用服务

这个对吗?

对于一个客户端,仅消息的下载流量将是 100-200KB/s,这只是消息正文。将有更多的标题和更多的启用安全性。

消息将被合并以进行异步调用。因此,如果我们有 3 个客户端,并且每个发送的消息回调包含每个客户端的 2 条消息。4 个客户端 - 每个回调中有 3 条消息。

对于 3 个客户端,下载通道的速度为 200-400KB/s。

对我来说,看起来消息对于您声明的带宽来说太大了。

检查您是否可以:

  1. 减小消息大小。我不知道您的业务性质,因此无法在这里提供建议。

  2. 对消息或流量使用压缩。

  3. 增加网络带宽。没有那个即使是理想的解决方案也会有非常高的延迟。您可以花费数天甚至数周的时间来优化您的代码,即使您正确地使用了您的网络解决方案,仍然会很慢。

我知道这听起来像是明显的船长,但有些问题没有正确答案,除非你改变问题。

执行上述步骤后 ServiceThrottlingBehavior,结合将管理回调队列的自定义代码。

ServiceThrottlingBehavior如果达到边界,则拒绝请求。

http://msdn.microsoft.com/en-us/library/ms735114(v=vs.100).aspx

这是微不足道的样本。实数应该专门为您的环境定义。

<serviceThrottling 
 maxConcurrentCalls="1" 
 maxConcurrentSessions="1" 
 maxConcurrentInstances="1"
/>

更新:

我在这个问题上犯了很大的错误,每次调用都是 5KB/s,

即使是 5KB 的消息,15KB/s 也不够。我们只计算 3 个用户的流量。3 每秒传入消息。系统现在应该使用双工来通知用户其他人发送了什么。每个用户都应该从他的合作伙伴那里得到消息。我们总共有 3 条消息。一个(属于发件人的)可能会被跳过,因此每个用户都应该收到 2 条消息。3 个用户将获得 10KB(5KB + 5KB = 一条消息 10KB)= 30KB。每秒一条消息将为 3 个用户在上传通道中产生 30KB/秒的速度。

这些消息在哪里缓冲?我怎样才能清除这个缓冲区?

这取决于您如何托管您的服务。如果它是自托管服务,则根本没有缓冲。您的代码试图向接收者发送消息,但由于通道被淹没,它变得非常慢。使用 IIS 可能会有一些缓冲,但从外部触摸它不是一个好习惯。

正确的解决方案是节流。由于带宽限制,您不应尝试将所有消息发送到所有客户端。相反,您应该例如限制发送消息的线程数量。因此,从上面的 3 个用户而不是并行发送 3 条消息,您可以按顺序发送它们,或者决定只有一个用户将在该轮中获得更新,下一个用户将在下一轮中获得更新。

所以一般的想法是不要在你有数据后立即将所有内容发送给每个人,而是只发送你能负担得起的数据量,并使用线程数或响应速度来控制它。

于 2012-06-09T17:19:44.370 回答
1

MaxBufferSize不会帮你解决这个问题。您将不得不自己编写代码,我不知道任何现有的解决方案/框架。然而,这听起来确实是一个有趣的问题。您可以首先为每个连接的客户端维护一个Queue<Message>,然后在推送到此队列时(或当客户端调用 dequeue 时),您可以重新评估Message应该发送的内容。

更新:首先,我会忘记尝试从客户端和配置中执行此操作,您将不得不自己编写代码

在这里,我可以看到您发送给客户的位置:

 subscriber.Key.BeginOnNotificationSend(data, GetCurrentUser(), onNotifyCompletedNotificationSend, subscriber.Key);

因此,与其将这些通知异步推送到每个客户端,不如将它们推送到Queue<byte[]>. 每个客户端连接都有自己的队列,您可能应该构造一个专用于每个客户端连接的类:

请注意,此代码不会开箱即用,并且可能存在一些逻辑错误,仅用作指南

public class ClientConnection
{
    private INewsNotification _callback;
    private Queue<byte> _queue = new Queue<byte>();
    private object _lock = new object();
    private bool _isSending
    public ClientConnection(INewsNotification callBack)
    {
           _callback=callback;
    }
    public void Enqueue(byte[] message)
    { 
       lock(_lock)
       {               
           //what happens here depends on what you want to do. 
           //Do you want to only send the latest message? 
           //if yes, you can just clear out the queue and put the new message in there,                         
           //or you could keep the most recent 5 messages.                
           if(_queue.Count > 0)
               _queue.Clear();
          _queue.Enqueue(message);
           if(!_isSending)
               BeginSendMessage();
       }
    }

    private void BeginSendMessage()
    {
       _isSending=true;
        _callback.BeginOnNotificationSend(_queue.Dequeue(), GetCurrentUser(), EndCallbackClient, subscriber.Key);

    }

    private void EndCallbackClient(IAsyncResult ar)
    {
        _callback.EndReceive(ar);
        lock(_lock)
        {
           _isSending=false;
           if(_queue.Count > 0)
              BeginSendMessage();//more messages to send
        }
    }
}

想象一个场景,一条消息被推送到客户端,同时发送 9 条消息调用ClientConnection.Enqueue。当第一条消息完成后,它会检查应该只包含最后一条(第 9 条消息)的队列

于 2012-06-07T14:19:01.187 回答