Дата обновления перевода 2021-07-05

Мессенджер: работа с синхронизированными сообщениями и сообщениями в очереди

Мессенджер предоставляет автобус сообщений с возможностью отправки сообщений, а затем работы с ними сразу же в вашем приложении, или их отправки через транспорт (например, очередь) для их обработки позже. Чтобы узнать об этом больше, прочтите документацию компонента Мессенджер.

Установка

В приложениях, использующих Symfony Flex, выполните эту команду, чтобы установить мессенджер:

1
$ composer require symfony/messenger

Создание сообщения и обработчика

Мессенджер строится на двух разных классах, которые вы создадите: (1) классе сообщения, который содержит данные, и (2) классе обработчика(ов), который будет вызван после запуска сообщения. Класс обработчика будет читать класс сообщения и выполнять некоторое действие.

Для класса сообщения нет особых требований, кроме того, что его можно сериализовать:

// src/Message/SmsNotification.php
namespace App\Message;

class SmsNotification
{
    private $content;

    public function __construct(string $content)
    {
        $this->content = $content;
    }

    public function getContent(): string
    {
        return $this->content;
    }
}

Обработчик сообщений - это PHP-вызываемое, рекомендованный способ его создания - создать класс, реализующий MessageHandlerInterface и имеющий метод __invoke(), который имеет подсказки класса сообщения (или интерфейс сообщения):

// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;

use App\Message\SmsNotification;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class SmsNotificationHandler implements MessageHandlerInterface
{
    public function __invoke(SmsNotification $message)
    {
        // ... сделайте что-то - вроде отправки SMS!
    }
}

Благодаря автоконфигурации и подсказке SmsNotification, Symfony значет, что этот отбработчик должен быть вызван, когда запускается сообщение SmsNotification. В большинстве случаев, это все, что вам нужно будет сделать. Но вы можете также сконфигурировать обработчики сообщений вручную. Чтобы увидеть всех сконфигурированных обработчиков, выполните:

1
$ php bin/console debug:messenger

Запуск сообщения

Вы готовы! Для запуска сообщения (и вызова обработчика), внедрите сервис messenger.default_bus (через MessageBusInterface), например, в контроллер:

// src/Controller/DefaultController.php
namespace App\Controller;

use App\Message\SmsNotification;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\Messenger\MessageBusInterface;

class DefaultController extends AbstractController
{
    public function index(MessageBusInterface $bus)
    {
        // приведет к вызову SmsNotificationHandler
        $bus->dispatch(new SmsNotification('Look! I created a message!'));

        // или используйте это сокращение
        $this->dispatchMessage(new SmsNotification('Look! I created a message!'));

        // ...
    }
}

Транспорт: асинхронные сообщения/сообщения в очереди

По умолчанию, сообщения обрабатываются сразу же после запуска. Если вы хотите обработать сообщение асинхронно, вы можете сконфигурировать транспорт. Транспорт может отправлять сообщения (например, в систему очереди) и затем получать из через работника. Мессенджер поддерживает несколько трансортов.

Note

Если вы хотите использовать транспорт, который не поддерживается, посмотрите на транспорт Enqueue, который поддерживает штуки вроде Kafka и Google Pub/Sub.

Транспорт регистрируется с использованием “DSN”. Благодаря рецепту Flex для Мессенджера, ваш файл .env уже имеет несколько примеров.

1
2
3
# MESSENGER_TRANSPORT_DSN=amqp://guest:[email protected]:5672/%2f/messages
# MESSENGER_TRANSPORT_DSN=doctrine://default
# MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages

Раскомментируйте тот транспорт, который вы хотите (или установите его в .env.local). См. Transport Configuration, чтобы узнать больше деталей.

Далее, в config/packages/messenger.yaml, давайте определим транспорт под название async, использующий эту конфигурацию:

  • YAML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    # config/packages/messenger.yaml
    framework:
        messenger:
            transports:
                async: "%env(MESSENGER_TRANSPORT_DSN)%"
    
                # или расширено для конфигурации большего количества опций
                #async:
                #    dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
                #    options: []
    
  • XML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
    
        <framework:config>
            <framework:messenger>
                <framework:transport name="async">%env(MESSENGER_TRANSPORT_DSN)%</framework:transport>
    
                <!-- или расширено для конфигурации большего количества опций -->
                <framework:transport name="async"
                    dsn="%env(MESSENGER_TRANSPORT_DSN)%"
                >
                    <option key="...">...</option>
                </framework:transport>
            </framework:messenger>
        </framework:config>
    </container>
    
  • PHP
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;
    
    return static function (FrameworkConfig $framework) {
        $framework->messenger()
            ->transport('async')
                ->dsn('%env(MESSENGER_TRANSPORT_DSN)%')
        ;
    
        $framework->messenger()
            ->transport('async')
                ->dsn('%env(MESSENGER_TRANSPORT_DSN)%')
                ->options([])
        ;
    };
    

Маршрутизация сообщений к транспорту

Теперь, когда у вас есть сконфигурированный транспорт, вместо немедленно обработки сообщений, вы можете сконфигурировать их так, чтобы они были отправлены транспорту:

  • YAML
    1
    2
    3
    4
    5
    6
    7
    8
    9
    # config/packages/messenger.yaml
    framework:
        messenger:
            transports:
                async: "%env(MESSENGER_TRANSPORT_DSN)%"
    
            routing:
                # async - это то имя, которое вы дали вашему транспорту выше
                'App\Message\SmsNotification': async
    
  • XML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
    
        <framework:config>
            <framework:messenger>
                <framework:routing message-class="App\Message\SmsNotification">
                    <!-- async - это то имя, которое вы дали вашему транспорту выше -->
                    <framework:sender service="async"/>
                </framework:routing>
            </framework:messenger>
        </framework:config>
    </container>
    
  • PHP
    1
    2
    3
    4
    5
    6
    7
    8
    9
    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;
    
    return static function (FrameworkConfig $framework) {
        $framework->messenger()
            // async - это то имя, которое вы дали вашему транспорту выше
            ->routing('App\Message\SmsNotification')->senders(['async'])
        ;
    };
    

Благодаря этому, App\Message\SmsNotification будет отправлен транспорту async, и его обработчик(и) не будут вызваны сразу же. Все сообщения, не совпавшие с routing будут обработаны немедленно.

Вы также можете маршрутизировать классы по их родительскому классу или интерфейсу. Или отправлять сообщения нескольким транспортам:

  • YAML
    1
    2
    3
    4
    5
    6
    7
    8
    9
    # config/packages/messenger.yaml
    framework:
        messenger:
            routing:
                # маршрутизируйте все сообщения, расширяющие этот пример базового класса или интерфейс
                'App\Message\AbstractAsyncMessage': async
                'App\Message\AsyncMessageInterface': async
    
                'My\Message\ToBeSentToTwoSenders': [async, audit]
    
  • XML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
    
        <framework:config>
            <framework:messenger>
                <!-- маршрутизируйте все сообщения, расширяющие этот пример базового класса или интерфейс -->
                <framework:routing message-class="App\Message\AbstractAsyncMessage">
                    <framework:sender service="async"/>
                </framework:routing>
                <framework:routing message-class="App\Message\AsyncMessageInterface">
                    <framework:sender service="async"/>
                </framework:routing>
                <framework:routing message-class="My\Message\ToBeSentToTwoSenders">
                    <framework:sender service="async"/>
                    <framework:sender service="audit"/>
                </framework:routing>
            </framework:messenger>
        </framework:config>
    </container>
    
  • PHP
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;
    
    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();
        // маршрутизируйте все сообщения, расширяющие этот пример базового класса или интерфейс
        $messenger->routing('App\Message\AbstractAsyncMessage')->senders(['async']);
        $messenger->routing('App\Message\AsyncMessageInterface')->senders(['async']);
        $messenger->routing('My\Message\ToBeSentToTwoSenders')->senders(['async', 'audit']);
    };
    

