在尝试了 IObservables 之后,我决定测试它们是否可以处理通过消息总线传入的消息。基本上我得到一个IObservable<Request>
并且Request
包含回复响应的必要功能。
在处理过程中,我必须反序列化数据并将其从 Request 转换为包含它实际需要执行的操作的 Command 对象。命令与请求无关。
反序列化后,我将其转换为正确的响应,但是为了发送响应,我需要原始的 Request 对象。我想在保持高代码可读性的同时尝试实现这一点。到目前为止,我已经使用扩展方法和 lambda 表达式来获得以下内容(其中requests
)IObservable<Request>
:
requestProcessor = requests
.Deserialize<IdentityRequest>()
.Where(idRequest => idRequest.Address != null)
.Select(idRequest => new IdentityResponse() {Identity = identityTable[idRequest.Address.Address]})
.Serialize()
.Zip(requests, (response, request) => new {request = request, response = response})
.Subscribe(data => data.request.Respond(data.response, ObjectTypes.IdentityResponse));
我的问题是,由于Zip
函数之前的所有命令都需要时间来处理,Zip
如果有恒定的消息流,是否会对同一个输入对象(即原始输入以及单独处理的输入)进行操作。我该如何测试呢?
有没有更好的方法来做到这一点?