Дата оновлення перекладу 2022-05-23

Месенджер: робота з синхронізованими повідомленнями та повідомленнями у черзі

Месенджер надає автобус повідомлень з можливістю відправлення повідомлень, а потім роботи з ними одразу ж у вашому додатку, або їх відправлення через транспорт (наприклад, чергу) для обробки пізніше. Щоб дізнатися про це більше, прочитайте документацію компонента Месенджер.

Установка

У додатках, що використовують Symfony Flex, виконайте цю команду, щоб встановити месенджер:

1
$ composer require symfony/messenger

Створення повідомлення та обробника

Месенджер будується на двох різних класах, які ви створите: (1) класі повідомлення, який містить дані, та (2) класі обробника(ів), який буде викликано після запуску повідомлення. Клас обробника читатиме клас повідомлення та виконувати якусь дію.

До класу повідомлення немає особливих вимог, окрім того, що його можна сериалізувати:

// src/Message/SmsNotification.php
namespace App\Message;

class SmsNotification
{
    private $content;

    public function __construct(string $content)
    {
        $this->content = $content;
    }

    public function getContent(): string
    {
        return $this->content;
    }
}

Обробник повідомлень - це PHP-викликане, рекомендований спосіб його створення - створити клас, який реалізує MessageHandlerInterface та має метод __invoke(), який має підказки класу повідомлення (або інтерфейс повідомлення):

// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;

use App\Message\SmsNotification;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class SmsNotificationHandler implements MessageHandlerInterface
{
    public function __invoke(SmsNotification $message)
    {
        // ... зробіть щось - на кшталт відправки SMS!
    }
}

Завдяки автоконфігурації та підказці SmsNotification, Symfony знає, що цей обробник має бути викликаний, коли запускається повідомлення SmsNotification. У більшості випадків, це все, що вам треба буде зробити. Але ви можете також сконфігурувати обробники повідомлень вручну. Щоб побачити всі сконфігуровані обробники, виконайте:

1
$ php bin/console debug:messenger

Запуск повідомлення

Ви готові! Для запуску повідомлення (та виклику обробника), впровадьте сервіс messenger.default_bus (через MessageBusInterface), наприклад, у контролер:

// src/Controller/DefaultController.php
namespace App\Controller;

use App\Message\SmsNotification;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\Messenger\MessageBusInterface;

class DefaultController extends AbstractController
{
    public function index(MessageBusInterface $bus)
    {
        // призведе до виклику SmsNotificationHandler
        $bus->dispatch(new SmsNotification('Look! I created a message!'));

        // або використайте це скорочення
        $this->dispatchMessage(new SmsNotification('Look! I created a message!'));

        // ...
    }
}

Транспорт: асинхронні повідомлення/повідомлення в черзі

За замовчуванням, повідомлення обробляються одразу ж після запуску. Якщо ви хочете обробити повідомлення асинхронно, ви можете сконфігурувати транспорт. Транспорт поже відправляти повідомлення (наприклад, в систему черги), а потім отримувати через робітника. Месенджер підтримує декілька транспортів.

Note

Якщо ви хочете використати транспорт, який не підтримується, подивіться на транспорт Enqueue, який підтримує речі на кшталт Kafka і Google Pub/Sub.

Транспорт реєструється з використанням “DSN”. Завдяки рецепту Flex для Месенджера, ваш файл .env вже має декілька прикладів.

1
2
3
# MESSENGER_TRANSPORT_DSN=amqp://guest:[email protected]:5672/%2f/messages
# MESSENGER_TRANSPORT_DSN=doctrine://default
# MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages

Розкоментуйте той транспорт, який ви хочете (або встановіть його в .env.local). Див. Transport Configuration, щоб дізнатися більше деталей.

Далі, в config/packages/messenger.yaml, давайте визначимо транспорт під назвою async, який використовує цю конфігурацію:

  • YAML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    # config/packages/messenger.yaml
    framework:
        messenger:
            transports:
                async: "%env(MESSENGER_TRANSPORT_DSN)%"
    
                # або розширено для конфігурації більшої кількості опцій
                #async:
                #    dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
                #    options: []
    
  • XML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
    
        <framework:config>
            <framework:messenger>
                <framework:transport name="async">%env(MESSENGER_TRANSPORT_DSN)%</framework:transport>
    
                <!-- або розширено для конфігурації більшої кількості опцій -->
                <framework:transport name="async"
                    dsn="%env(MESSENGER_TRANSPORT_DSN)%"
                >
                    <option key="...">...</option>
                </framework:transport>
            </framework:messenger>
        </framework:config>
    </container>
    
  • PHP
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;
    
    return static function (FrameworkConfig $framework) {
        $framework->messenger()
            ->transport('async')
                ->dsn('%env(MESSENGER_TRANSPORT_DSN)%')
        ;
    
        $framework->messenger()
            ->transport('async')
                ->dsn('%env(MESSENGER_TRANSPORT_DSN)%')
                ->options([])
        ;
    };
    

Маршрутизація повідомлень до транспорту

Тепер, коли у вас є сконфігурований транспорт, замість негайної обробки повідомлень, ви можете сконфігурувати їх так, щоб вони були відправлені транспорту:

  • YAML
    1
    2
    3
    4
    5
    6
    7
    8
    9
    # config/packages/messenger.yaml
    framework:
        messenger:
            transports:
                async: "%env(MESSENGER_TRANSPORT_DSN)%"
    
            routing:
                # async - це те ім'я, яке ви дали вашому транспорту вище
                'App\Message\SmsNotification': async
    
  • XML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
    
        <framework:config>
            <framework:messenger>
                <framework:routing message-class="App\Message\SmsNotification">
                    <!-- async - це те ім'я, яке ви дали вашому транспорту вище -->
                    <framework:sender service="async"/>
                </framework:routing>
            </framework:messenger>
        </framework:config>
    </container>
    
  • PHP
    1
    2
    3
    4
    5
    6
    7
    8
    9
    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;
    
    return static function (FrameworkConfig $framework) {
        $framework->messenger()
            // async - це те ім'я, яке ви дали вашому транспорту вище
            ->routing('App\Message\SmsNotification')->senders(['async'])
        ;
    };
    

Завдяки цьому, App\Message\SmsNotification буде відправлено транспорту async, і його обробник(и) не будуть викликані одразу ж. Всі повідомлення, не співпавші з routing будуть оброблені негайно.

Ви також можете маршрутизувати класи за їхнім батьківським класом або інтерфейсом. Або відправляти повідомлення декільком транспортам:

  • YAML
    1
    2
    3
    4
    5
    6
    7
    8
    9
    # config/packages/messenger.yaml
    framework:
        messenger:
            routing:
                # маршрутизуйте всі повідомлення, що розширюють цей приклад базового класу або інтерфейс
                'App\Message\AbstractAsyncMessage': async
                'App\Message\AsyncMessageInterface': async
    
                'My\Message\ToBeSentToTwoSenders': [async, audit]
    
  • XML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
    
        <framework:config>
            <framework:messenger>
                <!-- маршрутизуйте всі повідомлення, що розширюють цей приклад базового класу або інтерфейс -->
                <framework:routing message-class="App\Message\AbstractAsyncMessage">
                    <framework:sender service="async"/>
                </framework:routing>
                <framework:routing message-class="App\Message\AsyncMessageInterface">
                    <framework:sender service="async"/>
                </framework:routing>
                <framework:routing message-class="My\Message\ToBeSentToTwoSenders">
                    <framework:sender service="async"/>
                    <framework:sender service="audit"/>
                </framework:routing>
            </framework:messenger>
        </framework:config>
    </container>
    
  • PHP
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;
    
    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();
        // маршрутизуйте всі повідомлення, що розширюють цей приклад базового класу або інтерфейс
        $messenger->routing('App\Message\AbstractAsyncMessage')->senders(['async']);
        $messenger->routing('App\Message\AsyncMessageInterface')->senders(['async']);
        $messenger->routing('My\Message\ToBeSentToTwoSenders')->senders(['async', 'audit']);
    };
    