Note

Если вы сконфигурируете машрутизацию и для дочернего, и для родительского класса, используются оба правила. Например, если у вас есть объект SmsNotification, расширяющийся из Notification, будут использованы маршрутизации и для Notification, и для SmsNotification.

Сущности Doctrine в сообщениях

Если вам нужно передать сущность в Doctrine в сообщении, лучше всего передать основной ключ сущности (или любую релевантную информацию, необходимую обработчику, вроде email, и др.) вместо объекта:

// src/Message/NewUserWelcomeEmail.php
namespace App\Message;

class NewUserWelcomeEmail
{
    private $userId;

    public function __construct(int $userId)
    {
        $this->userId = $userId;
    }

    public function getUserId(): int
    {
        return $this->userId;
    }
}

Затем, в вашем обработчике, вы можете сделать запрос свежего объекта:

// src/MessageHandler/NewUserWelcomeEmailHandler.php
namespace App\MessageHandler;

use App\Message\NewUserWelcomeEmail;
use App\Repository\UserRepository;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class NewUserWelcomeEmailHandler implements MessageHandlerInterface
{
    private $userRepository;

    public function __construct(UserRepository $userRepository)
    {
        $this->userRepository = $userRepository;
    }

    public function __invoke(NewUserWelcomeEmail $welcomeEmail)
    {
        $user = $this->userRepository->find($welcomeEmail->getUserId());

        // ... отправить электронное письмо!
    }
}

Это гарантирует, что сущность содержит свежие данные.

Сихнронная обработка сообщений

Если сообщение не совпадает ни с одним правилом маршрутизации, оно не будет отправлено ни одному транспорту, и будет обработано немедленно. В некоторых случаях (например, при `связывании обработчиков с разными транспортами`_), легче и более гибко обработать их ясно: создав транспорт sync и “отправив” сообщения в него для немедленной обработки:

  • YAML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    # config/packages/messenger.yaml
    framework:
        messenger:
            transports:
                # ... другие транспорты
    
                sync: 'sync://'
    
            routing:
                App\Message\SmsNotification: sync
    
  • XML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
    
        <framework:config>
            <framework:messenger>
                <!-- ... другие транспорты -->
    
                <framework:transport name="sync" dsn="sync://"/>
    
                <framework:routing message-class="App\Message\SmsNotification">
                    <framework:sender service="sync"/>
                </framework:routing>
            </framework:messenger>
        </framework:config>
    </container>
    
  • PHP
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;
    
    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();
    
        // ... другие транспорты
    
        $messenger->transport('sync')->dsn('sync://');
        $messenger->routing('App\Message\SmsNotification')->senders(['sync']);
    };
    

Создание вашего собственного транспорта

Вы также можете создать собственный транспорт, если вам нужно отправлять или получать сообщения из чего-то, что не поддерживается. См. How to Create Your own Messenger Transport.

Потребление сообщений (запуск работника)

Когда ваши сообщения были маршрутизированы, в большинстве случаев, вам нужно будет “потребить” их. Вы можете сделать это с помощью команды messenger:consume:

1
2
3
4
$ php bin/console messenger:consume async

# используйте -vv, чтобы увидеть детали того, что происходит
$ php bin/console messenger:consume async -vv

Первый аргумент - это имя получателя (или id сервиса, если вы маршрутизировали к пользовательскому сервису). По умолчанию, команда будет выполняться бесконечно: искать новые сообщения в вашем транспорте и обрабатывать их. Эта команда называется вашим “работником”.

Запуск в производство

В производстве, есть несколько важных вещей, о которых стоит подумать:

Используйте Супервизора, чтобы ваш(и) работник(и) работали
Вы хотите, чтобы один или более “работников” работали все время. Чтобы сделать это, используйте систему контроля процесса вроде Супервизора.
Не позволяйте работникам работать бесконечно
Некоторые сервисы (вроде EntityManager Doctrine) будут потреблять все больше памяти со временем. Поэтому, вместо того, чтобы позволять вашему работнику работать всегда, используйте флажок вроде messenger:consume --limit=10, чтобы указать работнику, что он должен обработать только 10 сообщений до прекращения работы (затем Супервизор создаст новый процесс). Также есть другие опции вроде --memory-limit=128M и --time-limit=3600.
Перезагружайте работников при запуске
Каждый раз при запуске вам нужно будет перезагрузить все процессы ваших работников, чтобы они видели новозапущенный код. Чтобы сделать это, выполните messenger:stop-workers при запуске. Это сигнализирует каждому работнику, что он должен закончить обработку текущего сообщения и грациозно завершить работу. Затем, Супервизор создаст новые процессы работников. Команда использует кеш app внутренне - поэтому убедитесь в том, что он сконфигурирован для использования адаптера по вашему вкусу.

Приоритизированный транспорт

Иногда определенные типы сообщений должны иметь более высокий приоритет и быть обработаны до других. Чтобы сделать это возможным, вы можете создать несколько транспортов и маршрутизировать разные сообщения к ним. Например:

  • YAML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    # config/packages/messenger.yaml
    framework:
        messenger:
            transports:
                async_priority_high:
                    dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                    options:
                        # queue_name соответствует транспорту doctrine
                        queue_name: high
    
                        # для AMQP отправьте отдельный обмен, а затем поставьте в очередь
                        #exchange:
                        #    name: high
                        #queues:
                        #    messages_high: ~
                        # or redis try "group"
                async_priority_low:
                    dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                    options:
                        queue_name: low
    
            routing:
                'App\Message\SmsNotification':  async_priority_low
                'App\Message\NewUserWelcomeEmail':  async_priority_high
    
  • XML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
    
        <framework:config>
            <framework:messenger>
                <framework:transport name="async_priority_high" dsn="%env(MESSENGER_TRANSPORT_DSN)%">
                    <framework:options>
                        <framework:queue>
                            <framework:name>Queue</framework:name>
                        </framework:queue>
                    </framework:options>
                </framework:transport>
                <framework:transport name="async_priority_low" dsn="%env(MESSENGER_TRANSPORT_DSN)%">
                    <option key="queue_name">low</option>
                </framework:transport>
    
                <framework:routing message-class="App\Message\SmsNotification">
                    <framework:sender service="async_priority_low"/>
                </framework:routing>
                <framework:routing message-class="App\Message\NewUserWelcomeEmail">
                    <framework:sender service="async_priority_high"/>
                </framework:routing>
            </framework:messenger>
        </framework:config>
    </container>
    
  • PHP
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;
    
    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();
    
        $messenger->transport('async_priority_high')
            ->dsn('%env(MESSENGER_TRANSPORT_DSN)%')
            ->options(['queue_name' => 'high']);
    
        $messenger->transport('async_priority_low')
            ->dsn('%env(MESSENGER_TRANSPORT_DSN)%')
            ->options(['queue_name' => 'low']);
    
        $messenger->routing('App\Message\SmsNotification')->senders(['async_priority_low']);
        $messenger->routing('App\Message\NewUserWelcomeEmail')->senders(['async_priority_high']);
    };
    

Затем вы можете запускать отдельных работников для каждого транспорта, или инструктировать одного работника, чтобы он обрабатывал сообщения в порядке приоритетности:

