我想创建一个使用 MSMQ 绑定的 WCF 服务,因为我有大量的通知要处理。重要的是客户端不会被服务拖住,并且通知按照它们被引发的顺序进行处理,因此是队列实现。
另一个考虑因素是弹性。我知道我可以集群 MSMQ 本身以使队列更加健壮,但我希望能够在不同的服务器上运行我的服务实例,因此如果服务器崩溃通知不会在队列中建立但另一台服务器继续处理.
我已经对 MSMQ 绑定进行了试验,发现您可以让多个服务实例在同一个队列上进行侦听,并且它们最终会进行一种循环,负载分布在可用服务中。这很好,但我最终会丢失队列的顺序,因为不同的实例需要不同的时间来处理请求。
我一直在使用一个简单的控制台应用程序进行实验,这是下面的史诗代码转储。当它运行时,我得到如下输出:
host1 open
host2 open
S1: 01
S1: 03
S1: 05
S2: 02
S1: 06
S1: 08
S1: 09
S2: 04
S1: 10
host1 closed
S2: 07
host2 closed
我想要发生的是:
host1 open
host2 open
S1: 01
<pause while S2 completes>
S2: 02
S1: 03
<pause while S2 completes>
S2: 04
S1: 05
S1: 06
etc.
我会认为由于 S2 尚未完成,它可能仍然会失败并将正在处理的消息返回到队列中。因此,不应允许 S1 从队列中拉出另一条消息。我的队列是事务性的,我尝试设置TransactionScopeRequired = true
服务但无济于事。
这甚至可能吗?我是不是走错了路?在没有某种中央同步机制的情况下,是否有其他方法可以构建故障转移服务?
class WcfMsmqProgram
{
private const string QueueName = "testq1";
static void Main()
{
// Create a transactional queue
string qPath = ".\\private$\\" + QueueName;
if (!MessageQueue.Exists(qPath))
MessageQueue.Create(qPath, true);
else
new MessageQueue(qPath).Purge();
// S1 processes as fast as it can
IService s1 = new ServiceImpl("S1");
// S2 is slow
IService s2 = new ServiceImpl("S2", 2000);
// MSMQ binding
NetMsmqBinding binding = new NetMsmqBinding(NetMsmqSecurityMode.None);
// Host S1
ServiceHost host1 = new ServiceHost(s1, new Uri("net.msmq://localhost/private"));
ConfigureService(host1, binding);
host1.Open();
Console.WriteLine("host1 open");
// Host S2
ServiceHost host2 = new ServiceHost(s2, new Uri("net.msmq://localhost/private"));
ConfigureService(host2, binding);
host2.Open();
Console.WriteLine("host2 open");
// Create a client
ChannelFactory<IService> factory = new ChannelFactory<IService>(binding, new EndpointAddress("net.msmq://localhost/private/" + QueueName));
IService client = factory.CreateChannel();
// Periodically call the service with a new number
int counter = 1;
using (Timer t = new Timer(o => client.EchoNumber(counter++), null, 0, 500))
{
// Enter to stop
Console.ReadLine();
}
host1.Close();
Console.WriteLine("host1 closed");
host2.Close();
Console.WriteLine("host2 closed");
// Wait for exit
Console.ReadLine();
}
static void ConfigureService(ServiceHost host, NetMsmqBinding binding)
{
var endpoint = host.AddServiceEndpoint(typeof(IService), binding, QueueName);
}
[ServiceContract]
interface IService
{
[OperationContract(IsOneWay = true)]
void EchoNumber(int number);
}
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
class ServiceImpl : IService
{
public ServiceImpl(string name, int sleep = 0)
{
this.name = name;
this.sleep = sleep;
}
private string name;
private int sleep;
public void EchoNumber(int number)
{
Thread.Sleep(this.sleep);
Console.WriteLine("{0}: {1:00}", this.name, number);
}
}
}