Note

Якщо ви сконфігуруєте маршрутизація і для дочірнього, і для батьківського класу, використовуються обидва правила. Наприклад, якщо у вас є об’єкт SmsNotification, що розширюється з Notification, будуть використані маршрутизації і для Notification, і для SmsNotification.

Сутності Doctrine у повідомленнях

Якщо вам потрібно передати сутність Doctrine у повідомленні, краще за все передати основний ключ сутності (або будь-яку релевантну інформацію, необхідну обробнику, на кшталт email, та ін.) заміть об’єкту:

// src/Message/NewUserWelcomeEmail.php
namespace App\Message;

class NewUserWelcomeEmail
{
    private $userId;

    public function __construct(int $userId)
    {
        $this->userId = $userId;
    }

    public function getUserId(): int
    {
        return $this->userId;
    }
}

Потім, у вашому обробнику, ви можете зробити запит свіжого об’єкту:

// src/MessageHandler/NewUserWelcomeEmailHandler.php
namespace App\MessageHandler;

use App\Message\NewUserWelcomeEmail;
use App\Repository\UserRepository;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class NewUserWelcomeEmailHandler implements MessageHandlerInterface
{
    private $userRepository;

    public function __construct(UserRepository $userRepository)
    {
        $this->userRepository = $userRepository;
    }

    public function __invoke(NewUserWelcomeEmail $welcomeEmail)
    {
        $user = $this->userRepository->find($welcomeEmail->getUserId());

        // ... відправити електронний лист!
    }
}

Це гарантує, що сутність містить свіжі дані.

Синхронна обробка повідомлень

Якщо повідомлення не співпадає з жодним правилом маршрутизації, воно не буде відправлене жодному транспорту, і буде оброблене негайно. В деяких випадках (наприклад, при `пов'язуванні обробників з різними транспортами`_), легше та гнучкіше обробити їх чітко: створивши транспорт sync та “відправивши” повідомлення у нього для негайної обробки:

  • YAML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    # config/packages/messenger.yaml
    framework:
        messenger:
            transports:
                # ... інші транспорти
    
                sync: 'sync://'
    
            routing:
                App\Message\SmsNotification: sync
    
  • XML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
    
        <framework:config>
            <framework:messenger>
                <!-- ... інші транспорти -->
    
                <framework:transport name="sync" dsn="sync://"/>
    
                <framework:routing message-class="App\Message\SmsNotification">
                    <framework:sender service="sync"/>
                </framework:routing>
            </framework:messenger>
        </framework:config>
    </container>
    
  • PHP
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;
    
    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();
    
        // ... інші транспорти
    
        $messenger->transport('sync')->dsn('sync://');
        $messenger->routing('App\Message\SmsNotification')->senders(['sync']);
    };
    

Створення вашого власного транспорту

Ви також можете створити власний транспорт, якщо вам потрібно відправляти або отримувати повідомлення через щось, що не підтримується. Див. How to Create Your own Messenger Transport.

Споживання повідомлень (запуск робітника)

Коли ваші повідомлення маршрутизовані, в більшості випадків, вам треба буде “споживати” їх. Ви можете зробити це за допомогою команди messenger:consume:

1
2
3
4
$ php bin/console messenger:consume async

# використайте -vv, щоб побачити деталі того що відбувається
$ php bin/console messenger:consume async -vv

Перший аргумент - це ім’я отримувача (або id сервісу, якщо ви маршрутизували до користувацького сервісу). За замовчуванням, команда буде виконуватися нескінченно: шукати нові повідомлення у вашому транспорті та обробляти їх. Ця команда називається вашим “робітником”.

Tip

Щоб правильно зупинити робітника, викликайте екземпляр StopWorkerException.

New in version 5.4: Клас StopWorkerException було представлено в Symfony 5.4.

Запуск у виробництво

У виробництві є декілька важливих речей, про які варто подумати:

Використовуйте Супервізора, щоб ваш(і) робітник(и) працювали
Ви хочете, щоб один або більше “робітників” працювали весь час. Щоб зробити це, використайте систему контролю процесу на кшталт Супервізора.
Не дозволяйте робітникам працювати нескінченно
Деякі сервіси (на кшталт EntityManager Doctrine) будуть споживати все більше пам’яті з часом. Тому замість того, щоб дозволяти вашому робітнику працювати завжди, використовуйте прапорець на кшталт messenger:consume --limit=10, щоб вказати робітнику, що він має обробити лише 10 повідомлень до припинення роботи (потім Супервізор створить новий процес). Також є інші опції на кшталт --memory-limit=128M и --time-limit=3600.
Перезавантажуйте робітників при запуску
Кожний раз при запуску вам треба буде перезавантажити всі процеси ваших робітників, щоб вони бачили новозапущений код. Щоб зробити це, виконайте messenger:stop-workers при запуску. Це сигналізує кожному робітнику, що він має завершити обробку поточного повідомлення та граціозно завершити роботу. Потім, Супервізор створить нові процеси робітників. Команда використовує кеш app внутрішньо - тому переконайтеся в тому, що він сконфігурований для використання адаптеру за вашим смаком.
Використовуйте одиин кеш між запусками
Якщо ваша стратегія запуску полягає у створенні нових цільових каталогів, вам необхідно встановити значення опції конфігурації cache.prefix.seed, щоб використовувати один і теж простір імен кешу між запусками. Інакше, пул cache.app буде використовувати значення параметру kernel.project_dir в якості бази для простору імен, що призведе до різних просторів імен кожний раз при запуску.

Пріоритизований транспорт

Іноді певні типи повідомлень повинні мати вищий пріоритет та бути оброблені до інших. Щоб зробити це можливим, ви можете створити декілька транспортів та маршрутизувати різні повідомлення до них. Наприклад:

  • YAML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    # config/packages/messenger.yaml
    framework:
        messenger:
            transports:
                async_priority_high:
                    dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                    options:
                        # queue_name відповідає транспорту doctrine
                        queue_name: high
    
                        # для AMQP відправте окремий обмін, а потім поставте у чергу
                        #exchange:
                        #    name: high
                        #queues:
                        #    messages_high: ~
                        # або спробуйте redis "group"
                async_priority_low:
                    dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                    options:
                        queue_name: low
    
            routing:
                'App\Message\SmsNotification':  async_priority_low
                'App\Message\NewUserWelcomeEmail':  async_priority_high
    
  • XML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
    
        <framework:config>
            <framework:messenger>
                <framework:transport name="async_priority_high" dsn="%env(MESSENGER_TRANSPORT_DSN)%">
                    <framework:options>
                        <framework:queue>
                            <framework:name>Queue</framework:name>
                        </framework:queue>
                    </framework:options>
                </framework:transport>
                <framework:transport name="async_priority_low" dsn="%env(MESSENGER_TRANSPORT_DSN)%">
                    <option key="queue_name">low</option>
                </framework:transport>
    
                <framework:routing message-class="App\Message\SmsNotification">
                    <framework:sender service="async_priority_low"/>
                </framework:routing>
                <framework:routing message-class="App\Message\NewUserWelcomeEmail">
                    <framework:sender service="async_priority_high"/>
                </framework:routing>
            </framework:messenger>
        </framework:config>
    </container>
    
  • PHP
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;
    
    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();
    
        $messenger->transport('async_priority_high')
            ->dsn('%env(MESSENGER_TRANSPORT_DSN)%')
            ->options(['queue_name' => 'high']);
    
        $messenger->transport('async_priority_low')
            ->dsn('%env(MESSENGER_TRANSPORT_DSN)%')
            ->options(['queue_name' => 'low']);
    
        $messenger->routing('App\Message\SmsNotification')->senders(['async_priority_low']);
        $messenger->routing('App\Message\NewUserWelcomeEmail')->senders(['async_priority_high']);
    };
    

