Symfony 信使:
https://symfony.com/doc/current/messenger.html
问题:
Pool#1 =(user1
创建一个Job
,Job
拆分为 10 个信使Message
)
Pool#2 =(user2
创建一个Job
,Job
拆分为 10 个信使Message
)
...
Pool#100 =(user100
创建一个Job
,Job
拆分为 10 个信使Message
)
Pool#100 将不会被执行,直到所有先前的 Pool 都不会完成。
目标:
我需要并行队列,所有池都将单独运行,因此每个池都有个人队列。
代码示例:
配置/包/messenger.yamlframework:
messenger:
transports:
sync: "%env(MESSENGER_TRANSPORT_DSN)%"
routing:
'App\Message\Job': sync
src/消息/Job.php
<?php
namespace App\Message;
class Job
{
private $content;
public function __construct(string $content)
{
$this->content = $content;
}
public function getContent(): string
{
return $this->content;
}
}
src/MessageHandler/JobHandler.php
<?php
namespace App\MessageHandler;
use App\Message\Job;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
class JobHandler implements MessageHandlerInterface
{
public function __construct()
{}
public function __invoke(Job $message)
{
$params = json_decode($message->getContent(), true);
dump($params);
}
}
src/Controller/JobController.php
<?php
namespace App\Controller;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\JsonResponse;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Routing\Annotation\Route;
/**
* @Route("/job")
*/
class JobController extends AbstractController
{
/**
* @Route("/create", name="app_job_create")
* @param Request $request
* @param MessageBusInterface $bus
* @return JsonResponse
*/
public function create(Request $request, MessageBusInterface $bus): JsonResponse
{
// ...
$entityId = $entity->getId();
// ...
for ($i = 0; $i < 10; $i++) {
$params['entityId'] = $entityId;
$params['counter'] = $i;
$bus->dispatch(new Job(json_encode($params)));
}
return new JsonResponse([]);
}
}
更多信息:
我想继续使用它,但找不到最简单的解决方案来传递一些唯一的队列名称或 ID,然后告诉工人他必须只处理这个 Pool of Messages
.
我找到了自定义传输https://symfony.com/doc/current/messenger/custom-transport.html,但我不确定它是否有帮助。至少我认为只有定制运输是不够的。
我阅读了有关Actor models
https://www.brianstorti.com/the-actor-model/的信息,但如果可能的话,我只想使用 Messenger+Redis。
可能这里没有解决方案,这个信使还不能处理并行队列。无论如何,我很高兴得到任何帮助。
谢谢!