0

我使用 TCP 上的 CustomBinding 在同一进程(用于测试)中设置了 WCF 双工服务(单向消息)和多个客户端。

只要只有一个客户被回调,这一切都可以正常工作。但是,对于多个客户端,它会失败。在后一种情况下,一个客户端工作,其他客户端可以发送他们的请求但没有得到响应。服务器可以毫无问题地发送所有响应。WCF 跟踪在客户端显示 EndpointNotFoundException:

There was no channel that could accept the message with action
'http://tempuri.org/IMyService/Response'.
at System.ServiceModel.Dispatcher.ErrorBehavior.ThrowAndCatch(Exception e, Message message)
at System.ServiceModel.Channels.DatagramChannelDemuxer`2.ProcessItem(TInnerItem item)
at System.ServiceModel.Channels.DatagramChannelDemuxer`2.HandleReceiveResult(IAsyncResult result)
at System.ServiceModel.Channels.DatagramChannelDemuxer`2.OnReceiveCompleteStatic(IAsyncResult result)
at System.Runtime.Fx.AsyncThunk.UnhandledExceptionFrame(IAsyncResult result)
at System.Runtime.AsyncResult.Complete(Boolean completedSynchronously)
at System.Runtime.AsyncResult.Complete(Boolean completedSynchronously, Exception exception)
at System.ServiceModel.Channels.InputChannel.HelpReceiveAsyncResult.OnReceive(IAsyncResult result)
at System.Runtime.Fx.AsyncThunk.UnhandledExceptionFrame(IAsyncResult result)
at System.Runtime.AsyncResult.Complete(Boolean completedSynchronously)
at System.Runtime.InputQueue`1.AsyncQueueReader.Set(Item item)
at System.Runtime.InputQueue`1.Dispatch()
at System.ServiceModel.Channels.SingletonChannelAcceptor`3.DispatchItems()
at System.ServiceModel.Channels.DuplexSessionOneWayChannelListener.ChannelReceiver.OnReceive(IAsyncResult result)
at System.Runtime.Fx.AsyncThunk.UnhandledExceptionFrame(IAsyncResult result)
at System.Runtime.AsyncResult.Complete(Boolean completedSynchronously)
at System.Runtime.AsyncResult.Complete(Boolean completedSynchronously, Exception exception)
at System.ServiceModel.Channels.FramingDuplexSessionChannel.TryReceiveAsyncResult.OnReceive(IAsyncResult result)
at System.Runtime.Fx.AsyncThunk.UnhandledExceptionFrame(IAsyncResult result)
at System.Runtime.AsyncResult.Complete(Boolean completedSynchronously)
at System.Runtime.AsyncResult.Complete(Boolean completedSynchronously, Exception exception)
at System.ServiceModel.Channels.SynchronizedMessageSource.ReceiveAsyncResult.OnReceiveComplete(Object state)
at System.ServiceModel.Channels.SessionConnectionReader.OnAsyncReadComplete(Object state)
at System.ServiceModel.Channels.TracingConnection.TracingConnectionState.ExecuteCallback()
at System.ServiceModel.Channels.TracingConnection.WaitCallback(Object state)
at System.ServiceModel.Channels.SocketConnection.FinishRead()
at System.ServiceModel.Channels.SocketConnection.AsyncReadCallback(Boolean haveResult, Int32 error, Int32 bytesRead)
at System.ServiceModel.Channels.OverlappedContext.CompleteCallback(UInt32 error, UInt32 numBytes, NativeOverlapped* nativeOverlapped)
at System.Runtime.Fx.IOCompletionThunk.UnhandledExceptionFrame(UInt32 error, UInt32 bytesRead, NativeOverlapped* nativeOverlapped)
at System.Threading._IOCompletionCallback.PerformIOCompletionCallback(UInt32 errorCode, UInt32 numBytes, NativeOverlapped* pOVERLAP)

在异常发生时,我确信所有客户端通道仍处于打开状态,因为它们仅在收到响应后才会关闭。看起来客户端接收到消息但无法将其发送到客户端实例。

这是我的完整示例(通过代码配置 WCF):

using System;
using System.Text;
using System.Collections.Generic;
using System.Linq;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System.ServiceModel;
using System.ServiceModel.Channels;
using System.Threading;
using System.Threading.Tasks;

namespace WcfDuplex
{
  [TestClass]
  public class WcfDuplexTest
  {
    [TestMethod]
    public void WcfDuplexTest1()
    {
      const int NumParallelRequests = 2;
      const int NumMessagesPerThread = 1;
      using (var host = MyServer.CreateServer(TestContext))
      {
        Action clientAction = () =>
          {
            using (var client = MyClient.CreateProxy(TestContext))
            {
              using (var scope = new OperationContextScope(client))
              {
                var callback = MyClient.GetCallbackHandler(client);
                OperationContext.Current.OutgoingMessageHeaders.ReplyTo = client.LocalAddress;
                for (int i = 1; i <= NumMessagesPerThread; i++)
                {
                  string message = String.Format("Message {0} from tread {1}", i, Thread.CurrentThread.ManagedThreadId);
                  client.Request(message);
                  bool success = callback.MessageArrived.WaitOne(5000);
                  Assert.IsTrue(success, "Timeout while waiting for: " + message);
                  Assert.IsTrue(callback.Message.EndsWith(message));
                }
              }
              client.Close();
            }
          };
        var tasks = new List<Task>();
        for (int i = 0; i < NumParallelRequests; i++)
          tasks.Add(Task.Factory.StartNew(clientAction));
        foreach (var task in tasks)
          task.Wait(10000);
      }
    }

