1

我们有一个 Windows Phone 7 应用程序,它使用一组 3 种使用 Reactive Extensions 的服务方法,定义如下:

public static class ServiceClient
{
    public static IObservable<string> LookupImage(byte[] image) {...}

    public static IObservable<XDocument> GetDefinition(string id) {...}

    public static IObservable<Dictionary<string, byte[]>> GetFiles(string id, string[] fileNames) {...}                
}

我们需要 WP7 应用程序在上述客户端中不断调用LookupImage(每次使用不同的byte[] image数据集),直到返回IObservable<string>非空。获得Observable字符串后,我们必须调用GetDefinitionGetFiles方法(按此顺序)。

调用LookupImage应该在服务响应返回时发生,而不是由计时器控制,因为它会根据网络连接速度而变化,我们需要能够发送尽可能多的这些。

对于上述问题的解决方案,我将不胜感激。作为开始,我有以下内容

private void RunLookupAndRenderLogic()
{   
   byte[] imageBytes = GetImageBytes();

   // There are some cases where the image was not 'interesting' enough in which case GetImageBytes() returns null
   if (pictureBytes != null)
   {
      // Where we have image data, send this to LookupImage service method
      var markerLookup = ServiceClient.LookupImage(imageBytes);

      markerLookup.Subscribe(id =>      
        {
                       // If the id is empty, we need to call this again.
                       if (String.IsNullOrEmpty(id))
                       {
                            ???
                       }

                       // If we have an id, call GetDefinition and GetFiles methods of the service. No further calls to LookupImage should take place.
                       RenderLogic(id);   
     });
  }
  else
   // If no interesting image was returned, try again
   RunRecognitionAndRenderLogic();
}
4

1 回答 1

1

抱歉,如果我弄错了,但如果我理解正确,您想用完全相同的参数重试对 LookupImage 的调用,直到它返回一个值?

解决这个问题的一种天真的方法是简单地调用 repeat 然后 take(1):

ServiceClient.LookupImage(imageBytes)
    .Repeat()
    .Take(1)
    .Subscribe(id =>  ....);

然而,由于默认情况下 Rx 是单线程的,因此在此上下文中没有任何意义允许我们注入我们的处置调用(隐含来自 Take(1)-->OnComplete()-->订阅的自动处置)。

您可以通过使用 CurrentThread 调度程序在后续重新订阅之间提供一些喘息空间来避免这种情况。

Observable.Defer(()=>
    ServiceClient.LookupImage(imageBytes)
                 .ObserveOn(Scheduler.CurrentThread)
    )
    .Repeat()
    .Take(1)
    .Subscribe(id =>  ....);

通过对 Rx 有一定的了解和一些创造力,还有其他方法可以实现这一点。(大多数我会想象一个调度程序)

为了给您一些启发,请查看调度和线程一章。它涵盖了递归和构建您自己的迭代器,这实际上是您正在尝试做的事情。

完整代码示例:

private void RunLookupAndRenderLogic()
{   
    byte[] imageBytes = GetImageBytes();

    // There are some cases where the image was not 'interesting' enough in which case GetImageBytes() returns null
    if (pictureBytes != null)
    {
        // Where we have image data, send this to LookupImage service method
        var subscription = Observable
        .Defer(()=>
            ServiceClient.LookupImage(imageBytes)
                     .ObserveOn(Scheduler.CurrentThread)
        )
        .Where(id=>!String.IsNullOrEmpty(id))
        .Repeat()
        .Take(1)
        .Subscribe(id =>      
        {
           // If we have an id, call GetDefinition and GetFiles methods of the service. No further calls to LookupImage should take place.
           RenderLogic(id);   
        });

        //TODO: You dont offer any way to cancel this (dispose of the suscription). 
        //This means you could loop forever :-(
    }
    else
    {
        // If no interesting image was returned, try again
        RunRecognitionAndRenderLogic();
    }
}

(披露:我是 IntroToRx.com 的作者)

于 2012-11-02T15:31:03.730 回答