1

我有一个从 RESTful API 检索数据的客户端。

我的要求是我希望 API 以指定的顺序排列。如果无法达到首选的,则应使用下一个。

其原因是某种负载平衡(API 返回一个生成的链接,该链接指向应用程序中进一步使用的相同服务器)。

因此,最初我以所需的顺序生成 APIS 的 URL。例如:

  1. http://first-api.example.com/method?param=value

  2. http://first-api.example.com/method?param=value

  3. http://first-api.example.com/method?param=value

现在我可以遍历这些 url 并从第一个响应成功的 API 返回结果。然而,这需要太长时间。API 可能需要大量时间来响应(最多几秒钟)。如果加起来,整个过程可能会超时。

因此,我想等待一段可接受的时间(如果一切顺利,API 通常会成功响应)。如果到了那个时间,我想请求第二个 API,但也要等待第一个,以防它需要更长的时间来响应。

一段时间后,第三个 API 也被添加到池中

我不想从一开始就立即请求每个 API,因为每个请求都意味着我希望尽可能避免的大量工作负载。

哪个 API 最先响应,获得工作。

所以我创建了一个简单的类,使用 curl_multi_exec 完成上述操作。

它通过了 2 种方法。一种是一次提供一个 API url,一种是评估响应是否成功,另一种是启动该过程并返回第一个成功响应的公共方法。

包括测试类的类如下所示:

<?
class MultiGetProxy
{
    private $url_src = null;
    private $response_evaluator = null;
    private $total_start = null;
    private $handles = array();
    private $mh = null;
    public function setUrlSource($callback)
    {
        $this->url_src = $callback;
        return $this;
    }
    public function setResponseEvaulator($callback)
    {
        $this->response_evaluator = $callback;
        return $this;
    }
    private function addNewHandle()
    {
        echo "adding new handle ... \n";
        $ch = curl_init();

        curl_setopt($ch, CURLOPT_URL, call_user_func($this->url_src));
        ;
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);

        $this->handles[] = $ch;
        curl_multi_add_handle($this->mh, $this->handles[count($this->handles) - 1]);
    }
    private function getRunning()
    {
        curl_multi_exec($this->mh, $running);
        return $running;

    }
    private function getResponse()
    {
        foreach ($this->handles as &$handle) {
            if ($content = curl_multi_getcontent($handle)) {
                echo "one handle delivered content...\n";
                if (!call_user_func($this->response_evaluator, $content)) {
                    echo "evluater said it was an error...\n";
                    curl_multi_remove_handle($this->mh, $handle);
                    if (!$this->getRunning())
                        $this->addNewHandle();
                    return false;
                }
                echo "content looks fine\n";
                var_dump($content);
                return $content;

            }
        }
    }
    public function exec()
    {
        $this->total_start = microtime(true);
        $this->mh          = curl_multi_init();
        $response          = false;
        $lastadd           = 0;
        while (!$response) {
            if (microtime(true) > $lastadd + 5.0) {
                $this->addNewHandle();
                $lastadd = microtime(true);
                curl_multi_exec($this->mh, $running);
            }
            usleep(100000);
            curl_multi_exec($this->mh, $running);
            $response = $this->getResponse();
        }
        echo "do loop done here is some info:\n";
        var_dump(curl_multi_info_read($mh));
        // Handles schliessen
        foreach ($handles as &$handle) {
            curl_close($handle);
            curl_multi_remove_handle($mh, $handle);
        }
        curl_multi_close($mh);
        $total_run = microtime(true) - $total_start;
        echo "completed in: $total_run \n";
        echo "response is:\n";
        return $response;

    }
    public function getInfo()
    {
        $info               = array();
        $info["total_time"] = $this->total_run;
        $info["start"]      = $this->total_start;

    }

}


class Test
{
    function isvalid($data)
    {
        $data = json_decode($data);
        if ($data->status > 500) { // error
            return false;
        }
        return true;
    }
    function getUrl()
    {
        static $urls = array();
        if (empty($urls)) {
            $urls[] = 'http://test.example.com/sleep.php?s=8&status=200&message=first';
            $urls[] = 'http://test.example.com/sleep.php?s=2&status=503&message=2nd';
            $urls[] = 'http://test.example.com/sleep.php?s=2&status=200&message=3rd';
            $urls[] = 'http://test.example.com/sleep.php?s=21';
            $urls[] = 'http://test.example.com/sleep.php?s=22';
            $urls[] = 'http://test.example.com/sleep.php?s=23';
            $urls[] = 'http://test.example.como/sleep.php?s=25';
            $urls[] = 'http://test.example.com/sleep.php?s=25';
        }

        $url = array_shift($urls);
        echo "returning $url \n";
        return $url;



    }
    public function test()
    {
        $mgp = new MultiGetProxy();
        $mgp->setUrlSource(array(
            $this,
            "getUrl"
        ));
        $mgp->setResponseEvaulator(array(
            $this,
            "isvalid"
        ));
        $result = $mgp->exec();
        echo $result;
    }
}


$test = new Test();
$test->test();

现在我的实际问题更像是“征求意见”,因为我从未真正处理过此类事情。

它应该部署在高负载环境中,会有很多请求。

睡眠时间也对吗?

我有可能遇到 tcp 连接问题吗?

对不起,我不能更具体。我只是试图为一个非常具体和个别的问题找到一个解决方案,这就是我想出的。

4

1 回答 1

1

构建位于 API 和代码之间的外观。外观决定使用哪个 API。外观具有代表 API 服务器的 HTTP 对象。您的代码只是向外观发送和接收数据,并不关心它正在使用哪个服务器。

然后努力提高立面的性能。使用在您预期的时间后返回数据的模拟对象对其进行测试。

interface Proxy {
  function exec();
}

abstract class AbstractProxy {

  public $response;

  public function hasResponse() {
    return !empty($this->response);
  }

}

class SingleProxy extends AbstractProxy implements Proxy {

  // the real code for connecting to the API

}

class MultiProxy extends AbstractProxy implements Proxy {

  public $singleProxies = array();
  public $delayTime = 1000000;

  public function addProxy(Proxy $proxy) {
    $this->singleProxies[] = $proxy;
  }

  public function exec() {
     foreach($singleProxies as $proxy) {
        $proxy->exec();
        usleep($this->delayTime);
        if($proxy->hasResponse() {
          return $proxy;
        } else {
          $proxy->cancel();
        }
     }
  }

}

class SingleProxyStub extends AbstractProxy implements Proxy {

  public $timeToRespond = 0;

  public function exec() {
    usleep($this->timeToRespond);
    $this->response = 'response';
  }

}

class ProxyTest() {

   public function runTest() {

     $slowProxy = new SingleProxyStub;
     $slowProxy->timeToResponse = 2000000; // 2 seconds

     $fastProxy = new SingleProxyStub;
     $fastProxy->timeToResponse = 500000; // 0.5 seconds

     $MultiProxy = new MultiProxy;
     $MultiProxy->singleProxies = array($slowProxy, $fastProxy);

     $startTime = microtime(true);
     $MultiProxy->exec();
     $endTime = microtime(false);
     log($startTime, $endTime);

   }

}
于 2012-05-20T11:48:43.410 回答