13

我正在尝试了解反应式扩展 (Rx) 的正确用例。不断出现的示例是 UI 事件(拖放、绘图),以及 Rx 适用于异步应用程序/操作(如 Web 服务调用)的建议。

我正在开发一个需要为 REST 服务编写小型客户端 API 的应用程序。我需要调用四个 REST 端点,其中三个是为了获取一些参考数据(机场、航空公司和状态),第四个是主要服务,可为您提供给定机场的航班时间。

我创建了公开三个参考数据服务的类,方法如下所示:

public Observable<IEnumerable<Airport>> GetAirports()
public Observable<IEnumerable<Airline>> GetAirlines()
public Observable<IEnumerable<Status>> GetStatuses()
public Observable<IEnumerable<Flights>> GetFlights(string airport)

在我的 GetFlights 方法中,我希望每个航班都保存一个参考,它是从机场出发的,以及运营该航班的航空公司。为此,我需要 GetAirports 和 GetAirlines 提供的数据。每个机场、航空公司和状态都将被添加到一个字典(即字典)中,以便我可以在解析每个航班时轻松设置参考。

flight.Airport = _airports[flightNode.Attribute("airport").Value]
flight.Airline = _airlines[flightNode.Attribute("airline").Value]
flight.Status = _statuses[flightNode.Attribute("status").Value]

我当前的实现现在看起来像这样:

public IObservable<IEnumerable<Flight>> GetFlightsFrom(Airport fromAirport)
{
    var airports = new AirportNamesService().GetAirports();
    var airlines = new AirlineNamesService().GetAirlines();
    var statuses = new StatusService().GetStautses();


    var referenceData = airports
        .ForkJoin(airlines, (allAirports, allAirlines) =>
                            {
                                Airports.AddRange(allAirports);
                                Airlines.AddRange(allAirlines);
                                return new Unit();
                            })
        .ForkJoin(statuses, (nothing, allStatuses) =>
                            {
                                Statuses.AddRange(allStatuses);
                                return new Unit();
                            });

    string url = string.Format(_serviceUrl, 1, 7, fromAirport.Code);

    var flights = from data in referenceData
                    from flight in GetFlightsFrom(url)
                    select flight;

    return flights;
}

private IObservable<IEnumerable<Flight>> GetFlightsFrom(string url)
{
    return WebRequestFactory.GetData(new Uri(url), ParseFlightsXml);
}

当前的实现基于 Sergey 的回答,并使用 ForkJoin 来确保顺序执行,并且我引用的数据在航班之前被加载。这个实现比我之前的实现必须触发“ReferenceDataLoaded”事件更优雅。

4

2 回答 2

2

我认为,如果您从每个 REST 调用中接收到实体列表,那么您的调用应该有一点不同的签名——您不是在观察返回集合中的每个值,而是在观察调用完成的事件。所以对于机场,它应该有签名:

public IObservable<Aiports> GetAirports()

下一步是并行运行前三个并等待所有这些:

var ports_lines_statuses = 
    Observable.ForkJoin(GetAirports(), GetAirlines(), GetStatuses());

第三步是用 GetFlights() 组合上面的 abservable:

var decoratedFlights = 
  from pls in ports_lines_statuses
  let airport = MyAirportFunc(pls)
  from flight in GetFlights(airport)
  select flight;

编辑:我仍然不明白为什么您的服务返回

IObservable<Airport> 

代替

IObservable<IEnumerable<Airport>>

AFAIK,从 REST 调用中,您可以一次获得所有实体 - 但也许您进行分页?无论如何,如果你想让 RX 做缓冲,你可以使用 .BufferWithCount() :

    var allAirports = new AirportNamesService()
        .GetAirports().BufferWithCount(int.MaxValue); 
...

然后就可以申请 ForkJoin:

var ports_lines_statuses =  
    allAirports
        .ForkJoin(allAirlines, PortsLinesSelector)
        .ForkJoin(statuses, ...

ports_lines_statuses 将包含时间线上的单个事件,该事件将包含所有参考数据。

编辑:这是另一个,使用新创建的 ListObservable (仅限最新版本):

allAiports = airports.Start(); 
allAirlines = airlines.Start();
allStatuses = statuses.Start();

...
whenReferenceDataLoaded =
  Observable.Join(airports.WhenCompleted()
                 .And(airlines.WhenCompleted())
                 .And(statuses.WhenCompleted())
                 Then((p, l, s) => new Unit())); 



    public static IObservable<Unit> WhenCompleted<T>(this IObservable<T> source)
    {
        return source
            .Materialize()
            .Where(n => n.Kind == NotificationKind.OnCompleted)
            .Select(_ => new Unit());
    }
于 2010-05-14T18:03:38.383 回答
0

这里的用例是基于拉的 - IEnumerable 很好。如果你想说,通知新航班的到来,然后在 Observable.Generate 中包装一个基于拉取的 REST 调用可能会有一些价值。

于 2010-05-15T16:38:47.250 回答