我在方法中有这样的代码:
// Time-windowed replay subject: constructed with `messageTimeout` as its replay window.
ISubject<Message> messages = new ReplaySubject<Message>(messageTimeout);

/// <summary>
/// Subscribes for the first message that <paramref name="clientId"/> has not handled yet,
/// passes its payload to <paramref name="callback"/>, then marks it as handled by that client.
/// </summary>
/// <param name="clientId">Identifier of the client the message is delivered to.</param>
/// <param name="callback">Invoked with the payload (<c>Message.Message</c>) of the selected message.</param>
/// <remarks>
/// The "is handled" check and the "mark as handled" step are separate operations, so
/// concurrent calls for the same client can both observe the same message as unhandled.
/// </remarks>
public void HandleNext(string clientId, Action<object> callback)
{
    // Filter: only messages this client has not been marked for yet.
    Func<Message, bool> isPending = candidate => !candidate.IsHandledBy(clientId);

    // Delivery: hand over the payload first, record the hand-over afterwards.
    Action<Message> deliver = pending =>
    {
        callback(pending.Message);
        pending.MarkAsHandledBy(clientId);
    };

    // Take(1) ends the subscription after the first delivered message.
    messages.Where(isPending).Take(1).Subscribe(deliver);
}
用符合 Rx 风格("rx'y")的方式应该怎样编写这段代码,才能在并发多次调用 HandleNext() 时,避免 IsHandledBy() 与 MarkAsHandledBy() 之间出现竞争?
编辑:
这段代码用于长轮询:每个 Web 请求都会调用一次 HandleNext()。每个请求只能处理一条消息,然后返回给客户端;下一个请求再接收下一条消息,依此类推。
完整的代码(当然仍在进行中)是这样的:
/// <summary>
/// In-memory, channel-based message queue built on a time-windowed <c>ReplaySubject</c>.
/// Each call to <see cref="ReceiveNext"/> delivers at most one message to the caller.
/// </summary>
public class Queue
{
    // Every sent message is pushed through this subject; it is constructed with the
    // message timeout as its replay window.
    readonly ISubject<MessageWrapper> messages;

    /// <summary>Creates a queue with a message timeout of 30 seconds.</summary>
    public Queue() : this(TimeSpan.FromSeconds(30)) {}

    /// <summary>Creates a queue with the given message timeout.</summary>
    /// <param name="messageTimeout">Replay window passed to the underlying <c>ReplaySubject</c>.</param>
    public Queue(TimeSpan messageTimeout)
    {
        messages = new ReplaySubject<MessageWrapper>(messageTimeout);
    }

    /// <summary>Publishes <paramref name="message"/> on a single channel.</summary>
    /// <param name="channel">Channel the message is published on.</param>
    /// <param name="message">Payload handed to receivers.</param>
    public void Send(string channel, object message)
    {
        messages.OnNext(new MessageWrapper(new List<string> {channel}, message));
    }

    /// <summary>
    /// Delivers the next message on <paramref name="channel"/> that <paramref name="clientId"/>
    /// has not received yet to <paramref name="callback"/>, then stops listening.
    /// </summary>
    /// <param name="clientId">Identifier of the receiving client.</param>
    /// <param name="channel">Channel to receive from.</param>
    /// <param name="callback">Invoked once with the payload of the delivered message.</param>
    public void ReceiveNext(string clientId, string channel, Action<object> callback)
    {
        // A SingleAssignmentDisposable can be disposed before the real subscription is
        // assigned to it. That matters because buffered messages can be pushed to the
        // observer while Subscribe is still running, i.e. before its return value exists.
        var subscription = new System.Reactive.Disposables.SingleAssignmentDisposable();
        subscription.Disposable = messages
            .Where(message => message.Channels.Contains(channel))
            .Subscribe(message =>
            {
                // Already delivered one message for this request: ignore anything that
                // still arrives before the unsubscription takes effect.
                if (subscription.IsDisposed) return;

                // The "not yet received?" check and the "mark as received" step happen
                // atomically inside TryDispatchTo, so concurrent requests of the same
                // client cannot both be handed the same message.
                if (message.TryDispatchTo(clientId, callback))
                {
                    subscription.Dispose();
                }
            });
    }