1
$ php bin/console messenger:consume async_priority_high async_priority_low

Работник будет всегда вначале искать сообщения, ожидающие async_priority_high. Если таких нет, затем он будет потреблять сообщения из async_priority_low.

Ограничьте потребление для конкретных очередей

Некоторый транспорт (особенно AMQP) имеет концепцию обмена и очередей. Транспорт Symfony всегда связан с обменом. По умолчанию, работник потребляет из всех очередей, соединенных с обменом указанного транспорта. Однако, есть примеры использования, когда вам нужно, чтобы работник потреблял только из конкретной очереди.

You can limit the worker to only process messages from specific queues:

1
$ php bin/console messenger:consume my_transport --queues=fasttrack

Чтобы позволить использование опции queues, получатель должен реализовывать QueueReceiverInterface.

New in version 5.3: Ограничение работника для конкретных очередей было представлено в Symfony 5.3.

Конфигурация супервизора

Супервизор - это отличный инструмент для гарантии того, что процесс(ы) ваших работников всегда производятся (даже если он закрывается в связи с ошибкой, достижением лимита сообщений или благодаря messenger:stop-workers). Вы можете установить его на Ubuntu, к примеру, через:

1
$ sudo apt-get install supervisor

Файлы конфигурации Супервизора обычно живут в каталоге /etc/supervisor/conf.d. Например, вы можете создать там новый файл messenger-worker.conf, чтобы убедиться, что 2 экземпляра messenger:consume работают всегда:

1
2
3
4
5
6
7
8
9
;/etc/supervisor/conf.d/messenger-worker.conf
[program:messenger-consume]
command=php /path/to/your/app/bin/console messenger:consume async --time-limit=3600
user=ubuntu
numprocs=2
startsecs=0
autostart=true
autorestart=true
process_name=%(program_name)s_%(process_num)02d

Измените аргумент async, чтобы он использовал название вашего транспорта (или транспортов) и user на Unix-пользователя на вашем сервере.

Если вы используете транспорт Redis, заметьте, что каждому работнику нужно уникальное имя потребителя, чтобы избежать обработки одного сообщения несколькими работникам. Один из способов достичь этого - установить переменную окружения в файле конфигурации Супервизора, на которую вы потом можете сослаться в messenger.yaml (см. раздел Redis выше):

1
environment=MESSENGER_CONSUMER_NAME=%(program_name)s_%(process_num)02d

Next, tell Supervisor to read your config and start your workers:

1
2
3
4
5
$ sudo supervisorctl reread

$ sudo supervisorctl update

$ sudo supervisorctl start messenger-consume:*

См. документацию Супервизора, чтобы узнать больше деталей.

Повторные попытки и ошибки

Если во время потребления сообщения из транспорта будет вызвано исключение, оно будет автоматически повторно отправлено транспорту, чтобы попробовать снова. По умолчанию, сообщение имеет 3 попытки перед сбросом или отправкой транспорту ошибок. Каждая повторная попытка будет тажке отложена во времени, на случай, если она была вызвана временной проблемой. Все это можно сконфигурировать для каждого транспорта:

  • YAML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    # config/packages/messenger.yaml
    framework:
        messenger:
            transports:
                async_priority_high:
                    dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
    
                    # конфигурация по умолчанию
                    retry_strategy:
                        max_retries: 3
                        # задержка в милисекундах
                        delay: 1000
                        # делает так, чтобы задержка была дольше перед каждой повторной попыткой
                        # например, задержка в 1 секунду, 2 секунду, 4 секунду
                        multiplier: 2
                        max_delay: 0
                        # переопределить это все сервисом, который
                        # реализует Symfony\Component\Messenger\Retry\RetryStrategyInterface
                        # service: null
    
  • XML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
    
        <framework:config>
            <framework:messenger>
                <framework:transport name="async_priority_high" dsn="%env(MESSENGER_TRANSPORT_DSN)%?queue_name=high_priority">
                    <framework:retry-strategy max-retries="3" delay="1000" multiplier="2" max-delay="0"/>
                </framework:transport>
            </framework:messenger>
        </framework:config>
    </container>
    
  • PHP
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;
    
    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();
    
        $messenger->transport('async_priority_high')
            ->dsn('%env(MESSENGER_TRANSPORT_DSN)%')
            // конфигурация по умолчанию
            ->retryStrategy()
                ->maxRetries(3)
                // задержка в милисекундах
                ->delay(1000)
                // делает так, чтобы задержка была дольше перед каждой повторной попыткой
                // например, задержка в 1 секунду, 2 секунду, 4 секунду
                ->multiplier(2)
                ->maxDelay(0)
                // переопределить это все сервисом, который
                // реализует Symfony\Component\Messenger\Retry\RetryStrategyInterface
                ->service(null)
        ;
    };
    

Tip

Symfony запускает WorkerMessageRetriedEvent, когда сообщение имеет повторные попытки, поэтому вы можете выполнить собственную логику.

New in version 5.2: Класс WorkerMessageRetriedEvent был представлен в Symfony 5.2.

Избегание повторных попыток

Иногда обработка сообщения может быть неудачной, и вы будете знать, что это перманентно и повторных попыток не нужно. Если вы вызовете UnrecoverableMessageHandlingException, сообщение не будет иметь повторных попыток.

Форсирование повторных попыток

New in version 5.1: RecoverableMessageHandlingException был представлен в Symfony 5.1.

Иногда обработка сообщения может быть неудачной, и вы будете знать, что это временно и необходима повторная попытка. Если вы вызовете RecoverableMessageHandlingException, сообщение всегда будет иметь повторную попытку.

Сохранения и повторные попытки неудачных сообщений

Если сообщение терпит неудачу и имеет несколько повторных попыток (max_retries), оно затем будет отбраковано. Чтобы избежать этого, вы можете сконфигурировать failure_transport:

  • YAML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    # config/packages/messenger.yaml
    framework:
        messenger:
            # после повторных попыток, сообщения будут отправлены транспорту "failed"
            failure_transport: failed
    
            transports:
                # ... другой транспорт
    
                failed: 'doctrine://default?queue_name=failed'
    
  • XML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
    
        <framework:config>
            <!-- после повторных попыток, сообщения будут отправлены транспорту "failed" -->
            <framework:messenger failure-transport="failed">
                <!-- ... другой транспорт -->
    
                <framework:transport name="failed" dsn="doctrine://default?queue_name=failed"/>
            </framework:messenger>
        </framework:config>
    </container>
    
  • PHP
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;
    
    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();
    
        // после повторных попыток, сообщения будут отправлены транспорту "failed"
        $messenger->failureTransport('failed');
    
        // ... другой транспорт
    
        $messenger->transport('failed')
            ->dsn('doctrine://default?queue_name=failed');
    };
    

В этом примере, если обработка сообщения терпит неудачу 3 раза (значение по умолчанию max_retries), оно затем будет отправлено транспорту failed. Хотя вы можете использовать messenger:consume failed для потребления его, как обычного транспорта, вам скорее захочется вручную просмотреть сообщения в транспорте ошибок и решить об их повторных попытках:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
# увидеть все сообщения в транспорте ошибок
$ php bin/console messenger:failed:show

# увидеть детали конкретной ошибки
$ php bin/console messenger:failed:show 20 -vv

# увидеть и повторно попробовать каждое сообщение по-отдельности
$ php bin/console messenger:failed:retry -vv

# повторная попытка конкретных сообщений
$ php bin/console messenger:failed:retry 20 30 --force

# удалить сообщение без повторной попытки
$ php bin/console messenger:failed:remove 20