    public TestContext TestContext
    {
      get;
      set;
    }
  }

  [ServiceContract(CallbackContract = typeof(IMyCallback))]
  interface IMyService
  {
    [OperationContract(IsOneWay = true)]
    void Request(string message);
  }

  [ServiceContract()]
  interface IMyCallback
  {
    [OperationContract(IsOneWay = true)]
    void Response(string message);
  }

  interface IMyServiceChannel : IMyService, IClientChannel
  { }

  [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Multiple)]
  class MyService : IMyService
  {
    public void Request(string message)
    {
      var context = OperationContext.Current.Host.Extensions.Find<TestContextExtension>().TestContext;
      var callback = OperationContext.Current.GetCallbackChannel<IMyCallback>();
      OperationContext.Current.OutgoingMessageHeaders.To = OperationContext.Current.IncomingMessageHeaders.ReplyTo.Uri;
      context.WriteLine("Server received message: {0}. Reply to {1}", message, OperationContext.Current.OutgoingMessageHeaders.To);
      string responseMessage = "From server thread " + Thread.CurrentThread.ManagedThreadId + ": " + message;
      callback.Response(responseMessage);
      context.WriteLine("Server sent response: " + responseMessage);
    }
  }

  class MyCallbackHandler : IMyCallback, IExtension<IContextChannel>
  {

    private readonly TestContext Context;
    public string Message { get; set; }
    public AutoResetEvent MessageArrived { get; private set; }

    public MyCallbackHandler(TestContext context)
    {
      Context = context;
      MessageArrived = new AutoResetEvent(false);
    }

    public void Response(string message)
    {
      Message = message;
      Context.WriteLine("Client received message: " + message + " on " + OperationContext.Current.Channel.LocalAddress +
        " in thread " + Thread.CurrentThread.ManagedThreadId);
      MessageArrived.Set();
    }

    public void Attach(IContextChannel owner) { }
    public void Detach(IContextChannel owner) { }
  }

  class MyServer
  {
    public const string Url = "net.tcp://localhost:8731/MyService/";

    public static ServiceHost CreateServer(TestContext context)
    {
      var host = new ServiceHost(typeof(MyService));
      host.Extensions.Add(new TestContextExtension { TestContext = context });
      host.AddServiceEndpoint(typeof(IMyService), MyClient.GetBinding(), new Uri(MyServer.Url));
      host.Open();
      return host;
    }
  }

  class TestContextExtension : IExtension<ServiceHostBase>
  {
    public TestContext TestContext { get; set; }
    public void Attach(ServiceHostBase owner) { }
    public void Detach(ServiceHostBase owner) { }
  }

  class MyClient
  {
    public static MyCallbackHandler GetCallbackHandler(IMyServiceChannel channel)
    {
      var callback = channel.Extensions.Find<MyCallbackHandler>();
      return callback;
    }

    public static IMyServiceChannel CreateProxy(TestContext testContext)
    {
      var callback = new MyCallbackHandler(testContext);
      var instanceContext = new InstanceContext(callback);
      var binding = GetBinding();
      int port = 8732;// +Thread.CurrentThread.ManagedThreadId;
      binding.Elements.Find<CompositeDuplexBindingElement>().ClientBaseAddress = new Uri(String.Format("net.tcp://localhost:{0}/Client/", port));
      binding.OpenTimeout = TimeSpan.FromMinutes(5);
      var clientFactory = new DuplexChannelFactory<IMyServiceChannel>(instanceContext, binding);
      var client = clientFactory.CreateChannel(new EndpointAddress(MyServer.Url));
      client.Extensions.Add(callback);
      return client;
    }

    public static CustomBinding GetBinding()
    {
      var binding = new CustomBinding(
        new CompositeDuplexBindingElement(),
        new OneWayBindingElement(),
        new BinaryMessageEncodingBindingElement(),
        //new ReliableSessionBindingElement(),
        new TcpTransportBindingElement());
      return binding;
    }
  }
}

我使用以下文章作为帮助:hereherehere

4

1 回答 1

0

问题是绑定。有效的是:

var binding = new CustomBinding(
  new BinaryMessageEncodingBindingElement(),
  new ReliableSessionBindingElement(),
  new TcpTransportBindingElement());

CompositeDuplexBindingElement 和 OneWayBindingElement 引起了麻烦。TCP 类型的通道不需要它们。

同样有效的是:

var binding = new NetTcpBinding(SecurityMode.None);

或者

var binding = new WSDualHttpBinding(WSDualHttpSecurityMode.None);
binding.ClientBaseAddress = new Uri("http://localhost:8732/Client/");
于 2013-10-29T11:24:08.310 回答