    /// <summary>
    /// A payload together with its channels and the set of clients it was delivered to.
    /// </summary>
    class MessageWrapper
    {
        // Guards `receivers`; a dedicated object keeps the lock private to this class.
        readonly object gate = new object();
        // Clients that already received this message.
        readonly HashSet<string> receivers;

        public MessageWrapper(List<string> channels, object message)
        {
            receivers = new HashSet<string>();
            Channels = channels;
            Message = message;
        }

        public List<string> Channels { get; private set; }
        public object Message { get; private set; }

        /// <summary>
        /// Atomically checks whether <paramref name="clientId"/> already received this message
        /// and, if not, invokes <paramref name="handler"/> with the payload and records the delivery.
        /// </summary>
        /// <returns><c>true</c> if the handler was invoked; <c>false</c> if the client already had the message.</returns>
        public bool TryDispatchTo(string clientId, Action<object> handler)
        {
            // The handler runs while the lock is held so that check, delivery and mark form
            // one unit. If the handler throws, the message stays unmarked for this client.
            lock (gate)
            {
                if (receivers.Contains(clientId)) return false;
                handler(Message);
                receivers.Add(clientId);
                return true;
            }
        }
    }
}
编辑2:
现在我的代码如下所示:
/// <summary>
/// Delivers the next message on <paramref name="channel"/> that <paramref name="clientId"/>
/// has not received yet to <paramref name="callback"/>, then stops listening.
/// </summary>
/// <param name="clientId">Identifier of the receiving client.</param>
/// <param name="channel">Channel to receive from.</param>
/// <param name="callback">Invoked once with the payload of the delivered message.</param>
public void ReceiveNext(string clientId, string channel, Action<object> callback)
{
    // Buffered messages can be pushed to the observer while Subscribe is still running,
    // i.e. before its return value could be stored in a plain local. Disposing a
    // placeholder at that point would be a no-op and the callback could then fire for
    // several messages. A SingleAssignmentDisposable remembers an early Dispose and
    // disposes the real subscription as soon as it is assigned.
    var subscription = new SingleAssignmentDisposable();
    subscription.Disposable = messages
        .Where(message => message.Channels.Contains(channel))
        .Subscribe(message =>
        {
            // One message per request: ignore anything that still arrives before the
            // unsubscription takes effect.
            if (subscription.IsDisposed) return;

            if (message.TryDispatchTo(clientId, callback))
            {
                subscription.Dispose();
            }
        });
}
/// <summary>
/// A payload together with the channels it was published on and the set of clients
/// that already received it.
/// </summary>
class MessageWrapper
{
    readonly object message;
    // Guards `receivers`; a dedicated object keeps the lock private to this class.
    readonly object gate = new object();
    // Clients that already received this message. A set gives constant-time membership
    // checks; every id is checked before it is added, so no duplicate is ever stored.
    readonly HashSet<string> receivers;

    /// <param name="channels">Channels the message was published on.</param>
    /// <param name="message">Payload handed to receivers.</param>
    public MessageWrapper(List<string> channels, object message)
    {
        this.message = message;
        receivers = new HashSet<string>();
        Channels = channels;
    }

    public List<string> Channels { get; private set; }

    /// <summary>
    /// Atomically checks whether <paramref name="clientId"/> already received this message
    /// and, if not, invokes <paramref name="handler"/> with the payload and records the delivery.
    /// </summary>
    /// <param name="clientId">Identifier of the receiving client.</param>
    /// <param name="handler">Invoked with the payload when the client has not received it yet.</param>
    /// <returns><c>true</c> if the handler was invoked; <c>false</c> if the client already had the message.</returns>
    public bool TryDispatchTo(string clientId, Action<object> handler)
    {
        // The handler runs while the lock is held so that check, delivery and mark form
        // one unit. If the handler throws, the message stays unmarked for this client.
        lock (gate)
        {
            if (IsReceivedBy(clientId)) return false;
            handler(message);
            MarkAsReceivedFor(clientId);
            return true;
        }
    }

    // Callers must hold `gate`.
    void MarkAsReceivedFor(string clientId)
    {
        receivers.Add(clientId);
    }

    // Callers must hold `gate`.
    bool IsReceivedBy(string clientId)
    {
        return receivers.Contains(clientId);
    }
}