
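I'm using Symfony2 with FOSElasticaBundle.

My Elasticsearch service often gets killed or fails for unknown reasons. I have set up a systemctl restart always as a temporary fix.

Nevertheless, if it is down, the Elastica listener that updates the index when Doctrine updates an entity gives me an error 52:

Couldn't connect to host, Elasticsearch down?

So this happens at login, for instance, if you also use FOSUserBundle, which updates the user's last login date. Having such a hard dependency on Elasticsearch is quite annoying. I've set up an exception listener for this error, but I would like the bundle to keep the update and apply it later, when the service is available again.

Looking at the bundle files, I found:

vendor/friendsofsymfony/elastica-bundle/Persister/ObjectPersister.php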

public function replaceMany(array $objects)
{
    $documents = array();
    foreach ($objects as $object) {
        $document = $this->transformToElasticaDocument($object);
        $document->setDocAsUpsert(true);
        $documents[] = $document;
    }

    try {
        $this->type->updateDocuments($documents);
    } catch (BulkException $e) {
        $this->log($e);
    }
}

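This is a service, and I thought I could override it as follows. However, it is a parent class: an inheriting class is instantiated instead, and it is not called as a service, so I don't know how to override it. How can I do that?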

    try {
        $this->type->updateDocuments($documents);
    } catch (\Exception $e) {
        if ($e instanceof BulkException)
        {
            $this->log($e);
        }
        elseif ($e->getMessage() != "Couldn't connect to host, Elasticsearch down?")
        {
            throw $e;
        }
    }

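So, how can I make sure the documents get updated the next time the service is available?

Edit:

The trace I get when the error occurs: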

Stack Trace
in vendor/ruflin/elastica/lib/Elastica/Transport/Http.php at line 153   -
        }
        if ($errorNumber > 0) {
            throw new HttpException($errorNumber, $request, $response);
        }
        return $response;
at Http ->exec (object(Request), array('connection' => array('config' => array('headers' => array()), 'host' => 'localhost', 'port' => '9200', 'logger' => 'fos_elastica.logger', 'enabled' => true))) 
in vendor/ruflin/elastica/lib/Elastica/Request.php at line 167   + 
at Request ->send () 
in vendor/ruflin/elastica/lib/Elastica/Client.php at line 587   + 
at Client ->request ('_bulk', 'PUT', '{"update":{"_index":"foodmeup","_type":"user","_id":4}} {"doc":{"firstName":"Dominique","lastName":"Descamps","content":null,"username":"ddescamps","email":"ddescamps@ebp-paris.com","jobSeeker":{"skills":[],"experiences":[],"trainings":[]}},"doc_as_upsert":true} ', array()) 
in vendor/friendsofsymfony/elastica-bundle/Elastica/Client.php at line 47   + 
at Client ->request ('_bulk', 'PUT', '{"update":{"_index":"foodmeup","_type":"user","_id":4}} {"doc":{"firstName":"Dominique","lastName":"Descamps","content":null,"username":"ddescamps","email":"ddescamps@ebp-paris.com","jobSeeker":{"skills":[],"experiences":[],"trainings":[]}},"doc_as_upsert":true} ', array()) 
in vendor/ruflin/elastica/lib/Elastica/Bulk.php at line 342   + 
at Bulk ->send () 
in vendor/ruflin/elastica/lib/Elastica/Client.php at line 270   + 
at Client ->updateDocuments (array(object(Document))) 
in vendor/ruflin/elastica/lib/Elastica/Index.php at line 131   + 
at Index ->updateDocuments (array(object(Document))) 
in vendor/ruflin/elastica/lib/Elastica/Type.php at line 174   + 
at Type ->updateDocuments (array(object(Document))) 
in vendor/friendsofsymfony/elastica-bundle/Persister/ObjectPersister.php at line 144   + 
at ObjectPersister ->replaceMany (array(object(User))) 
in vendor/friendsofsymfony/elastica-bundle/Doctrine/Listener.php at line 151   + 
at Listener ->persistScheduled () 
in vendor/friendsofsymfony/elastica-bundle/Doctrine/Listener.php at line 182   + 
at Listener ->postFlush (object(PostFlushEventArgs)) 
in vendor/symfony/symfony/src/Symfony/Bridge/Doctrine/ContainerAwareEventManager.php at line 63   + 
at ContainerAwareEventManager ->dispatchEvent ('postFlush', object(PostFlushEventArgs)) 
in vendor/doctrine/orm/lib/Doctrine/ORM/UnitOfWork.php at line 3318   + 
at UnitOfWork ->dispatchPostFlushEvent () 
in vendor/doctrine/orm/lib/Doctrine/ORM/UnitOfWork.php at line 428   + 
at UnitOfWork ->commit (null) 
in vendor/doctrine/orm/lib/Doctrine/ORM/EntityManager.php at line 357   + 
at EntityManager ->flush (null) 
in src/AppBundle/Model/Classes/CustomBaseController.php at line 61   + 
at CustomBaseController ->flush () 
in src/AppBundle/Controller/Core/VoteController.php at line 68   + 
at VoteController ->voteAction (object(Request), 'up', 'Post', 'permettre-le-partage-de-documents-avec-les-equipes') 
at call_user_func_array (array(object(VoteController), 'voteAction'), array(object(Request), 'up', 'Post', 'permettre-le-partage-de-documents-avec-les-equipes')) 
in app/bootstrap.php.cache at line 3029   + 
at HttpKernel ->handleRaw (object(Request), '1') 
in app/bootstrap.php.cache at line 2991   + 
at HttpKernel ->handle (object(Request), '1', true) 
in app/bootstrap.php.cache at line 3140   + 
at ContainerAwareHttpKernel ->handle (object(Request), '1', true) 
in app/bootstrap.php.cache at line 2384   + 
at Kernel ->handle (object(Request)) 
in web/app_dev.php at line 36   + 

1 Answer


A message queue fits your requirements well. The web process only sends a message to the MQ whenever a model is updated. A pool of workers then consumes messages from the MQ and tries to update the ES index. If ES is down at that moment, an exception is thrown, the worker dies, and the message is returned to the queue. The message therefore stays in the MQ, and once ES is back online the workers do their job.
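To make the retry behaviour concrete, here is a minimal in-memory sketch. It does not use enqueue or a real broker: `ServiceDownException`, `drainOnce()` and the `$serviceUp` flag are hypothetical stand-ins, and an `SplQueue` plays the role of the MQ. It only illustrates the idea that a message leaves the queue once it has been handled successfully and is put back otherwise.

```php
<?php
// Hypothetical stand-ins for illustration only; a real setup would use a broker through enqueue.

/** Thrown by the fake indexer while the search service is "down". */
class ServiceDownException extends \RuntimeException {}

/**
 * Tries every message currently in the queue exactly once.
 * A message is dropped only when the handler returns without throwing;
 * on failure it is put back at the end of the queue for a later pass.
 */
function drainOnce(\SplQueue $queue, callable $handler): int
{
    $processed = 0;
    $pending = count($queue);

    for ($i = 0; $i < $pending; $i++) {
        $message = $queue->dequeue();
        try {
            $handler($message);
            $processed++;
        } catch (ServiceDownException $e) {
            $queue->enqueue($message); // keep the message for the next attempt
        }
    }

    return $processed;
}

$serviceUp = false; // simulates ES being down
$indexed = [];      // ids the fake indexer has handled

$handler = function (array $message) use (&$serviceUp, &$indexed) {
    if (!$serviceUp) {
        throw new ServiceDownException("Couldn't connect to host");
    }
    $indexed[] = $message['entityId'];
};

$queue = new \SplQueue();
$queue->enqueue(['entityId' => 4, 'type' => 'update']);
$queue->enqueue(['entityId' => 7, 'type' => 'insert']);

$firstPass = drainOnce($queue, $handler);  // service down: both messages stay queued
$serviceUp = true;
$secondPass = drainOnce($queue, $handler); // service back: both messages are handled
```

With a real broker the redelivery is done for you; the point is only that the web request never has to wait for ES.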

The same pattern works not only with ES but with any other third-party service. For example, you want to send a very important email but the mail server is down, and you cannot wait because you have to send a response to the customer. So put it in the MQ and let the broker and the workers do their job.

Here is how it could be done using the enqueue MQ library. Installation and configuration are fairly easy, so I'll skip them.

The standard listener has to be replaced with one that sends messages:

<?php
use Doctrine\Common\Persistence\Event\LifecycleEventArgs;
use Doctrine\Common\Util\ClassUtils;
use Enqueue\Client\ProducerInterface;

class ElasticaUpdateIndexListener
{
    private $producer;

    public function __construct(ProducerInterface $producer)
    {
        $this->producer = $producer;
    }

    public function postPersist(LifecycleEventArgs $eventArgs)
    {
        $this->sendIndexCommand($eventArgs->getObject(), 'insert');
    }

    public function postUpdate(LifecycleEventArgs $eventArgs)
    {
        $this->sendIndexCommand($eventArgs->getObject(), 'update');
    }

    public function preRemove(LifecycleEventArgs $eventArgs)
    {
        $this->sendIndexCommand($eventArgs->getObject(), 'delete');
    }

    // Sends the keys the processor reads: entityId, entityClass and type.
    private function sendIndexCommand($entity, $type)
    {
        $this->producer->sendCommand('elastica_index_entity', [
            'entityId' => $entity->getId(),
            // ClassUtils resolves Doctrine proxies to the real entity class.
            'entityClass' => ClassUtils::getClass($entity),
            'type' => $type,
        ]);
    }
}

The processor for these messages looks like this:

<?php

use Doctrine\Bundle\DoctrineBundle\Registry;
use Enqueue\Client\CommandSubscriberInterface;
// The Psr* namespace depends on the enqueue version; adjust it to the one you have installed.
use Enqueue\Psr\PsrContext;
use Enqueue\Psr\PsrMessage;
use Enqueue\Psr\PsrProcessor;
use Enqueue\Util\JSON;
use FOS\ElasticaBundle\Persister\ObjectPersisterInterface;
use FOS\ElasticaBundle\Provider\IndexableInterface;

class ElasticaUpdateIndexProcessor implements PsrProcessor, CommandSubscriberInterface
{
    private $doctrine;

    protected $objectPersister;

    private $indexable;

    private $config;

    // $config is an assumption: an array with the 'indexName' and 'typeName' keys used below.
    public function __construct(Registry $doctrine, ObjectPersisterInterface $objectPersister, IndexableInterface $indexable, array $config)
    {
        $this->indexable = $indexable;
        $this->objectPersister = $objectPersister;
        $this->doctrine = $doctrine;
        $this->config = $config;
    }

    public function process(PsrMessage $message, PsrContext $context)
    {
        // The message body is a JSON string, so it has to be decoded.
        $data = JSON::decode($message->getBody());

        if ($data['type'] == 'delete') {
            $this->objectPersister->deleteManyByIdentifiers([$data['entityId']]);

            return self::ACK;
        }

        // The entity may have been removed since the message was sent.
        $entity = $this->doctrine->getManagerForClass($data['entityClass'])->find($data['entityClass'], $data['entityId']);
        if (null === $entity) {
            return self::REJECT;
        }

        if (false == ($this->objectPersister->handlesObject($entity) && $this->isObjectIndexable($entity))) {
            return self::ACK;
        }

        if ($data['type'] == 'insert') {
            $this->objectPersister->insertMany([$entity]);

            return self::ACK;
        }

        if ($data['type'] == 'update') {
            $this->objectPersister->replaceMany([$entity]);

            return self::ACK;
        }

        return self::REJECT;
    }

    private function isObjectIndexable($object)
    {
        return $this->indexable->isObjectIndexable(
            $this->config['indexName'],
            $this->config['typeName'],
            $object
        );
    }

    public static function getSubscribedCommand()
    {
        return 'elastica_index_entity';
    }
}
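
Instead of letting the worker die, a processor could also catch the connection failure and ask for the message to be requeued. The sketch below shows only that decision and is independent of enqueue: `RESULT_ACK`, `RESULT_REQUEUE`, `ConnectionFailed` and `indexOrRequeue()` are hypothetical names. In a real processor you would, as an assumption to verify against your installed versions, catch the Elastica connection exception shown in the stack trace and return the processor's requeue constant.

```php
<?php
// Hypothetical stand-ins; in a real processor these would be the processor's own result constants.
const RESULT_ACK = 'ack';
const RESULT_REQUEUE = 'requeue';

/** Stand-in for the exception thrown when the search service cannot be reached. */
class ConnectionFailed extends \RuntimeException {}

/**
 * Runs the indexing callback and maps the outcome to a processor result.
 * Only connection failures are requeued; any other exception still propagates.
 */
function indexOrRequeue(callable $index, array $data): string
{
    try {
        $index($data);

        return RESULT_ACK;
    } catch (ConnectionFailed $e) {
        return RESULT_REQUEUE;
    }
}

$data = ['entityId' => 4, 'type' => 'update'];

// Indexing succeeds: the message is acknowledged.
$whenUp = indexOrRequeue(function (array $data) {
    // pretend the document was indexed
}, $data);

// The service is unreachable: the message goes back to the queue.
$whenDown = indexOrRequeue(function (array $data) {
    throw new ConnectionFailed("Couldn't connect to host");
}, $data);
```

Catching by exception class rather than comparing the message string keeps working if the wording of the error changes.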

And run some workers:

./bin/console enqueue:consume --setup-broker -vvv 
answered 2017-06-27T09:07:47.963