# удалить сообщения без повторных попыток и показать каждое сообщение перед удалением
$ php bin/console messenger:failed:remove 20 30 --show-messages

New in version 5.1: Опция --show-messages была представлена в Symfony 5.1.

Если сообщение опять потерпит неудачу, оно будет отправлено обратно в транспорт ошибок в соответствии с обычными правилами повторных попыток. Как только будет достигнут максимум повторных попыток, сообщение будет сброшено перманентно.

Несколько транспортов ошибок

New in version 5.3: Возможность использовать несколько транспортов ошибок была представлена в Symfony 5.3.

Иногда недостаточно иметь один глобальный сконфигурированный failed transport, котому что некоторые сообщения важнее, чем другие. В таких случаях, вы можете переопределить транспорт ошибок только для определенных транспортов:

  • YAML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    # config/packages/messenger.yaml
    framework:
        messenger:
            # после повторных попыток, сообщения будут отправлены транспорту "failed"
            # по умолчанию, если внутри транспорта не сконфигурирован "failed_transport"
            failure_transport: failed_default
    
            transports:
                async_priority_high:
                    dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                    failure_transport: failed_high_priority
    
                # так как транспорт ошибок не сконфигурирован, будет использоваться установленный
                # глобальный набор "failure_transport"
                async_priority_low:
                    dsn: 'doctrine://default?queue_name=async_priority_low'
    
                failed_default: 'doctrine://default?queue_name=failed_default'
                failed_high_priority: 'doctrine://default?queue_name=failed_high_priority'
    
  • XML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
    
        <framework:config>
            <!-- после повторных попыток, сообщения будут отправлены транспорту "failed"
            по умолчанию, если внутри транспорта не сконфигурирован "failed_transport" -->
            <framework:messenger failure-transport="failed_default">
                <framework:transport name="async_priority_high" dsn="%env(MESSENGER_TRANSPORT_DSN)%" failure-transport="failed_high_priority"/>
                <!-- так как транспорт ошибок не сконфигурирован, будет использоваться установленный
                глобальный набор "failure_transport" -->
                <framework:transport name="async_priority_low" dsn="doctrine://default?queue_name=async_priority_low"/>
    
                <framework:transport name="failed_default" dsn="doctrine://default?queue_name=failed_default"/>
                <framework:transport name="failed_high_priority" dsn="doctrine://default?queue_name=failed_high_priority"/>
            </framework:messenger>
        </framework:config>
    </container>
    
  • PHP
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;
    
    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();
    
        // после повторных попыток, сообщения будут отправлены транспорту "failed"
        // по умолчанию, если внутри транспорта не сконфигурирован "failed_transport"
        $messenger->failureTransport('failed_default');
    
        $messenger->transport('async_priority_high')
            ->dsn('%env(MESSENGER_TRANSPORT_DSN)%')
            ->failureTransport('failed_high_priority');
    
        // так как транспорт ошибок не сконфигурирован, будет использоваться установленный
        // глобальный набор "failure_transport"
       $messenger->transport('async_priority_low')
            ->dsn('doctrine://default?queue_name=async_priority_low');
    
       $messenger->transport('failed_default')
            ->dsn('doctrine://default?queue_name=failed_default');
    
       $messenger->transport('failed_high_priority')
            ->dsn('doctrine://default?queue_name=failed_high_priority');
    };
    

Если нету определенного failure_transport глобально или на уровне транспорта, сообщение будет сброшено после определенного количества попыток.

Неудачные команды имеют необязательную опцию --transport, чтобы указать failure_transport, сконфигурированный на уровне транспорта.

1
2
3
4
5
6
7
8
# увидеть все сообщения в транспорте "failure_transport"
$ php bin/console messenger:failed:show --transport=failure_transport

# повторная попытка конкретных сообщений из "failure_transport"
$ php bin/console messenger:failed:retry 20 30 --transport=failure_transport --force

# удалить сообщение без повторной попытки из "failure_transport"
$ php bin/console messenger:failed:remove 20 --transport=failure_transport

Конфигурация транспорта

Мессенджер поддерживает несколько разных типов транспорта, каждый со своими опциями. Опции могут быть переданы транспорту через строку DSN или конфигурацию.

1
2
# .env
MESSENGER_TRANSPORT_DSN=amqp://localhost/%2f/messages?auto_setup=false
  • YAML
    1
    2
    3
    4
    5
    6
    7
    8
    # config/packages/messenger.yaml
    framework:
        messenger:
            transports:
                my_transport:
                    dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
                    options:
                        auto_setup: false
    
  • XML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
    
        <framework:config>
            <framework:messenger>
                <framework:transport name="my_transport" dsn="%env(MESSENGER_TRANSPORT_DSN)%">
                    <framework:options auto-setup="false"/>
                </framework:transport>
            </framework:messenger>
        </framework:config>
    </container>
    
  • PHP
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;
    
    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();
    
        $messenger->transport('my_transport')
            ->dsn('%env(MESSENGER_TRANSPORT_DSN)%')
            ->options(['auto_setup' => false]);
    };
    

Опции, определенные под options, главенствуют над определенными в DSN.

Транспорт AMQP

Транспорт AMQP использует PHP-расширение AMQP для отправки сообщений в очередь вроде RabbitMQ.

New in version 5.1: Начиная с Symfony 5.1, транспорта AMQP переехал в отдельный пакет. Установите его, выполнив:

1
$ composer require symfony/amqp-messenger

DSN транспорта AMQP может выглядеть так:

1
2
3
4
5
# .env
MESSENGER_TRANSPORT_DSN=amqp://guest:[email protected]:5672/%2f/messages

# или используйте протокол AMQPS
MESSENGER_TRANSPORT_DSN=amqps://guest:[email protected]/%2f/messages

New in version 5.2: Поддержка протокола AMQPS была представлена в Symfony 5.2.

