use Predis\Response\Status as StatusResponse;
class MbStreamConnection extends \Predis\Connection\StreamConnection
{
protected function write($buffer)
{
$socket = $this->getResource();
$buffer = iconv('utf-8', 'windows-1251', $buffer);
while (($length = mb_strlen($buffer, '8bit')) > 0)
{
$written = @fwrite($socket, $buffer, $length);
if ($length === $written) {
return;
}
if ($written === false) {
$this->onConnectionError('Error while writing bytes to the server');
}
$buffer = substr($buffer, $written);
}
return;
}
/**
* {@inheritdoc}
*/
public function read()
{
$socket = $this->getResource();
$chunk = fgets($socket);
$chunk = iconv('windows-1251', 'utf-8', $chunk);
if ($chunk === false || $chunk === '') {
$this->onConnectionError('Error while reading line from the server.');
}
$prefix = $chunk[0];
$payload = substr($chunk, 1, -2);
switch ($prefix) {
case '+':
return StatusResponse::get($payload);
case '$':
$size = (int) $payload;
if ($size === -1) {
return;
}
$bulkData = '';
$bytesLeft = ($size += 2);
do {
$chunk = fread($socket, min($bytesLeft, 4096));
if ($chunk === false || $chunk === '') {
$this->onConnectionError('Error while reading bytes from the server.');
}
$bulkData .= $chunk;
$bytesLeft = $size - mb_strlen($bulkData, '8bit');
} while ($bytesLeft > 0);
$tmp = mb_substr($bulkData, 0, -2);
$tmp = iconv('windows-1251', 'utf-8', $tmp);
return $tmp;
case '*':
$count = (int) $payload;
if ($count === -1) {
return;
}
$multibulk = array();
for ($i = 0; $i < $count; ++$i) {
$multibulk[$i] = $this->read();
}
return $multibulk;
case ':':
$integer = (int) $payload;
return $integer == $payload ? $integer : $payload;
case '-':
return new ErrorResponse($payload);
default:
$this->onProtocolError("Unknown response prefix: '$prefix'.");
return;
}
}
}
在连接参数中使用 MbStreamConnection
$client = new \Predis\Client('tcp://localhost:6379', [
'scheme' => 'tcp',
'host' => 'localhost',
'port' => 6379,
'connections' => [
'tcp' => 'MbStreamConnection'
],
'parameters' => [
'password' => '',
]
]);