Потім ви зможете запускати окремних робітників для кожного транспорту, або інструктувати одного робітника, щоб він обробляв повідомлення в порядку пріоритетності:

1
$ php bin/console messenger:consume async_priority_high async_priority_low

Робітник завжди спочатку шукатиме повідомлення, які очікують async_priority_high. Якщо таких немає, потім він споживатиме повідомлення з async_priority_low.

Обмежте споживання для конкретних черг

Деякий транспорт (особливо AMQP) має концепцію обміну та черг. Транспорт Symfony завжди пов’язаний з обміном. За замовчуванням, робітник споживає з усіх черг, з’єднаних з обміном вказаного транспорту. Однак є приклади використання, коли вам потрібно, щоб робітник споживав лише з конкретної черги.

Ви можете обмежити робітника, щоб він обробляв лише повідомлення з конкретних черг:

1
$ php bin/console messenger:consume my_transport --queues=fasttrack

Щоб дозволити використання опції queues, отримувач має реалізовувати QueueReceiverInterface.

New in version 5.3: Обмеження робітника для конкретних черг було представлено в Symfony 5.3.

Конфігурація супервізора

Супервізор - це чудовий інструмент для гарантії того, що процес(и) ваших робітників завжди виконується (навіть якщо він закривається у зв’язку з помилкою, досягненням ліміту повідомлень або завдяки messenger:stop-workers). Ви можете встановити його на Ubuntu, наприклад, через:

1
$ sudo apt-get install supervisor

Файли конфігурації Супервізора зазвичай живуть в каталозі /etc/supervisor/conf.d. Наприклад, ви можете створити там новий файл messenger-worker.conf, щоб переконатися, що 2 екземпляри messenger:consume працюють завжди:

1
2
3
4
5
6
7
8
9
;/etc/supervisor/conf.d/messenger-worker.conf
[program:messenger-consume]
command=php /path/to/your/app/bin/console messenger:consume async --time-limit=3600
user=ubuntu
numprocs=2
startsecs=0
autostart=true
autorestart=true
process_name=%(program_name)s_%(process_num)02d

Змініть аргумент async, щоб він використовував назву вашого транспорту (або транспортів) та user на Unix-користувача на вашому сервері.

Якщо ви використовуєте транспорт Redis, відмітьте, що кожному робітнику необхідне унікальне ім’я споживача, щоб уникнути обробки одного повідомлення декількома робітниками. Один зі способів досягнути цього - встановити змінну середовища в файлі конфігурації Супервізора, на яку ви потім зможете послатися в messenger.yaml (див. розділ Redis вище):

1
environment=MESSENGER_CONSUMER_NAME=%(program_name)s_%(process_num)02d

Далі, вкажіть Супервізору прочитати вашу конфігурацію та запустити ваших робітників:

1
2
3
4
5
$ sudo supervisorctl reread

$ sudo supervisorctl update

$ sudo supervisorctl start messenger-consume:*

Див. документацію Супервізора, щоб дізнатися більше деталей.

Граціозне вимкнення

Якщо ви встановили у своєму проекті PHP-розширення PCNTL, робітники будуть обробляти POSIX сигнал SIGTERM для завершення обробки поточного повідомлення перед виходом.

У деяких випадках, сигнал SIGTERM відправляється самим Супервізором (наприклад, зупинка контейнера Docker в якому Супервізор - точка входу). В таких випадках, вам треба додати ключ stopwaitsecs до конфігурації програми (зі значенням бажаного періоду відстрочки в секундах) для того, щоб виконати граціозне вимкнення:

1
2
[program:x]
stopwaitsecs=20

Робітник без стану

PHP створений бути без стану, різни запити не мають спільних джерел. В HTTP-контексті PHP очищує все перед відправленням відповіді, тому ви можете вирішити не турбуватися про сервіси, які можуть допускати витік пам’яті.

З іншої сторони, робітники зазвичай працюють у довгострокових CLI-процесах, які не завершуються після обробки повідомлення. Тому вам треба бути обережними зі станами сервісів, щоб вони не давали витік інформації та/або пам’яті з одного повідомлення у інше.

Однак, деякі сервіси Symfony, на кшталт обробника fingers crossed Monolog, допускають витік за своїм задумом. В таких випадках, використайте опцію транспорту reset_on_message, щоб автоматично скинути сервіс-контейнер між двома повідомленнями:

  • YAML
    1
    2
    3
    4
    5
    6
    7
    # config/packages/messenger.yaml
    framework:
        messenger:
            reset_on_message: true
            transports:
                async:
                    dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
    
  • XML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
    
        <framework:config>
            <framework:messenger>
                <framework:transport name="async" dsn="%env(MESSENGER_TRANSPORT_DSN)%" reset-on-message="true">
                </framework:transport>
            </framework:messenger>
        </framework:config>
    </container>
    
  • PHP
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;
    
    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();
    
        $messenger->transport('async')
            ->dsn('%env(MESSENGER_TRANSPORT_DSN)%')
            ->resetOnMessage(true)
        ;
    };
    

New in version 5.4: Опція reset_on_message була представлена в Symfony 5.4.

Повторні спроби та помилки

Якщо під час споживання повідомлення з транспорту буде викликане виключення, воно буде автоматично повторно відправлене транспорту, щоб спробувати знову. За замовчуванням, повідомлення має 3 спроби перед скиданням або відправкою транспорту помилок. Кожна спроба буде також відкладена в часі, на випадок, якщо вона була викликана тимчасовою проблемою. Все це можна сконфігурувати для кожного транспорту:

  • YAML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    # config/packages/messenger.yaml
    framework:
        messenger:
            transports:
                async_priority_high:
                    dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
    
                    # конфігурація за замовчуванням
                    retry_strategy:
                        max_retries: 3
                        # затримка в мілісекундах
                        delay: 1000
                        # робить так, щоб затримка була довшою перед кожною наступною спробою
                        # наприклад, затримка в 1 секунду, 2 секунди, 4 секунди
                        multiplier: 2
                        max_delay: 0
                        # перевизначити це все сервісом, який реалізує
                        # Symfony\Component\Messenger\Retry\RetryStrategyInterface
                        # service: null
    
  • XML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
    
        <framework:config>
            <framework:messenger>
                <framework:transport name="async_priority_high" dsn="%env(MESSENGER_TRANSPORT_DSN)%?queue_name=high_priority">
                    <framework:retry-strategy max-retries="3" delay="1000" multiplier="2" max-delay="0"/>
                </framework:transport>
            </framework:messenger>
        </framework:config>
    </container>
    
  • PHP
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;
    
    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();
    
        $messenger->transport('async_priority_high')
            ->dsn('%env(MESSENGER_TRANSPORT_DSN)%')
            // конфігурація за замовчуванням
            ->retryStrategy()
                ->maxRetries(3)
                // затримка в мілісекундах
                ->delay(1000)
                // робить так, щоб затримка була довшою перед кожною наступною спробою
                // наприклад, затримка в 1 секунду, 2 секунди, 4 секунди
                ->multiplier(2)
                ->maxDelay(0)
                // перевизначити це все сервісом, який реалізує
                // Symfony\Component\Messenger\Retry\RetryStrategyInterface
                ->service(null)
        ;
    };
    

