我正在创建一个 Twitter 客户端,我需要处理来自每个流式处理的推文(从 Symfony 的Process组件开始)。我有一个在后台运行的websocket 服务器,它运行良好。
我的问题是我不知道如何从这些进程中获取数据流并通过套接字传回客户端。为了继续检查读取输出,我需要为其创建一个 while 循环,但这会阻止其他用户启动他们的流,因为套接字服务器将停留在该循环中。
如果可能的话,我想避免使用数据库队列。
服务器,从终端启动:
class StreamServer extends \WebSocketServer
{
/**
*
* Called immediately when the data is recieved.
*
* @param $user
* @param $message
*/
protected function process($user, $message)
{
var_dump($user);
}
/**
* Called after the handshake response is sent to the client.
*
* @param $user
*/
protected function connected($user)
{
$streamer = new Process('php Streamer.php');
$streamer->setTimeout(null);
$streamer->start();
$this->send($user, 'tweets');
}
/**
* Called after the connection is closed.
*
* @param $user
*/
protected function closed($user)
{
$this->disconnect($user->socket);
}
}
$echo = new StreamServer("127.0.0.1", "9000");
try {
$echo->run();
} catch (Exception $e) {
$echo->stdout($e->getMessage());
}
流光:
class Streamer
{
public function run()
{
// $this->user = $user;
echo 'hello';
$config = [
'oauth' => [
'consumer_key' => '**',
'consumer_secret' => '**',
'token' => '**',
'token_secret' => '**',
],
];
$floodgate = Floodgate::create($config);
$handler = function ($message)
{
var_export($message);
};
$generator = function ()
{
return [];
};
$floodgate->user($handler, $generator);
}
}
$stream = new Streamer();
$stream->run();