5

我正在使用带有 Symfony 4 messenger 组件的工作人员。

这个工人是

  • 接收消息(来自rabbitMQ)
  • 启动 ffmpeg
  • 对视频进行治疗
  • 并将某些内容保存在数据库中。

为了在 Symfony 上配置这个 worker,我已经完成了这个(中间件很重要):

// config/packages/framework.yaml
framework:
    messenger:
        buses:
            command_bus:
                middleware:
                    # each time a message is handled, the Doctrine connection
                    # is "pinged" and reconnected if it's closed. Useful
                    # if your workers run for a long time and the database
                    # connection is sometimes lost
                    - doctrine_ping_connection

                    # After handling, the Doctrine connection is closed,
                    # which can free up database connections in a worker,
                    # instead of keeping them open forever
                    - doctrine_close_connection

        transports:
            ffmpeg:
              dsn: '%env(CLOUDAMQP_URL)%'
              options:
                auto_setup: false
                exchange:
                    name: amq.topic
                    type: topic
                queues:
                  ffmpeg: ~

        routing:
            # Route your messages to the transports, for now all are AMQP messages
            'App\Api\Message\AMQPvideoFFMPEG': ffmpeg
        ## Handle multiple buses ? https://symfony.com/doc/current/messenger/multiple_buses.html
        ## When queries and command should be distinguished

然后为了了解可能导致此问题的原因,我尝试调试信使以查看中间件是否配置正确

root@b9eec429cb54:/var/www/html# php bin/console debug:messenger

Messenger
=========

command_bus
-----------

 The following messages can be dispatched:

 ------------------------------------------------------ 
  App\Api\Message\AMQPvideoFFMPEG                       
      handled by App\Api\Message\Handler\FFMPEGHandler  
 ------------------------------------------------------ 

一切似乎都很好,对吧?

那么这怎么可能看到这个:

[2019-08-23 10:25:26] messenger.ERROR:重试 App\Api\Message\AMQPvideoFFMPEG - 重试 #1。{"message":"[object] (App\Api\Message\AMQPvideoFFMPEG: {})","class":"App\Api\Message\AMQPvideoFFMPEG","re​​tryCount":1,"error":"[object ](Doctrine\DBAL\Exception\ConnectionException(代码:0):驱动程序中发生异常:SQLSTATE [HY000] [2002] 连接超时在 /var/www/html/vendor/doctrine/dbal/lib/Doctrine/DBAL /Driver/AbstractMySQLDriver.php:93, Doctrine\DBAL\Driver\PDOException(code: 2002): SQLSTATE[HY000] [2002] Connection timed out at /var/www/html/vendor/doctrine/dbal/lib/Doctrine/ DBAL/Driver/PDOConnection.php:31, PDOException(code: 2002): SQLSTATE[HY000] [2002] Connection timed out at /var/www/html/vendor/doctrine/dbal/lib/Doctrine/DBAL/Driver/PDOConnection .php:27)"} []

我完全迷路了,我错过了什么吗?

这有时会发生,但它在大多数情况下都有效,我想当我的工作人员失去与数据库的连接时会发生这个错误,特别是如果 ffmpeg 处理持续 7 分钟或更长时间,但这应该通过 ping 和关闭连接的中间件来避免。所以我不清楚这里有什么问题。

4

2 回答 2

2

在阅读了我的中间件的代码之后,尤其是这个块

https://github.com/symfony/symfony/blob/4.4/src/Symfony/Bridge/Doctrine/Messenger/DoctrinePingConnectionMiddleware.php

class DoctrinePingConnectionMiddleware extends AbstractDoctrineMiddleware
{
    protected function handleForManager(EntityManagerInterface $entityManager, Envelope $envelope, StackInterface $stack): Envelope
    {
        $connection = $entityManager->getConnection();
        if (!$connection->ping()) {
            $connection->close();
            $connection->connect();
        }
        if (!$entityManager->isOpen()) {
            $this->managerRegistry->resetManager($this->entityManagerName);
        }
        return $stack->next()->handle($envelope, $stack);
    }
}

我们可以看到我的处理程序在连接打开后立即被调用。我认为这种行为应该有效,但 FFMPEG 可以在很长一段时间内使用相同的 RabbitMQ 消息。因此,我的处理程序的最后一步将向数据库中插入一些内容可能会提供一个 mySQL has gone away 错误,或者连接超时。

这就是为什么,我把这个片段放到一个方法中,而不用调用处理程序,只调用与学说连接相关的代码,然后我在任何插入到我的数据库之前调用它,如下所示:

public function __invoke(AMQPvideoFFMPEG $message)
    {
        // reset connection if not found
        $this->processService->testConnection();
        $process = $this->processService->find($message->getProcess());
        $this->renderServcie->updateQueue($process->getQueue(), "processing");

// some other stuff
}

testConnection() 方法在哪里

 /**
     * Reconnect if connection is aborted for some reason
     */
    public function testConnection()
    {
        $connection = $this->entityManager->getConnection();
        if (!$connection->ping()) {
            $connection->close();
            $connection->connect();
        }

    }

但在那之后我尝试了另一个问题

不支持重置非惰性管理器服务。将“doctrine.orm.default_entity_manager”服务设置为惰性,并在您的 composer.json 文件中要求“symfony/proxy-manager-bridge”。

安装“symfony/proxy-manager-bridge”后,错误消失了。

到目前为止,没有遇到连接超时。等着瞧。

于 2019-08-29T09:58:21.667 回答
1

在任何插入操作之前断开连接:

public function handle(…)
{
    // your time-consuming business logic

    // disconnect if needed
    if (!$this->entityManager->getConnection()->ping()) {
        $this->entityManager->getConnection()->close();
    }

    // save your work
    $this->entityManager->flush();
}
于 2019-08-29T10:06:23.113 回答