Tip

Symfony запускає WorkerMessageRetriedEvent, коли повідомлення має повторні спроби, тому ви можете виконати власну логіку.

New in version 5.2: Клас WorkerMessageRetriedEvent було представлено в Symfony 5.2.

Уникання повторних спроб

Іноді обробка повідомлення може бути невдалою, і ви будете знати, що це перманентно і повторних спроб не потрібно. Якщо ви викличете UnrecoverableMessageHandlingException, повідомлення не матиме повторних спроб.

Форсування повторних спроб

New in version 5.1: RecoverableMessageHandlingException було представлено в Symfony 5.1.

Іноді обробка повідомлення може бути невдалою, і ви будете знати, що це тимчасово, і необхідна повторна спроба. Якщо ви викличете RecoverableMessageHandlingException, повідомлення завжди матиме повторну спробу.

Збереження та повторні спроби невдалих повідомлень

Якщо повідомлення зазнає невдачі та має декілька повторних спроб (max_retries), воно потім буде відбраковане. Щоб уникнути цього, ви можете сконфігурувати failure_transport:

  • YAML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    # config/packages/messenger.yaml
    framework:
        messenger:
            # після повторних спроб, повідомлення будуть відправлені транспорту "failed"
            failure_transport: failed
    
            transports:
                # ... інший транспорт
    
                failed: 'doctrine://default?queue_name=failed'
    
  • XML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
    
        <framework:config>
            <!-- після повторних спроб, повідомлення будуть відправлені транспорту "failed" -->
            <framework:messenger failure-transport="failed">
                <!-- ... інший транспорт -->
    
                <framework:transport name="failed" dsn="doctrine://default?queue_name=failed"/>
            </framework:messenger>
        </framework:config>
    </container>
    
  • PHP
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;
    
    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();
    
        // після повторних спроб, повідомлення будуть відправлені транспорту "failed"
        $messenger->failureTransport('failed');
    
        // ... інший транспорт
    
        $messenger->transport('failed')
            ->dsn('doctrine://default?queue_name=failed');
    };
    

У цьому прикладі, якщо обробка повідомлення зазнає невдачі 3 рази (значення за замовчуванням max_retries), воно потім буде відправлене транспорту failed. Хоча ви можете використати messenger:consume failed для споживання його, як звичайного транспорту, вам скоріше захочеться вручну переглянути повідомлення в транспорті помилок та вирішити щодо повторних спроб:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
# побачити всі повідомлення в транспорті помилок
$ php bin/console messenger:failed:show

# побачити деталі конкретної помилки
$ php bin/console messenger:failed:show 20 -vv

# побачити та повторно спробувати кожне повідомлення окремо
$ php bin/console messenger:failed:retry -vv

# повторна спроба конкретних повідомлень
$ php bin/console messenger:failed:retry 20 30 --force

# видалити повідомлення без повторної спроби
$ php bin/console messenger:failed:remove 20

# видалити повідомлення без повторних спроб та показати кожне повідомлення перед видаленням
$ php bin/console messenger:failed:remove 20 30 --show-messages

New in version 5.1: Опція --show-messages була представлена в Symfony 5.1.

Якщо повідомлення знов зазнає невдачі, воно буде відправлене назад у транспорт помилок у відповідності зі звичайними правилами повторних спроб. Як тільки буде досягнено максимум повторних спроб, повідомлення буде скинуте перманентно.

Декілька помилкових транспортів

New in version 5.3: Можливість використовувати декілька транспортів помилок була представлена в Symfony 5.3.

Іноді недостатньо мати один глобальний сконфігурований failed transport, тому що деякі повідомлення важливіші за інші. В таких випадках ви можете перевизначити транспорт помилок лише для певних транспортів:

  • YAML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    # config/packages/messenger.yaml
    framework:
        messenger:
            # після повторних спроб повідомлення будуть відправлені транспорту "failed"
            # за замовчуванням, якщо всередині транспорту не сконфігурований "failed_transport"
            failure_transport: failed_default
    
            transports:
                async_priority_high:
                    dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                    failure_transport: failed_high_priority
    
                # так як транспорт помилок не сконфігурований, буде використовуватися встановлений
                # глобальний набір "failure_transport"
                async_priority_low:
                    dsn: 'doctrine://default?queue_name=async_priority_low'
    
                failed_default: 'doctrine://default?queue_name=failed_default'
                failed_high_priority: 'doctrine://default?queue_name=failed_high_priority'
    
  • XML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
    
        <framework:config>
            <!-- після повторних спроб повідомлення будуть відправлені транспорту "failed"
            за замовчуванням, якщо всередині транспорту не сконфігурований "failed_transport" -->
            <framework:messenger failure-transport="failed_default">
                <framework:transport name="async_priority_high" dsn="%env(MESSENGER_TRANSPORT_DSN)%" failure-transport="failed_high_priority"/>
                <!-- так як транспорт помилок не сконфігурований, буде використовуватися встановлений
                глобальний набір "failure_transport" -->
                <framework:transport name="async_priority_low" dsn="doctrine://default?queue_name=async_priority_low"/>
    
                <framework:transport name="failed_default" dsn="doctrine://default?queue_name=failed_default"/>
                <framework:transport name="failed_high_priority" dsn="doctrine://default?queue_name=failed_high_priority"/>
            </framework:messenger>
        </framework:config>
    </container>
    
  • PHP
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;
    
    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();
    
        // після повторних спроб повідомлення будуть відправлені транспорту "failed"
        // за замовчуванням, якщо всередині транспорту не сконфігурований "failed_transport"
        $messenger->failureTransport('failed_default');
    
        $messenger->transport('async_priority_high')
            ->dsn('%env(MESSENGER_TRANSPORT_DSN)%')
            ->failureTransport('failed_high_priority');
    
        // так як транспорт помилок не сконфігурований, буде використовуватися встановлений
        // глобальний набір "failure_transport"
       $messenger->transport('async_priority_low')
            ->dsn('doctrine://default?queue_name=async_priority_low');
    
       $messenger->transport('failed_default')
            ->dsn('doctrine://default?queue_name=failed_default');
    
       $messenger->transport('failed_high_priority')
            ->dsn('doctrine://default?queue_name=failed_high_priority');
    };
    

Якщо немає визначеного failure_transport глобально або на рівні транспорту, повідомлення буде скинуте після певної кількості спроб.

Невдалі команди мають необов’язкову опцію --transport, щоб вказати failure_transport, сконфігурований на рівні транспорту.

1
2
3
4
5
6
7
8
# побачити всі повідомлення в транспорті "failure_transport"
$ php bin/console messenger:failed:show --transport=failure_transport

# повторна спроба конкретних повідомлень з "failure_transport"
$ php bin/console messenger:failed:retry 20 30 --transport=failure_transport --force

# видалити повідомлення без повторної спроби з "failure_transport"
$ php bin/console messenger:failed:remove 20 --transport=failure_transport

Конфігурація транспорту

Месенджер підтримує декілька різних типів транспорту, кожний зі своїми опціями. Опції можуть бути передані транспорту через рядок DSN або конфігурацію.

1
2
# .env
MESSENGER_TRANSPORT_DSN=amqp://localhost/%2f/messages?auto_setup=false
  • YAML
    1
    2
    3
    4
    5
    6
    7
    8
    # config/packages/messenger.yaml
    framework:
        messenger:
            transports:
                my_transport:
                    dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
                    options:
                        auto_setup: false
    
  • XML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
    
        <framework:config>
            <framework:messenger>
                <framework:transport name="my_transport" dsn="%env(MESSENGER_TRANSPORT_DSN)%">
                    <framework:options auto-setup="false"/>
                </framework:transport>
            </framework:messenger>
        </framework:config>
    </container>
    
  • PHP
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;
    
    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();
    
        $messenger->transport('my_transport')
            ->dsn('%env(MESSENGER_TRANSPORT_DSN)%')
            ->options(['auto_setup' => false]);
    };
    