Если вы хотите использовать AMQP, зашифрованный с помощью TLS/SSL, вы должны также предоставить CA-сертификат. Определите путь сертификата в настройке PHP.ini amqp.cacert (например, amqp.cacert = /etc/ssl/certs) или в параметре DSN cacert (например, amqps://localhost?cacert=/etc/ssl/certs/).

Порт, по умолчанию используемый AMQP, зашифрованным с помощью TLS/SSL, - 5671, но вы можете переопределить его в параметре DSN port (например, amqps://localhost?cacert=/etc/ssl/certs/&port=12345).

Note

По умолчанию, транспорт будет автоматически создавать любые необходимые обмены, очереди и связующие ключи. Это можно отключить, но некоторые функции могут работать некорректно (вроде отложенных очередей).

Note

С Symfony 5.3 или новее, вы можете ограничить потребителей транспорта AMQP, чтобы они обрабатывали только сообщения из некоторых очередей какого-то обмена. См. Limit Consuming to Specific Queues.

Транспорт имеет другие опции, включая способы конфигурации обмена, связующие ключи очереди и т.д. Смотрите документацию Connection.

Транспорт имеет ряд опций:

Опция Описание По умолчанию
auto_setup Должна ли таблица быть создана автоматически во время отправки / получения. true
cacert Путь к файлу CA-сертификата в PEM-формате.  
cert Путь к сертификату клиента в PEM-формате.  
channel_max Указывает наибольшее число каналов, которые позволяет сервер. 0 означает стандартны лимит расширений.  
confirm_timeout Тайм-аут для подтверждения в секундах; если не указан, транспорт не будет ожидать подтверждения сообщения, Примечание: 0 или больше секунд. Может быть дробным.  
connect_timeout Тайм-аут соединения. Примечание: 0 или больше секунд. Может быть дробным.  
frame_max Наибольший размер фрейма, который предлагает сервер для соединения, включая заголовок фрейма и энд-байт. 0 означает стандартный лимит расщирения (зависит от лимита размера фрейма librabbimq по умолчанию)  
heartbeat Задержка пульсации соединения (в секундах), которую хочет сервер. 0 означает, что сервер не хочет пульсации. Отметьте, что librabbimq имеет ограниченную поддержку пульсации, что означает, что пульсация проверяется только во время блокировки вызовов.  
host Имя хоста AMQP-сервиса  
key Пусть к ключу клиента в PEM-формате.  
password Password to use to connect to the AMQP service  
persistent   'false'
port Порт AMQP-сервиса  
prefetch_count    
read_timeout Тайм-аут входящей активности. Note: Примечание: 0 или больше секунд. Может быть дробным.  
retry    
sasl_method    
user Имя пользователя для соединения с AMQP-сервисом  
verify Включает или выключает верификацию точек. Если верификацию включена, то общее имя в сертификате сервера должно совпадать с именем сервера. Верификация включена по умолчанию.  
vhost Виртуальный хост для использования с AMQP-сервисом  
write_timeout Тайм-аут исходящей активности. Примечание: 0 или больше секунд. Может быть дробным.  
delay[queue_name_pattern] Паттерн, используемый для создания очередей delay_%exchange_name%_%routing_key%_%delay%
delay[exchange_name] Имя обмена, используемое для отложенных/повторных сообщений delays
queues[name][arguments] Дополнительные аргументы  
queues[name][binding_arguments] Аргументы, используемые при связывании очереди.  
queues[name][binding_keys] Связующие ключи (если есть) для связывания с очередью  
queues[name][flags] Флажки очереди AMQP_DURABLE
exchange[arguments]    
exchange[default_publish_routing_key] Ключ маршрутизации, используемый при публикации, если он не указан в сообщении  
exchange[flags] Флажки обмена AMQP_DURABLE
exchange[name] Название обмена  
exchange[type] Тип обмена fanout

New in version 5.2: Опция confirm_timeout была представлена в Symfony 5.2.

Deprecated since version 5.3: Опция prefetch_count устарела в Symfony 5.3 так как она не имеет эффекта в транспорте AMQP мессенджера.

Вы можете также сконфигурирвать настройки специально для AMQP в вашем оообщении, добавив AmqpStamp к вашему Конверту:

use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpStamp;
// ...

$attributes = [];
$bus->dispatch(new SmsNotification(), [
    new AmqpStamp('custom-routing-key', AMQP_NOPARAM, $attributes),
]);

Caution

Потребители не отображаются в панели админа, так как этот транспорт не полагается на \AmqpQueue::consume(), который блокирует. Наличие блокирующего получателя делает опции --time-limit/--memory-limit команды messenger:consume. а также команды messenger:stop-workers бесполезными, так как они полагаются на тот факт, что получатель возвращается незамедлительно, независимо от того, находит он сообщение или нет. Работник потребления отвечает за итерацию до получения сообщения для обработки и/или до того, как будет достигнуто одно из условий остановки. Таким образом, логика остановки работника может быть достигнута, если он застрял на блокирующем вызове.

Транспорт Doctrine

Транспорт Doctrine может быть использован для хранения сообщений в таблице базы данных.

New in version 5.1: Начиная с Symfony 5.1, транспорт Doctrine был перемещен в отдельный пакет. Установите его, запустив:

1
$ composer require symfony/doctrine-messenger

The Doctrine transport DSN may looks like this:

1
2
# .env
MESSENGER_TRANSPORT_DSN=doctrine://default

Формат doctrine://<connection_name>, в случае если у вас есть несколько соединений и вы хотите использовать какое-либо другое, чем “default”. Транспорт будет автоматически создавать таблицу под названием messenger_messages.

New in version 5.1: Возможность автоматически генерировать миграцию для таблицы messenger_messages была вредставлена в Symfony 5.1 и DoctrineBundle 2.1.

Или, для создания таблицы самостоятельно, установите опцию auto_setup как false, и сгенерируйте миграцию.

Caution

Свойство datetime сообщений, хранящееся в базе данных, использует временную зону текущей системы. Это может вызвать проблемы, если несколько машин с разными конфигурациями временных зон используют одно хранилище.

Транспорт имеет такие опции:

Опция Описание По умолчанию
table_name Название таблицы messenger_messages
queue_name Название очереди (столбец в таблице, чтобы использовать одну таблицу для нескольких транспортов) default
redeliver_timeout Тайм-аут перед повторной попыткой сообщения в очереди, но не в состоянии “обработки” (если работник по какой-то причине остановился, произойдет это, в итоге вы должны снова попробовать сообщение) - в секундах. 3600
auto_setup Должна ли таблица быть создана автоматически во время отправки/получения. true

New in version 5.1: Возможность использовать PostgreSQL LISTEN/NOTIFY была представлена в Symfony 5.1.

При использовании PostgreSQL, у вас есть доступ к следующим опциям для получения преимуществ функции LISTEN/NOTIFY. Это позволяет более производительный подход, чем поведение голосования транспорта Doctrine по умолчанию, так как PostgreSQL будет напрямую уведомлять работников, когда новое сообщение будет появляться в таблице.

Опция Описание По умолчанию
use_notify Использовать ли LISTEN/NOTIFY. true
check_delayed_interval Интервал проверки отложенных сообщений, в милисекундах. Установите как 0, чтобы отключить проверку. 1000
get_notify_timeout Продолжительность времени ожидания ответа при вызове PDO::pgsqlGetNotify`, в милисекундах. 0

Транспорт Beanstalkd

New in version 5.2: Транспорт Beanstalkd был представлен в Symfony 5.2.

Транспорт Beanstalkd отправляет сообщения прямо с рабочую очередь Beanstalkd. Установите его, выполнив:

1
$ composer require symfony/beanstalkd-messenger

DSN транспорта Beanstalkd может выглядеть так:

1
2
3
4
5
# .env
MESSENGER_TRANSPORT_DSN=beanstalkd://localhost:11300?tube_name=foo&timeout=4&ttr=120

# Если порта нет, он по умолчанию будет 11300
MESSENGER_TRANSPORT_DSN=beanstalkd://localhost

Транспорт имеет ряд опций:

Опция Описание По умолчанию
tube_name Название очереди default
timeout Тайм-аут резерва сообщения - в секундах. 0 (заставит сервер сразу же либо вернуть ответ, либо вызвать TransportException)
ttr Время выполнения сообщения перед тем как поместить его обратно в очередь готовности - в секундах. 90

Транспорт Redis

Транспорт Redis использует `streams`_ для создания очереди сообщений. Этот транспорт требует PHP-расширения Redis (>=4.3) и работающего сервера Redis (^5.0).

New in version 5.1: Начиная с Symfony 5.1, транспорт Redis был перемешен в отдельный пакет. Установите его, выполнив:

1
$ composer require symfony/redis-messenger

DSN транспорта Redis может выглядеть так:

1
2
3
4
5
6
7
8
# .env
MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages
# Полный пример DSN
MESSENGER_TRANSPORT_DSN=redis://[email protected]:6379/messages/symfony/consumer?auto_setup=true&serializer=1&stream_max_entries=0&dbindex=0
# Пример кластера Redis
MESSENGER_TRANSPORT_DSN=redis://host-01:6379,redis://host-02:6379,redis://host-03:6379,redis://host-04:6379
# Пример Unix-сокета
MESSENGER_TRANSPORT_DSN=redis:///var/run/redis.sock

New in version 5.1: DSN Unix-соекта была представлена в Symfony 5.1.

Некоторые опции могут быть сконфигурированы через DSN или ключ options под транспортом в messenger.yaml:

Опция Описание По умолчанию
stream Название потока Redis messages
group Название группы потребителей Redis symfony
consumer Название потребителя, используемого в Redis consumer
auto_setup Создать группу Redis автоматически? true
auth Пароль Redis  
delete_after_ack Если true, сообщения откладываются автоматически после обраотки false
delete_after_reject Если true, сообщения откладываются автоматически, если они отклоняются true
lazy Соединяться только если соединение действительно необходимо false
serializer Как сериализовать финальную нагрузку в Redis (опция Redis::OPT_SERIALIZER ) Redis::SERIALIZER_PHP
stream_max_entries Максимальное количество записей, до которого будет усечен поток. Установите достаточно большое значение, чтобы избежать потери “подвешенных” сообщений 0 (что означает “не усекать”)
tls Включить TLS-поддержку соединения false
redeliver_timeout Тайм-аут перед повторной попыткой “подвешенного” сообщения, которое принадлежит заброшенному потребителю (если работник по какой-то причине умер, произойдет это, и в итоге вам нужно будет произвести повторную попытку сообщения) - в секундах. 3600
claim_interval Интервал с которым нужно проверять “подвешенные”/заброшенные сообщения - в миллисекундах 60000 (1 минута)

Caution

Никогда не должно быть более одной команды messenger:consume выполняемой с одинаковой комбинацией stream, group и consumer, иначе сообщения могут быть обработаны более, чем один раз. Если вы запускаете несколько работников очерели, consumer может быть установлен как переменная окружения (вроде %env(MESSENGER_CONSUMER_NAME)%), установленная Супервизором (пример ниже) или любым другим сервисом, используемым для управления процессами работников. В окружении контейнера, HOSTNAME может быть использовано как имя потребителя, так как там только один работник на контейнер/хост. Если вы используете Kubernetes для управления контейнерами, рассмотрите использование StatefulSet для стабилизации имен.

Tip

Установите delete_after_ack как true (если у вас одна группа) или определите stream_max_entries (если вы можете предположить, какое максимальное количество записей допустимо в вашем случае), чтобы избежать утечек памяти. В другом случае. все сообщения навсегда останутся в Redis.

New in version 5.1: Опции delete_after_ack, redeliver_timeout и claim_interval были представлены в Symfony 5.1.

New in version 5.2: Опции delete_after_reject и lazy были представлены в Symfony 5.2.

Транспорт в памяти

Транспорт in-memory на самом деле не доставляет сообщения. Вместо этого, он дедржит их в памяти во время запроса, что может быть полезным для тестирования. Например, если у вас есть транспорт async_priority_normal, вы можете переопределить его в окружении test, чтобы использовать этот транспорт:

  • YAML
    1
    2
    3
    4
    5
    # config/packages/test/messenger.yaml
    framework:
        messenger:
            transports:
                async_priority_normal: 'in-memory://'
    
  • XML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    <!-- config/packages/test/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
    
        <framework:config>
            <framework:messenger>
                <framework:transport name="async_priority_normal" dsn="in-memory://"/>
            </framework:messenger>
        </framework:config>
    </container>
    
  • PHP
    1
    2
    3
    4
    5
    6
    7
    8
    9
    // config/packages/test/messenger.php
    use Symfony\Config\FrameworkConfig;
    
    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();
    
        $messenger->transport('async_priority_normal')
            ->dsn('in-memory://');
    };
    

Тогда, во время тестирования, сообщения не будут отправлены реальному транспорту. Даже лучше, в тесте, вы можете проверить, чтобы только одно сообщение было отправлено во время запроса:

// tests/Controller/DefaultControllerTest.php
namespace App\Tests\Controller;

use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;
use Symfony\Component\Messenger\Transport\InMemoryTransport;

class DefaultControllerTest extends WebTestCase
{
    public function testSomething()
    {
        $client = static::createClient();
        // ...

        $this->assertSame(200, $client->getResponse()->getStatusCode());

        /* @var InMemoryTransport $transport */
        $transport = self::$container->get('messenger.transport.async_priority_normal');
        $this->assertCount(1, $transport->getSent());
    }
}

Транспорт имеет ряд опций:

serialize (булево, по умолчанию: false)
Сериализовать сообщения или нет. Это полезно для тестирования дополнительного слоя, особенно когда вы используете собственный сериализатор сообщений.

New in version 5.3: Опция serialize была представлена в Symfony 5.3.

Note

Все транспорты in-memory будут автоматически сброшены после каждого теста в классах тестов, расширяющих KernelTestCase или WebTestCase.

Amazon SQS

New in version 5.1: Транспорт Amazon SQS был представлен в Symfony 5.1.

Транспорт Amazon SQS прекрасно подходит для приложения на AWS. Установите его, выполнив:

1
$ composer require symfony/amazon-sqs-messenger

DSN SQS транспорта выглядит так:

1
2
3
# .env
MESSENGER_TRANSPORT_DSN=https://sqs.eu-west-3.amazonaws.com/123456789012/messages?access_key=AKIAIOSFODNN7EXAMPLE&secret_key=j17M97ffSVoKI0briFoo9a
MESSENGER_TRANSPORT_DSN=sqs://localhost:9494/messages?sslmode=disable

Note

Транспорт автоматически создаст необходимые очерди. Это можно отключить, установив опцию auto_setup как false.

Tip

До отправки или получения сообщения, Symfony необходимо конвертировать название очереди в URL очереди AWS вызвав API GetQueueUrl в AWS. Этого дополнительного API-вызова можно избежать, предоставив DSN, которая является URL очереди.

New in version 5.2: Функция предоставления URL очереди в DSN была представлена в Symfony 5.2.

Транспорт имеет ряд опций:

Опция Описание По умолчанию
access_key Ключ доступа AWS  
account Идентификатор AWS-аккаунта Владелец учетных данных
auto_setup Нужно ли автоматически создават очередь во время отправки/получения. true
buffer_size Количество сообщений для предварительного извлечения 9
debug Если true. ведет лог всех HTTP запросов и ответов (это влияет на производительность) false
endpoint Абсолютный URL к SQS-сервису https://sqs.eu-west-1.amazonaws.com
poll_timeout Время ожидания нового сообщения в секундах 0.1
queue_name Название очереди messages
region Название AWS-региона eu-west-1
secret_key Секретный ключ AWS  
visibility_timeout Количество секунд, на протяжении которых сообщение не будет видимым (Visibility Timeout) Конфигурация очереди
wait_time Длительность Long polling в секундах 20

New in version 5.3: Опция debug была представлена в Symfony 5.3.

Note

Параметр wait_time определяет максимальное время ожидания для Amazon SQS до того, как сообщение будет доступно в очереди, перед отправкой ответа. Это помогает снизить стоимость использования Amazon SQS, устранив некоторое количество пустых ответов.

Параметр poll_timeout определяет время ожидания получателя до возвращения null. Он избегает блокировки других получателей от вызова.

Note

Если название очереди имеет суффикс .fifo, AWS создаст очередь FIFO. Используйте марку AmazonSqsFifoStamp, чтобы определить Message group ID и Message deduplication ID.

Очереди FIFO не поддерживают установки задержки отдельных сообдений, значение delay: 0 требуется в настройках стратегии повторных попыток.

Сериализация сообщений

Когда сообщения отправляются (и получаются) в транспорт, они сериализуются с использование нативных функций PHP serialize() и unserialize(). Вы можете изменить это глобально (или для каждого транспорта) на сервис, реализующий SerializerInterface:

  • YAML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    # config/packages/messenger.yaml
    framework:
        messenger:
            serializer:
                default_serializer: messenger.transport.symfony_serializer
                symfony_serializer:
                    format: json
                    context: { }
    
            transports:
                async_priority_normal:
                    dsn: # ...
                    serializer: messenger.transport.symfony_serializer
    
  • XML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
    
        <framework:config>
            <framework:messenger>
                <framework:serializer default-serializer="messenger.transport.symfony_serializer">
                    <framework:symfony-serializer format="json">
                        <framework:context/>
                    </framework:symfony-serializer>
                </framework:serializer>
    
                <framework:transport name="async_priority_normal" dsn="..." serializer="messenger.transport.symfony_serializer"/>
            </framework:messenger>
        </framework:config>
    </container>
    
  • PHP
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;
    
    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();
    
        $messenger->serializer()
            ->defaultSerializer('messenger.transport.symfony_serializer')
            ->symfonySerializer()
                ->format('json')
                ->context('foo', 'bar');
    
        $messenger->transport('async_priority_normal')
            ->dsn(...)
            ->serializer('messenger.transport.symfony_serializer');
    };
    

messenger.transport.symfony_serializer - это встроенный сервис, который использует компонент Сериализатор и может быть сконфигурирован несколькими способами. Если вы выберете использовать сериализатор Symfony, вы сможете контролировать контекст для каждого случая отдельно через SerializerStamp (см. Конверты и марки).

Tip

При отправке/получении сообщений в/из другого транспорта, вам может понадобиться больше контроля над процессом сериализации. Использование пользовательского сериализатора предоставляет такой контроль. См. `Туториал по сериализации сообщений SymfonyCasts`_, чтобы узнать больше.

Настройка обработчиков

Manually Configuring Handlers

Symfony обычно будет находить и регистрировать вашего обработчика автоматически. Но вы также можете сконфигурировать его вручную - и передать ему дополнительную конфигурацию - тегировав сервис обработчика messenger.message_handler

  • YAML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    # config/services.yaml
    services:
        App\MessageHandler\SmsNotificationHandler:
            tags: [messenger.message_handler]
    
            # или сконфигурировать с опциями
            tags:
                -
                    name: messenger.message_handler
                    # необходимо только если невозможно угадать по подсказке
                    handles: App\Message\SmsNotification
    
  • XML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    <!-- config/services.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd">
    
        <services>
            <service id="App\MessageHandler\SmsNotificationHandler">
                 <!-- handles необходимо только если невозможно угадать по подсказке -->
                 <tag name="messenger.message_handler"
                      handles="App\Message\SmsNotification"/>
            </service>
        </services>
    </container>
    
  • PHP
    1
    2
    3
    4
    5
    6
    7
    8
    9
    // config/services.php
    use App\Message\SmsNotification;
    use App\MessageHandler\SmsNotificationHandler;
    
    $container->register(SmsNotificationHandler::class)
        ->addTag('messenger.message_handler', [
            // необходимо только если невозможно угадать по подсказке
            'handles' => SmsNotification::class,
        ]);
    

Возможные опции конфигурации с тегами:

  • bus
  • from_transport
  • handles
  • method
  • priority

Подписчик и опции обработчика

Класс обработчика может обрабатывать множество сообщений или конфигурировать сам себя, реализуя MessageSubscriberInterface:

// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;

use App\Message\OtherSmsNotification;
use App\Message\SmsNotification;
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;

class SmsNotificationHandler implements MessageSubscriberInterface
{
    public function __invoke(SmsNotification $message)
    {
        // ...
    }

    public function handleOtherSmsNotification(OtherSmsNotification $message)
    {
        // ...
    }

    public static function getHandledMessages(): iterable
    {
        // обработать это сообщение в __invoke
        yield SmsNotification::class;

        // также обработать это сообщение в handleOtherSmsNotification
        yield OtherSmsNotification::class => [
            'method' => 'handleOtherSmsNotification',
            //'priority' => 0,
            //'bus' => 'messenger.bus.default',
        ];
    }
}

Связывание обработчиков с разными транспортами

Каждое сообщение может иметь несколько обработчиков, и когда сообщение потребляется, вызываются все его обработчики. Но вы можете также сконфигурироать обработчика так, чтобы он вызывался только когда сообщение получено из конкретного транспорта. Это позволяет вам имет одно сообщение, где каждый обработчик вызывается разными “работниками”, потребляющими разный транспорт.

Представьте, что у вас есть сообщение UploadedImage с двумя обработчиками:

  • ThumbnailUploadedImageHandler: вы хотите, чтобы это обрабатывалось транспортом под названием image_transport
  • NotifyAboutNewUploadedImageHandler: вы хотите, чтобы это обрабатывалось транспортом под названием async_priority_normal

Чтобы сделать это, добавьте опцию from_transport к каждому обработчику. Например:

// src/MessageHandler/ThumbnailUploadedImageHandler.php
namespace App\MessageHandler;

use App\Message\UploadedImage;
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;

class ThumbnailUploadedImageHandler implements MessageSubscriberInterface
{
    public function __invoke(UploadedImage $uploadedImage)
    {
        // создать миниатюры
    }

    public static function getHandledMessages(): iterable
    {
        yield UploadedImage::class => [
            'from_transport' => 'image_transport',
        ];
    }
}

И, похожим образом:

// src/MessageHandler/NotifyAboutNewUploadedImageHandler.php
// ...

class NotifyAboutNewUploadedImageHandler implements MessageSubscriberInterface
{
    // ...

    public static function getHandledMessages(): iterable
    {
        yield UploadedImage::class => [
            'from_transport' => 'async_priority_normal',
        ];
    }
}

Затем, убедитесь, что “маршрутизируете” ваше сообщение к обоим транспортам:

  • YAML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    # config/packages/messenger.yaml
    framework:
        messenger:
            transports:
                async_priority_normal: # ...
                image_transport: # ...
    
            routing:
                # ...
                'App\Message\UploadedImage': [image_transport, async_priority_normal]
    
  • XML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
    
        <framework:config>
            <framework:messenger>
                <framework:transport name="async_priority_normal" dsn="..."/>
                <framework:transport name="image_transport" dsn="..."/>
    
                <framework:routing message-class="App\Message\UploadedImage">
                    <framework:sender service="image_transport"/>
                    <framework:sender service="async_priority_normal"/>
                </framework:routing>
            </framework:messenger>
        </framework:config>
    </container>
    
  • PHP
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;
    
    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();
    
        $messenger->transport('async_priority_normal')->dsn(...);
        $messenger->transport('image_transport')->dsn(...);
    
        $messenger->routing('App\Message\UploadedImage')
            ->senders(['image_transport', 'async_priority_normal']);
    };
    

Вот и все! Теперь вы можете потреблять каждый транспорт:

1
2
3
4
# вызовет ThumbnailUploadedImageHandler только при обработке сообщения
$ php bin/console messenger:consume image_transport -vv

$ php bin/console messenger:consume async_priority_normal -vv

Caution

Если обработчик не имеет конфигурации from_transport, он будет выполнен в каждом транспорте, из которого будет получено это сообщение.

Расширение мессенлжера

Конверты и марки

Сообщение может быть любым PHP-объектом. Иногда вам может понадобиться сконфигурировать что-то дополнительное в сообщении - вроде того, как оно должно быть обработано внутри AMQP или добавления задержки перед обработкой сообщения. Вы можете сделать это, добавив марку (“stamp”) к вашему сообщению:

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;

public function index(MessageBusInterface $bus)
{
    $bus->dispatch(new SmsNotification('...'), [
        // подождать 5 секунд перед обработкой
        new DelayStamp(5000),
    ]);

    // или ясно создайте Конверт
    $bus->dispatch(new Envelope(new SmsNotification('...'), [
        new DelayStamp(5000),
    ]));

    // ...
}

Внутренне, каждое сообщение оборачивается в конверт (Envelope), содержащий сообщение и марки. Вы можете создать его вручную или позволить автобусу сообщений сделать это. Существует множество различных марок для разных целей и они используются внутренне для отслеживания информации о сообщении - вроде того, какой автобус обрабатывает его или имеет ли оно повторные попытки после неудачи.

Промежуточное ПО

То, что происходит после запуска сообщения в автобус сообщений, зависит от его набора промежуточного ПО и его порядка. По умолчанию, промежуточное ПО, сконфигурированное для каждого автобуса, выглядит так:

  1. add_bus_name_stamp_middleware - добавляет марку для записи того, в каком атобусе было запущено это собщение;
  2. dispatch_after_current_bus- см. Transactional Messages: Handle New Messages After Handling is Done;
  3. failed_message_processing_middleware - обрабатывает сообщения, которые имеют повторные попытки через транспорт ошибок, чтобы они правильно функционировали, как будто бы они были получены из изначального транспорта;
  4. Ваша собственная коллекция middleware_;
  5. send_message - если машрутизация сконфигурирована для транспорта, отправляет сообщения этому транспорту и останавливает цепь промежуточного ПО;
  6. handle_message - вызывает обработчика(ов) сообщений для заданног сообщения.

Note

Эти названия промежуточного ПО - на самом деле сокращения. Настоящие id сервисов имеют префикс messenger.middleware. (например,``messenger.middleware.handle_message``).

Промежуточное ПО выполняется после запуска сообщения, и также еще раз, когда сообщение получено через работника (для сообщений, которые были отправлены транспорту для асинхронной обработки). Помните это, если вы создаете собственное промежуточное ПО.

Вы можете добавить собственное промежуточное ПО в список, или полностью отключить промежуточное ПО по умолчанию и добавить только ваше собственное:

  • YAML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    # config/packages/messenger.yaml
    framework:
        messenger:
            buses:
                messenger.bus.default:
                    # отключить промежуточное ПО по умолчанию
                    default_middleware: false
    
                    # и/или добавить ваше собственное
                    middleware:
                        # id серисов, релизующих Symfony\Component\Messenger\Middleware\MiddlewareInterface
                        - 'App\Middleware\MyMiddleware'
                        - 'App\Middleware\AnotherMiddleware'
    
  • XML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
    
        <framework:config>
            <framework:messenger>
                <!-- default-middleware: отключить промежуточное ПО по умолчанию -->
                <framework:bus name="messenger.bus.default" default-middleware="false"/>
    
                <!-- и/или добавить ваше собственное -->
                <framework:middleware id="App\Middleware\MyMiddleware"/>
                <framework:middleware id="App\Middleware\AnotherMiddleware"/>
            </framework:messenger>
        </framework:config>
    </container>
    
  • PHP
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;
    
    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();
    
        $bus = $messenger->bus('messenger.bus.default')
            ->defaultMiddleware(false);
        $bus->middleware()->id('App\Middleware\MyMiddleware');
        $bus->middleware()->id('App\Middleware\AnotherMiddleware');
    };
    

Note

Если сервис промежуточного ПО абстрактный, будет создан другой экземпляр сервиса для каждого автобуса.

Промежуточное ПО для Doctrine

New in version 1.11: Следующее промежуточное по для Doctrine было представлено в DoctrineBundle 1.11.

Если вы в своем приложении используете Doctrine, существует ряд необязательного промежуточного ПО, которое вы можете захотеть использовать:

  • YAML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    # config/packages/messenger.yaml
    framework:
        messenger:
            buses:
                command_bus:
                    middleware:
                        # каждый раз при обработке сообщения, соединение Doctrine
                        # "пингуется" и повторно подключается, если оно закрыто. Полезно,
                        # если ваши работники работают долгое время и соединение базы
                        # данных иногда теряется
                        - doctrine_ping_connection
    
                        # После обработки, соединение Doctrine закрывается, что может
                        # освободить соединения базы данных в работнике, вместо того,
                        # чтобы держать их открытыми всегда
                        - doctrine_close_connection
    
                        # оборачивает всех обработчиков в одну транзакцию Doctrine
                        # обработчикам не надо вызывать flush(), а ошибка в любом
                        # обработчике вызовет откат
                        - doctrine_transaction
    
                        # or pass a different entity manager to any
                        #- doctrine_transaction: ['custom']
    
  • XML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
    
        <framework:config>
            <framework:messenger>
                <framework:bus name="command_bus">
                    <framework:middleware id="doctrine_transaction"/>
                    <framework:middleware id="doctrine_ping_connection"/>
                    <framework:middleware id="doctrine_close_connection"/>
    
                    <!-- или передать другого менеджера сущностей любому -->
                    <!--
                    <framework:middleware id="doctrine_transaction">
                        <framework:argument>custom</framework:argument>
                    </framework:middleware>
                    -->
                </framework:bus>
            </framework:messenger>
        </framework:config>
    </container>
    
  • PHP
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;
    
    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();
    
        $bus = $messenger->bus('command_bus');
        $bus->middleware()->id('doctrine_transaction');
        $bus->middleware()->id('doctrine_ping_connection');
        $bus->middleware()->id('doctrine_close_connection');
        // Использование другого менеджера сущностей
        $bus->middleware()->id('doctrine_transaction')
            ->arguments(['custom']);
    };
    

Другое промежуточное ПО

New in version 5.3: Промежуточное ПО router_context было представлено в Symfony 5.3.

Добавьте промежуточное ПО router_context, если вам нужно генерировать абсолютные URL в потребителе (например, отображать шаблон со ссылками). Это промежуточное ПО хранит контекст изначального запроса (т.е. хост, HTTP-порт и т.д.), что необходимо при создании абсолютных URL.

  • YAML
    1
    2
    3
    4
    5
    6
    7
    # config/packages/messenger.yaml
    framework:
        messenger:
            buses:
                command_bus:
                    middleware:
                        - router_context
    
  • XML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
    
        <framework:config>
            <framework:messenger>
                <framework:bus name="command_bus">
                    <framework:middleware id="router_context"/>
                </framework:bus>
            </framework:messenger>
        </framework:config>
    </container>
    
  • PHP
    1
    2
    3
    4
    5
    6
    7
    8
    9
    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;
    
    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();
    
        $bus = $messenger->bus('command_bus');
        $bus->middleware()->id('router_context');
    };
    

События Мессенджера

В дополнение к промежуточному ПО, Мессенджер также запускает несколько событий. Вы можете создать слушателя событий, чтобы подключаться к разным частям процесса. Для каждого, класс события будет названием события:

Несколько автобусов, автобусов команд и событий

Мессенджер предоставляет вам один сервис автобуса сообщений по умолчанию. Но вы можете сконфигурировать столько, сколько вы хотите, создав автобусы “команд”, “запросов” или “событий” и контролируя их промежуточное ПО. См. Multiple Buses.

Эта документация является переводом официальной документации Symfony и предоставляется по свободной лицензии CC BY-SA 3.0.