Опції, визначені під options, панують над визначеними в DSN.

Транспорт AMQP

Транспорт AMQP використовує PHP-розширення AMQP для відправлення повідомлень в чергу на кшталт RabbitMQ.

New in version 5.1: Починаючи з Symfony 5.1, транспорт AMQP переїхав у окремий пакет. Встановіть його, виконавши:

1
$ composer require symfony/amqp-messenger

DSN транспорту AMQP може виглядати так:

1
2
3
4
5
# .env
MESSENGER_TRANSPORT_DSN=amqp://guest:[email protected]:5672/%2f/messages

# або використайте протокол AMQPS
MESSENGER_TRANSPORT_DSN=amqps://guest:[email protected]/%2f/messages

New in version 5.2: Підтримка протоколу AMQPS була представлена в Symfony 5.2.

Якщо ви хочете виокристати AMQP, закодований за допомогою TLS/SSL, ви маєте також надати CA-сертифікат. Визначіть шлях сертифікату в налаштуванні PHP.ini amqp.cacert (наприклад, amqp.cacert = /etc/ssl/certs) або в параметрі DSN cacert (наприклад, amqps://localhost?cacert=/etc/ssl/certs/).

Порт, за замовчуванням використовуваний AMQP, закодованим за допомогою TLS/SSL, - 5671, але ви можете перевизначити його в параметрі DSN port (наприклад, amqps://localhost?cacert=/etc/ssl/certs/&port=12345).

Note

За замовчуванням, транспорт автоматично створюватиме будь-які необхідні обміни, черги та сполучні ключі. Це можна відключити, але деяків функції можуть працювати некоректно (на кшталт відкладених черг).

Note

З Symfony 5.3 або новіше, ви можете обмежити споживачів транспорту AMQP, щоб вони обробляли лише повідомлення з деяких черг якогось обміну. Див. Limit Consuming to Specific Queues.

Транспорт має інші опції, включно із способами конфігурації обміну, сполучними ключами, чергами і т.д. Дивіться документацію Connection.

Транспорт має ряд опцій:

New in version 5.2: Опція confirm_timeout була представлена в Symfony 5.2.

Deprecated since version 5.3: Опція prefetch_count застаріла в Symfony 5.3 так як вона не має ефекту в транспорті AMQP месенджера.

Ви можете також сконфігурувати налаштування спеціально для AMQP у вашому повідомленні, додавши AmqpStamp до вашого Конверту:

use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpStamp;
// ...

$attributes = [];
$bus->dispatch(new SmsNotification(), [
    new AmqpStamp('custom-routing-key', AMQP_NOPARAM, $attributes),
]);

Caution

Споживачі не відображаються в панелі адміна, так як цей транспорт не покладається на \AmqpQueue::consume(), який блокує. Наявність блокуючого отримувача робить опції --time-limit/--memory-limit команди messenger:consume. а також команди messenger:stop-workers марними, так як вони покладаються на той факт, що отримувач повертається негайно, незалежно від того, знаходить він повідомлення, чи ні. Робітник споживання відповідає за ітерацію до отримання повідомлення для обробки та/або до того, як буде досягнена одна з умов зупинки. Таким чином, логіка зупинки робітника може бути досягнена, якщо він застряг на блокуючому виклику.

Транспорт Doctrine

Транспорт Doctrine може бути використаний для зберігання повідомлень в таблиці бази даних.

New in version 5.1: Починаючи з Symfony 5.1, транспорт Doctrine було переміщено в окремий пакет. Встановіть його, виконавши:

1
$ composer require symfony/doctrine-messenger

DSN транспорту Doctrine може виглядати так:

1
2
# .env
MESSENGER_TRANSPORT_DSN=doctrine://default

Формат doctrine://<connection_name>, у випадку, якщо у вас є декілька з’єднань і ви хочете використати якесь відмінне від “default”. Транспорт буде автоматично створювати таблицю під назвою messenger_messages.

New in version 5.1: Можливість автоматично генерувати міграцію для таблиці messenger_messages була вредставлена в Symfony 5.1 та DoctrineBundle 2.1.

Або, для створення таблиці самостійно, встановіть опцію auto_setup як false, та згенеруйте міграцію.

Caution

Властивість повідомлень datetime, яка зберігається в базі даних, використовує часову зону поточної системи. Це може викликати проблеми, якщо декілька машин з різними конфігураціями часових зон використовують одне сховище.

Транспорт має такі опції:

Опція Опис За замовчуванням
table_name Назва таблиці messenger_messages
queue_name Назва черги (стовпчик в таблиці, щоб вкиористовувати одну таблицю для декількох транспортів) default
redeliver_timeout Тайм-аут перед повторною спробою повідомлення в черзі, але не в стані “обробки” (якщо робітник за якоїсь причини зупинився, станеться це, і зрештою ви маєте знову спробувати повідомлення) - в секундах. 3600
auto_setup Чи має таблиця бути створена автоматично під час відправки/отримання. true

New in version 5.1: Можливість використовувати PostgreSQL LISTEN/NOTIFY була представлена в Symfony 5.1.

При використанні PostgreSQL, у вас є доступ до наступних опцій для отримання переваг функції LISTEN/NOTIFY. Це дозволяє більш продуктивний підхід, ніж поведінка голосування транспорту Doctrine за замовчуванням, так як PostgreSQL буде напряму повідомляти робітників, коли нове повідомлення з’являтиметься в таблиці.

Опція Опис За замовчуванням
use_notify Чи використовувати LISTEN/NOTIFY. true
check_delayed_interval Інтервал перевірки відкладених повідомлень в мілісекундах. Встановіть як 0, щоб відключити перевірку. 1000
get_notify_timeout Тривалість часу очікування відповіді при виклику PDO::pgsqlGetNotify`, в мілісекундах. 0

Транспорт Beanstalkd

New in version 5.2: Транспорт Beanstalkd було представлено в Symfony 5.2.

Транспорт Beanstalkd відправляє повідомлення прямо в робочу чергу Beanstalkd. Встановіть його, виконавши:

1
$ composer require symfony/beanstalkd-messenger

DSN транспорту Beanstalkd може виглядати так:

1
2
3
4
5
# .env
MESSENGER_TRANSPORT_DSN=beanstalkd://localhost:11300?tube_name=foo&timeout=4&ttr=120

# Якщо порту немає, він за замовчуванням буде 11300
MESSENGER_TRANSPORT_DSN=beanstalkd://localhost

Транспорт має ряд опцій:

Опція Опис За замовчуванням
tube_name Назва черги default
timeout Тайм-аут резерву повідомлення - в секундах. 0 (змусить сервер одразу ж або повернути відповідь, або викликати TransportException)
ttr Час виконання повідомлення перед тим як помістити його назад у чергу готовності - в секундах. 90

Транспорт Redis

Транспорт Redis використовує потоки для створення черги повідомлень. Цей транспорт вимагає PHP-розширення Redis (>=4.3) та працюючого серверу Redis (^5.0).

New in version 5.1: Починаючи з Symfony 5.1, транспорт Redis було переміщено в окремий пакет. Встановіть його, виконавши:

1
$ composer require symfony/redis-messenger

DSN транспорту Redis може виглядати так:

1
2
3
4
5
6
7
8
# .env
MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages
# Повний приклад DSN
MESSENGER_TRANSPORT_DSN=redis://[email protected]:6379/messages/symfony/consumer?auto_setup=true&serializer=1&stream_max_entries=0&dbindex=0&delete_after_ack=true
# Приклад кластеру Redis
MESSENGER_TRANSPORT_DSN=redis://host-01:6379,redis://host-02:6379,redis://host-03:6379,redis://host-04:6379
# Приклад Unix-сокету
MESSENGER_TRANSPORT_DSN=redis:///var/run/redis.sock

New in version 5.1: DSN Unix-сокету була представлена в Symfony 5.1.

Деякі опції можуть бути сконфігуровані через DSN або ключ options під транспортом в messenger.yaml:

Caution

Ніколи не повинно бути більше однієї команди messenger:consume, виконуваної з однаковою комбінацією stream, group та consumer, інашке повідомлення можуть бути оброблені більше, ніж один раз. Якщо ви запускаєте декілька робітників черги, consumer може бути встановлено як змінну оточення (на кшталт %env(MESSENGER_CONSUMER_NAME)%), встановлено Супервізором (приклад нижче) або будь-яким іншим сервісом, використовуваним для управління процесами робітників. В середовищі контейнера, HOSTNAME може бути використано як ім’я споживача, так як там лише один робітник на контейнер/хостинг. Якщо ви використовуєте Kubernetes для управління контейнерами, розгляньте використання StatefulSet для стабілізації імен.

Tip

Встановіть delete_after_ack як true (якщо у вас одна група) або визначте stream_max_entries (якщо ви можете припустити, яка максимальна кількість записів припустима у вашому випадку), щоб уникнути витоків пам’яті. В іншому випадку, всі повідомлення назавжди залишаться в Redis.

New in version 5.1: Опції delete_after_ack, redeliver_timeout и claim_interval були представлені в Symfony 5.1.

New in version 5.2: Опції delete_after_reject і lazy були представлені в Symfony 5.2.

New in version 5.4: Опції sentinel_persistent_id, sentinel_retry_interval, sentinel_read_timeout, sentinel_timeout, і sentinel_master були представлені в Symfony 5.4.

Deprecated since version 5.4: Відсутність установки чіткого значення для опції delete_after_ack не заохочується починаючи з версії Symfony 5.4. В Symfony 6.0, значення цієї опції за замовчуванням змінюється з false на true.

Транспорт у пам’яті

Транспорт in-memory насправді не доставляє повідомлення. Замість цього, він тримає їх у пам’яті під час запиту, що може бути корисним для тестування. Наприклад, якщо у вас є транспорт async_priority_normal, ви можете перевизначати його в середовищі test, щоб використати цей транспорт:

  • YAML
    1
    2
    3
    4
    5
    # config/packages/test/messenger.yaml
    framework:
        messenger:
            transports:
                async_priority_normal: 'in-memory://'
    
  • XML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    <!-- config/packages/test/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
    
        <framework:config>
            <framework:messenger>
                <framework:transport name="async_priority_normal" dsn="in-memory://"/>
            </framework:messenger>
        </framework:config>
    </container>
    
  • PHP
    1
    2
    3
    4
    5
    6
    7
    8
    9
    // config/packages/test/messenger.php
    use Symfony\Config\FrameworkConfig;
    
    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();
    
        $messenger->transport('async_priority_normal')
            ->dsn('in-memory://');
    };
    

Тоді під час тестування повідомлення не будуть відправлені реальному транспорту. Навіть краще, у тесті, ви можете перевірити, щоб лише одне повідомлення було відправлене під час запиту:

// tests/Controller/DefaultControllerTest.php
namespace App\Tests\Controller;

use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;
use Symfony\Component\Messenger\Transport\InMemoryTransport;

class DefaultControllerTest extends WebTestCase
{
    public function testSomething()
    {
        $client = static::createClient();
        // ...

        $this->assertSame(200, $client->getResponse()->getStatusCode());

        /* @var InMemoryTransport $transport */
        $transport = self::$container->get('messenger.transport.async_priority_normal');
        $this->assertCount(1, $transport->getSent());
    }
}

Транспорт має ряд опцій:

serialize (бульову, за замовчуванням: false)
Чи сериалізувати повідомлення. Це корисно для тестування додатковго шару, особливо коли ви використовуєте власний сериалізатор повідомлень.

New in version 5.3: Опція serialize була представлена в Symfony 5.3.

Note

Всі транспорти in-memory будуть автоматично скинуті після кожного тесту в класах тестів, що розширюють KernelTestCase або WebTestCase.

Amazon SQS

New in version 5.1: Транспорт Amazon SQS було представлено в Symfony 5.1.

Транспорт Amazon SQS чудово підходить для додатку на AWS. Встановіть його, виконавши:

1
$ composer require symfony/amazon-sqs-messenger

DSN SQS транспорту виглядає так:

1
2
3
# .env
MESSENGER_TRANSPORT_DSN=https://sqs.eu-west-3.amazonaws.com/123456789012/messages?access_key=AKIAIOSFODNN7EXAMPLE&secret_key=j17M97ffSVoKI0briFoo9a
MESSENGER_TRANSPORT_DSN=sqs://localhost:9494/messages?sslmode=disable

Note

Транспорт автоматично створить необхідні черги. Це можна відключити, встановивши опцію auto_setup як false.

Tip

До відправлення або отримання повідомлення, Symfony необхідно конвертувати назву черги в URL черги AWS, викликавши API GetQueueUrl в AWS. Цього додаткового API-виклику можна уникнути, надавши DSN, яка є URL черги.

New in version 5.2: Функція надання URL черги в DSN була представлена в Symfony 5.2.

Транспорт має ряд опцій:

Опція Опис За замовчуванням
access_key Ключ доступу AWS  
account Ідентифікатор AWS-аккаунту Володар облікових даних
auto_setup Чи потрібно автоматично створювати чергу під час відправки/отримання. true
buffer_size Кількість повідомлень для попереднього вилучення 9
debug Якщо true, веде лог всіх HTTP запитів та відповідей (це впливає на продуктивність) false
endpoint Абсолютний URL до SQS-сервісу https://sqs.eu-west-1.amazonaws.com
poll_timeout Час очікування нового повідомлення в секундах 0.1
queue_name Назва черги messages
region Назва AWS-регіону eu-west-1
secret_key Секретныи ключ AWS  
visibility_timeout Кількість секунд, протягом яких повідомлення не буде видимим (Visibility Timeout) Конфігурація черги
wait_time Тривалість Long polling в секундах 20

New in version 5.3: Опція debug була представлена в Symfony 5.3.

Note

Параметр wait_time визначає максимальний час очікування для Amazon SQS до того, як повідомлення буде доступне в черзі, перед відправленням відповіді. Це допомагає знизити вартість використання Amazon SQS, усуваючи деяку кількість пустих відповідей.

Параметр poll_timeout визначає час очікування отримувача до повернення null. Він уникає блокування інших отримувачів від виклику.

Note

Якщо назва черги має суфікс .fifo, AWS створить чергу FIFO. Використайте марку AmazonSqsFifoStamp, щоб визначити Message group ID і Message deduplication ID.

Черги FIFO не підтримують установки затримки окремих повідомлень, значення delay: 0 потребується в налаштуваннях стратегії повторних спроб.

Сериалізація повідомлень

Коли повідомлення відправляються (та отримуються) в транпсорт, вони сериалізуються з використанням нативних функцій PHP serialize() і unserialize(). Ви можете змінити це глобально (або для кожного транспорту) на сервіс, який реалізує SerializerInterface:

  • YAML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    # config/packages/messenger.yaml
    framework:
        messenger:
            serializer:
                default_serializer: messenger.transport.symfony_serializer
                symfony_serializer:
                    format: json
                    context: { }
    
            transports:
                async_priority_normal:
                    dsn: # ...
                    serializer: messenger.transport.symfony_serializer
    
  • XML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
    
        <framework:config>
            <framework:messenger>
                <framework:serializer default-serializer="messenger.transport.symfony_serializer">
                    <framework:symfony-serializer format="json">
                        <framework:context/>
                    </framework:symfony-serializer>
                </framework:serializer>
    
                <framework:transport name="async_priority_normal" dsn="..." serializer="messenger.transport.symfony_serializer"/>
            </framework:messenger>
        </framework:config>
    </container>
    
  • PHP
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;
    
    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();
    
        $messenger->serializer()
            ->defaultSerializer('messenger.transport.symfony_serializer')
            ->symfonySerializer()
                ->format('json')
                ->context('foo', 'bar');
    
        $messenger->transport('async_priority_normal')
            ->dsn(...)
            ->serializer('messenger.transport.symfony_serializer');
    };
    

messenger.transport.symfony_serializer - це вбудований сервіс, який використовує компонент Сериалізатор і може бути сконфігурований декількома способами. Якщо ви оберете використовувати сериалізатор Symfony, ви зможете контролювати контекст для кожного випадку окремо через SerializerStamp (див. Конверти та марки).

Tip

При відправленні/отримаванні повідомлен в/з іншого транспорту, вам може знадобитися більше контролю над процесом сериалізації. Використання користувацького сериалізатору надає такий контроль. Див. `Туторіал по сериалізації повідомлень SymfonyCasts`_, щоб дізнатися більше.

Налаштування обробників

Конфігурація обробників вручну

Symfony зазвичай буде знаходити та реєструвати вашого обробника автоматично. Але ви також можете сконфігурувати його вручну - і передати йому додаткову конфігурацію - тегувавши сервіс обробника messenger.message_handler

  • YAML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    # config/services.yaml
    services:
        App\MessageHandler\SmsNotificationHandler:
            tags: [messenger.message_handler]
    
            # або сконфігурувати з опціями
            tags:
                -
                    name: messenger.message_handler
                    # небхідно лише якщо неможливо вгадати за підказкою
                    handles: App\Message\SmsNotification
    
  • XML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    <!-- config/services.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd">
    
        <services>
            <service id="App\MessageHandler\SmsNotificationHandler">
                 <!-- handles небхідно лише якщо неможливо вгадати за підказкою -->
                 <tag name="messenger.message_handler"
                      handles="App\Message\SmsNotification"/>
            </service>
        </services>
    </container>
    
  • PHP
    1
    2
    3
    4
    5
    6
    7
    8
    9
    // config/services.php
    use App\Message\SmsNotification;
    use App\MessageHandler\SmsNotificationHandler;
    
    $container->register(SmsNotificationHandler::class)
        ->addTag('messenger.message_handler', [
            // небхідно лише якщо неможливо вгадати за підказкою
            'handles' => SmsNotification::class,
        ]);
    

Можливі опції конфігурації з тегами:

  • bus
  • from_transport
  • handles
  • method
  • priority

Підписник та опції обробника

Клас обробника може обробляти багато повідомлень або конфігурувати сам себе, реалізуючи MessageSubscriberInterface:

// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;

use App\Message\OtherSmsNotification;
use App\Message\SmsNotification;
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;

class SmsNotificationHandler implements MessageSubscriberInterface
{
    public function __invoke(SmsNotification $message)
    {
        // ...
    }

    public function handleOtherSmsNotification(OtherSmsNotification $message)
    {
        // ...
    }

    public static function getHandledMessages(): iterable
    {
        // обробити це повідомлення в __invoke
        yield SmsNotification::class;

        // також обробити це повідомлення в handleOtherSmsNotification
        yield OtherSmsNotification::class => [
            'method' => 'handleOtherSmsNotification',
            //'priority' => 0,
            //'bus' => 'messenger.bus.default',
        ];
    }
}

Пов’язування обробників з різними транспортами

Кожне повідомлення може мати декілька обробників, і коли повідомлення споживається, викликаються всі його оброники. Але ви можете також сконфігурувати обробника так, щоб він викликався лише тоді, коли повідомлення отримане з конкретного транспорту. Це дозволяє вам мати одне повідомлення, де кожний обробник викликається різними “робітниками”, що споживають різний транспорт.

Уявіть, що у вас є повідомлення UploadedImage з двома обробниками:

  • ThumbnailUploadedImageHandler: ви хочете, щоб цей оброблялося транспортом під назвою image_transport
  • NotifyAboutNewUploadedImageHandler: ви хочете, щоб цей оброблялося транспортом під назвою async_priority_normal

Щоб зробити це, додайте опцію from_transport до кожного обробника. Наприклад:

// src/MessageHandler/ThumbnailUploadedImageHandler.php
namespace App\MessageHandler;

use App\Message\UploadedImage;
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;

class ThumbnailUploadedImageHandler implements MessageSubscriberInterface
{
    public function __invoke(UploadedImage $uploadedImage)
    {
        // створити мініатюри
    }

    public static function getHandledMessages(): iterable
    {
        yield UploadedImage::class => [
            'from_transport' => 'image_transport',
        ];
    }
}

І, схожим чином:

// src/MessageHandler/NotifyAboutNewUploadedImageHandler.php
// ...

class NotifyAboutNewUploadedImageHandler implements MessageSubscriberInterface
{
    // ...

    public static function getHandledMessages(): iterable
    {
        yield UploadedImage::class => [
            'from_transport' => 'async_priority_normal',
        ];
    }
}

Потім, переконайтеся, що “маршрутизуєте” ваше повідомлення до обох транспортів:

  • YAML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    # config/packages/messenger.yaml
    framework:
        messenger:
            transports:
                async_priority_normal: # ...
                image_transport: # ...
    
            routing:
                # ...
                'App\Message\UploadedImage': [image_transport, async_priority_normal]
    
  • XML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
    
        <framework:config>
            <framework:messenger>
                <framework:transport name="async_priority_normal" dsn="..."/>
                <framework:transport name="image_transport" dsn="..."/>
    
                <framework:routing message-class="App\Message\UploadedImage">
                    <framework:sender service="image_transport"/>
                    <framework:sender service="async_priority_normal"/>
                </framework:routing>
            </framework:messenger>
        </framework:config>
    </container>
    
  • PHP
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;
    
    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();
    
        $messenger->transport('async_priority_normal')->dsn(...);
        $messenger->transport('image_transport')->dsn(...);
    
        $messenger->routing('App\Message\UploadedImage')
            ->senders(['image_transport', 'async_priority_normal']);
    };
    

Ось і все! Тепер ви можете споживати кожний транспорт:

1
2
3
4
# викличе ThumbnailUploadedImageHandler лише при обробці повідомлення
$ php bin/console messenger:consume image_transport -vv

$ php bin/console messenger:consume async_priority_normal -vv

Caution

Якщо обробник не має конфігурації from_transport, він буде виконаний в кожному транспорті, з якого буде отримано це повідомлення.

Розширення месенджера

Конверти та марки

Повідомлення може бути будь-яким PHP-об’єктом. Інколи вам може знадобитися сконфігурувати щось додаткове в повідомленні - на кшталт того, як воно має бути оброблене всередині AMQP або додавання затримки перед обробкою повідомлення. Ви можете зробити це, додавши марку (“stamp”) до вашого повідомлення:

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;

public function index(MessageBusInterface $bus)
{
    $bus->dispatch(new SmsNotification('...'), [
        // почекати 5 секунд перед обробкою
        new DelayStamp(5000),
    ]);

    // ябо чітко створіть Конверт
    $bus->dispatch(new Envelope(new SmsNotification('...'), [
        new DelayStamp(5000),
    ]));

    // ...
}

Внутрішньо, кожне повідомлення огортається в конверт (Envelope), який містить повідомлення та марки. Ви можете створити його вручну або дозволити автобусу повідомлень зробити це. Існує багато різних марок для різних цілей і вони використовуються внутрішньо для відслідковування інформації про повідомлення - на кшталт того, який автобус обробляє його і чи має воно повторні спроби після невдачі.

Проміжкове ПЗ

Те, що відбувається після запуску повідомлення в автобус повідомлень, залежить від його набору проміжкового ПЗ та його порядку. За замовчуванням, проміжкове ПЗ, сконфігуроване для кожного атрибуту, виглядає так:

  1. add_bus_name_stamp_middleware - додає марку для запису того, в якому автобусі було запущено це повідомлення;
  2. dispatch_after_current_bus - див. Transactional Messages: Handle New Messages After Handling is Done;
  3. failed_message_processing_middleware - обробляє повідомлення, які мають повторні спроби через транспорт помилок, щоб вони правильно функціонували, ніби-то вони були отримані з початкового транспорту;
  4. Ваша власна колекція middleware_;
  5. send_message - якщо маршрутизація сконфігурована для транспорту, відправляє повідомлення цьому трнаспорту та зупиняє ланцюжок проміжкового ПЗ;
  6. handle_message - викликає обробника(ів) повідомлень для заданого повідомлення.

Note

Ці назви проміжкового ПЗ - насправді скорочення. Справжні id сервісів мають префікс messenger.middleware. (наприклад,``messenger.middleware.handle_message``).

Проміжкове ПЗ виконується після запуску повідомлення, а також ще раз, коли повідомлення отримане через робітника (для повідомлень, які були відправлені транспорту для асинхронної обробки). Пам’ятайте це, якщо ви створите власне проміжкове ПЗ.

Ви можете додати власне проміжкове ПЗ в список, або повністю відключити проміжкове ПЗ за замовчуванням і додати лише ваше власне:

  • YAML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    # config/packages/messenger.yaml
    framework:
        messenger:
            buses:
                messenger.bus.default:
                    # відключити проміжкове ПЗ за замовчуванням
                    default_middleware: false
    
                    # та/або додати ваше власне
                    middleware:
                        # id сервісів, що реалізують Symfony\Component\Messenger\Middleware\MiddlewareInterface
                        - 'App\Middleware\MyMiddleware'
                        - 'App\Middleware\AnotherMiddleware'
    
  • XML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
    
        <framework:config>
            <framework:messenger>
                <!-- default-middleware: відключити проміжкове ПЗ за замовчуванням -->
                <framework:bus name="messenger.bus.default" default-middleware="false"/>
    
                <!-- та/або додати ваше власне -->
                <framework:middleware id="App\Middleware\MyMiddleware"/>
                <framework:middleware id="App\Middleware\AnotherMiddleware"/>
            </framework:messenger>
        </framework:config>
    </container>
    
  • PHP
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;
    
    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();
    
        $bus = $messenger->bus('messenger.bus.default')
            ->defaultMiddleware(false);
        $bus->middleware()->id('App\Middleware\MyMiddleware');
        $bus->middleware()->id('App\Middleware\AnotherMiddleware');
    };
    

Note

Якщо сервіс проміжкового ПЗ абстрактний, буде створено другий екземпляр сервісу для кожного автобусу.

Проміжкове ПЗ для Doctrine

New in version 1.11: Наступне проміжкове ПЗ для Doctrine було представлено в DoctrineBundle 1.11.

Якщо ви у своєму додатку використовуєте Doctrine, існує ряд необов’язкового проміжкового ПЗ, яке ви можете захотіти використати:

  • YAML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    # config/packages/messenger.yaml
    framework:
        messenger:
            buses:
                command_bus:
                    middleware:
                        # кожний раз при обробці повідомлення, з'єднання Doctrine
                        # "пінгується" і повторно підключається, якщо воно закрите. Корисно,
                        # якщо ваші робітники працюють довгий час і з'єднання бази даних
                        # інколи втрачається
                        - doctrine_ping_connection
    
                        # Після обробки, з'єднання Doctrine закривається, що може
                        # звільнити з'єднання бази даних в робітнику, замість того,
                        # щоб утримувати їх відкритими завжди
                        - doctrine_close_connection
    
                        # огортає всіх обробників в одну транзакцію Doctrine
                        # обробникам не потрібно викликати flush(), а помилка в будь-якому
                        # обробнику викличе відкат
                        - doctrine_transaction
    
                        # або передати інший менеджер сутностей будь-якому
                        #- doctrine_transaction: ['custom']
    
  • XML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
    
        <framework:config>
            <framework:messenger>
                <framework:bus name="command_bus">
                    <framework:middleware id="doctrine_transaction"/>
                    <framework:middleware id="doctrine_ping_connection"/>
                    <framework:middleware id="doctrine_close_connection"/>
    
                    <!-- або передати інший менеджер сутностей будь-якому -->
                    <!--
                    <framework:middleware id="doctrine_transaction">
                        <framework:argument>custom</framework:argument>
                    </framework:middleware>
                    -->
                </framework:bus>
            </framework:messenger>
        </framework:config>
    </container>
    
  • PHP
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;
    
    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();
    
        $bus = $messenger->bus('command_bus');
        $bus->middleware()->id('doctrine_transaction');
        $bus->middleware()->id('doctrine_ping_connection');
        $bus->middleware()->id('doctrine_close_connection');
        // Використання іншого менеджеру сутностей
        $bus->middleware()->id('doctrine_transaction')
            ->arguments(['custom']);
    };
    

Інше проміжкове ПЗ

New in version 5.3: Проміжкове ПЗ router_context було представлено в Symfony 5.3.

Додайте проміжкове ПЗ router_context, якщо вам потрібно генерувати абсолютні URL у споживачі (наприклад, відображати шаблон з посиланнями). Це проміжкове ПЗ зберігає контекст початкового запиту (тобто хостинг, HTTP-порт і т.д.), що необхідно при створенні абсолютних URL.

  • YAML
    1
    2
    3
    4
    5
    6
    7
    # config/packages/messenger.yaml
    framework:
        messenger:
            buses:
                command_bus:
                    middleware:
                        - router_context
    
  • XML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
    
        <framework:config>
            <framework:messenger>
                <framework:bus name="command_bus">
                    <framework:middleware id="router_context"/>
                </framework:bus>
            </framework:messenger>
        </framework:config>
    </container>
    
  • PHP
    1
    2
    3
    4
    5
    6
    7
    8
    9
    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;
    
    return static function (FrameworkConfig $framework) {
        $messenger = $framework->messenger();
    
        $bus = $messenger->bus('command_bus');
        $bus->middleware()->id('router_context');
    };
    

Події Месенджера

На додаток до проміжкового ПЗ, Месенджер також запускає декілька подій. Ви можете створити слухача подій, щоб підключатися до різних частин процесу. Для кожного, клас подій буде назвою події:

Декілька автобусів, автобусів команд та подій

Месенджер надає вам один сервіс автобусу повідомлень за замовчуванням. Але ви можете сконфігурувати стільки, скільки ви хочете, створивши автобуси “команд”, “запитів” або “подій”, та контролюючи їх проміжкове ПЗ. Див. Multiple Buses.

Эта документация является переводом официальной документации Symfony и предоставляется по свободной лицензии CC BY-SA 3.0.