Messenger: робота з синхронізованими повідомленнями та повідомленнями у черзі

Дата оновлення перекладу 2024-06-09

Messenger: робота з синхронізованими повідомленнями та повідомленнями у черзі

Messenger надає автобус повідомлень з можливістю відправлення повідомлень, а потім роботи з ними одразу ж у вашому додатку, або їх відправлення через транспорт (наприклад, чергу) для обробки пізніше. Щоб дізнатися про це більше, прочитайте документацію компонента Messenger.

Установка

У додатках, що використовують Symfony Flex , виконайте цю команду, щоб встановити Messenger:

1
$ composer require symfony/messenger

Створення повідомлення та обробника

Messenger будується на двох різних класах, які ви створите: (1) класі повідомлення, який містить дані, та (2) класі обробника(ів), який буде викликано після запуску повідомлення. Клас обробника читатиме клас повідомлення та виконувати якусь дію.

До класу повідомлення немає особливих вимог, окрім того, що його можна серіалізувати:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// src/Message/SmsNotification.php
namespace App\Message;

class SmsNotification
{
    public function __construct(
        private string $content,
    ) {
    }

    public function getContent(): string
    {
        return $this->content;
    }
}

Обробник повідомлень - це PHP-викликане, рекомендований спосіб його створення - створити клас, який реалізує атрибут AsMessageHandler та має метод __invoke(), який має підказки класу повідомлення (або інтерфейс повідомлення):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;

use App\Message\SmsNotification;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
class SmsNotificationHandler
{
    public function __invoke(SmsNotification $message)
    {
        // ... зробіть щось - на кшталт відправки SMS!
    }
}

Tip

Ви також можете використати атрибут #[AsMessageHandler] в окремих методах класу. Ви можете використовувати атрибут у стількох методах одного класу, скількох ви захочете, що дозволяє вам групувати обробку багатьох повʼязаних типів повідомлень.

Завдяки автоконфігурації та підказці SmsNotification, Symfony знає, що цей обробник має бути викликаний, коли запускається повідомлення SmsNotification. У більшості випадків, це все, що вам треба буде зробити. Але ви можете також сконфігурувати обробники повідомлень вручну . Щоб побачити всі сконфігуровані обробники, виконайте:

1
$ php bin/console debug:messenger

Оголошення повідомлення

Ви готові! Для запуску повідомлення (та виклику обробника), впровадьте сервіс messenger.default_bus (через MessageBusInterface), наприклад, у контролер:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// src/Controller/DefaultController.php
namespace App\Controller;

use App\Message\SmsNotification;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Messenger\MessageBusInterface;

class DefaultController extends AbstractController
{
    public function index(MessageBusInterface $bus): Response
    {
        // призведе до виклику SmsNotificationHandler
        $bus->dispatch(new SmsNotification('Look! I created a message!'));

        // ...
    }
}

Транспорт: асинхронні повідомлення/повідомлення в черзі

За замовчуванням, повідомлення обробляються одразу ж після запуску. Якщо ви хочете обробити повідомлення асинхронно, ви можете сконфігурувати транспорт. Транспорт поже відправляти повідомлення (наприклад, в систему черги), а потім отримувати через робітника . Messenger підтримує декілька транспортів .

Note

Якщо ви хочете використати транспорт, який не підтримується, подивіться на транспорт Enqueue, який підтримує речі на кшталт Kafka і Google Pub/Sub.

Транспорт реєструється з використанням "DSN". Завдяки рецепту Flex для Messenger, ваш файл .env вже має декілька прикладів.

1
2
3
# MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages
# MESSENGER_TRANSPORT_DSN=doctrine://default
# MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages

Розкоментуйте той транспорт, який ви хочете (або встановіть його в .env.local). Див. , щоб дізнатися більше деталей.

Далі, в config/packages/messenger.yaml, давайте визначимо транспорт під назвою async, який використовує цю конфігурацію:

1
2
3
4
5
6
7
8
9
10
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async: "%env(MESSENGER_TRANSPORT_DSN)%"

            # або розширено для конфігурації більшої кількості опцій
            #async:
            #    dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
            #    options: []

Маршрутизація повідомлень до транспорту

Тепер, коли у вас є сконфігурований транспорт, замість негайної обробки повідомлень, ви можете сконфігурувати їх так, щоб вони були відправлені транспорту:

1
2
3
4
5
6
7
8
9
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async: "%env(MESSENGER_TRANSPORT_DSN)%"

        routing:
            # async - це те ім'я, яке ви дали вашому транспорту вище
            'App\Message\SmsNotification': async

Завдяки цьому, App\Message\SmsNotification буде відправлено транспорту async, і його обробник(и) не будуть викликані одразу ж. Всі повідомлення, не співпавші з routing будуть оброблені негайно.

Note

Ви можете використовувати частковий простір імен PHP, наприклад 'App\Message\*', щоб зіставити всі повідомлення у відповідному просторі імен. Єдина вимога полягає в тому, що підстановочний символ '*' має бути розміщено в кінці простору імен.

Ви можете використати '*' як клас повідомлення. Це діятиме як правило маршрутизації за замовчуванням для будь-якого повідомлення, яке не співпало з routing. Це корисно для того, щоб гарантувати, що жодне повідомлення не буде оброблено синхронно за замовчуванням.

Єдиним недоліком є те, що '*' буде також застосовано до електронних листів, відправлених за допомогою Symfony Mailer (який використовує SendEmailMessage, якщо доступний Messenger). Це може призвести до проблем, якщо ваші листи не серіалзовувані (наприклад, якщо вони містять вкладення файлів в якості PHP джерел/потоків).

Ви також можете маршрутизувати класи за їхнім батьківським класом або інтерфейсом. Або відправляти повідомлення декільком транспортам:

1
2
3
4
5
6
7
8
9
# config/packages/messenger.yaml
framework:
    messenger:
        routing:
            # маршрутизуйте всі повідомлення, що розширюють цей приклад базового класу або інтерфейс
            'App\Message\AbstractAsyncMessage': async
            'App\Message\AsyncMessageInterface': async

            'My\Message\ToBeSentToTwoSenders': [async, audit]

Note

Якщо ви сконфігуруєте маршрутизація і для дочірнього, і для батьківського класу, використовуються обидва правила. Наприклад, якщо у вас є об'єкт SmsNotification, що розширюється з Notification, будуть використані маршрутизації і для Notification, і для SmsNotification.

Tip

Ви можете визначати та перевизначати транспорт, який використовує повідомлення, під час рантайму, використовуючи TransportNamesStamp в конверті повідомлення. Цей штамп бере масив імені транспорту як єдиний аргумент. Щоб дізнатися більше про штампи, див. Конвети і штампи.

Сутності Doctrine у повідомленнях

Якщо вам потрібно передати сутність Doctrine у повідомленні, краще за все передати основний ключ сутності (або будь-яку релевантну інформацію, необхідну обробнику, на кшталт email, та ін.) заміть об'єкта:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// src/Message/NewUserWelcomeEmail.php
namespace App\Message;

class NewUserWelcomeEmail
{
    public function __construct(
        private int $userId,
    ) {
    }

    public function getUserId(): int
    {
        return $this->userId;
    }
}

Потім, у вашому обробнику, ви можете зробити запит свіжого об'єкта:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// src/MessageHandler/NewUserWelcomeEmailHandler.php
namespace App\MessageHandler;

use App\Message\NewUserWelcomeEmail;
use App\Repository\UserRepository;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
class NewUserWelcomeEmailHandler
{
    public function __construct(
        private UserRepository $userRepository,
    ) {
    }

    public function __invoke(NewUserWelcomeEmail $welcomeEmail): void
    {
        $user = $this->userRepository->find($welcomeEmail->getUserId());

        // ... відправити електронний лист!
    }
}

Це гарантує, що сутність містить свіжі дані.

Синхронна обробка повідомлень

Якщо повідомлення не співпадає з жодним правилом маршрутизації , воно не буде відправлене жодному транспорту, і буде оброблене негайно. В деяких випадках (наприклад, при пов'язуванні обробників з різними транспортами), легше та гнучкіше обробити їх чітко: створивши транспорт sync та "відправивши" повідомлення у нього для негайної обробки:

1
2
3
4
5
6
7
8
9
10
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            # ... інші транспорти

            sync: 'sync://'

        routing:
            App\Message\SmsNotification: sync

Створення вашого власного транспорту

Ви також можете створити власний транспорт, якщо вам потрібно відправляти або отримувати повідомлення через щось, що не підтримується. Див. Як створити ваш власний транспорт повідомлень.

Споживання повідомлень (запуск робітника)

Коли ваші повідомлення маршрутизовані, в більшості випадків, вам треба буде "споживати" їх. Ви можете зробити це за допомогою команди messenger:consume:

1
2
3
4
$ php bin/console messenger:consume async

# використайте -vv, щоб побачити деталі того що відбувається
$ php bin/console messenger:consume async -vv

Перший аргумент - це ім'я отримувача (або id сервісу, якщо ви маршрутизували до користувацького сервісу). За замовчуванням, команда буде виконуватися нескінченно: шукати нові повідомлення у вашому транспорті та обробляти їх. Ця команда називається вашим "робітником".

Якщо ви хочете споживати повідомлення від усіх доступних отримувачів, ви можете скористатися командою з опцією --all:

1
$ php bin/console messenger:consume --all

7.1

Опція --all була представлена в Symfony 7.1.

Tip

У середовищі розробки, якщо ви використовуєте інструмент Symfony CLI, ви можете сконфігурувати автоматичний запуск робітників разом з веб-сервером. Ви можете знайти більше інформації в розділі документації
Робітники Symfony CLI Workers .

Tip

Щоб правильно зупинити робітника, викликайте екземпляр StopWorkerException.

Розгортування у виробництві

У виробництві є декілька важливих речей, про які варто подумати:

Використовуйте менеджер процесів на кшталт Supervisor або systemd, щоб ваш(і) робітник(и) працювали
Ви хочете, щоб один або більше "робітників" працювали весь час. Щоб зробити це, використайте систему контролю процесу на кшталт Supervisor або systemd .
Не дозволяйте робітникам працювати нескінченно
Деякі сервіси (на кшталт EntityManager Doctrine) будуть споживати все більше пам'яті з часом. Тому замість того, щоб дозволяти вашому робітнику працювати завжди, використовуйте прапорець на кшталт messenger:consume --limit=10, щоб вказати робітнику, що він має обробити лише 10 повідомлень до припинення роботи (потім Супервізор створить новий процес). Також є інші опції на кшталт --memory-limit=128M и --time-limit=3600.
Зупиняйте робітників, які стикаються з помилками
Якщо залежність робітника типу ваш сервер бази даних не працює, або досягнуто тайм-ауту, ви можете спробувати додати логіку повторного зʼєднання , або просто зупинити робітника, якщо він отримує забагато помилок, за допомогою опції --failure-limit команди messenger:consume.
Перезавантажуйте робітників при розгортуванні
Кожний раз при розгортуванні вам треба буде перезавантажити всі процеси ваших робітників, щоб вони бачили новозапущений код. Щоб зробити це, виконайте messenger:stop-workers при запуску. Це сигналізує кожному робітнику, що він має завершити обробку поточного повідомлення та граціозно завершити роботу. Потім, Супервізор створить нові процеси робітників. Команда використовує кеш app внутрішньо - тому переконайтеся в тому, що він сконфігурований для використання адаптеру за вашим смаком.
Використовуйте одиин кеш між розгортуваннями
Якщо ваша стратегія розгортування полягає у створенні нових цільових каталогів, вам необхідно встановити значення опції конфігурації cache.prefix.seed , щоб використовувати один і теж простір імен кешу між запусками. Інакше, пул cache.app буде використовувати значення параметру kernel.project_dir в якості бази для простору імен, що призведе до різних просторів імен кожний раз при запуску.

Пріоритизований транспорт

Іноді певні типи повідомлень повинні мати вищий пріоритет та бути оброблені до інших. Щоб зробити це можливим, ви можете створити декілька транспортів та маршрутизувати різні повідомлення до них. Наприклад:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    # queue_name відповідає транспорту doctrine
                    queue_name: high

                    # для AMQP відправте окремий обмін, а потім поставте у чергу
                    #exchange:
                    #    name: high
                    #queues:
                    #    messages_high: ~
                    # або спробуйте redis "group"
            async_priority_low:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: low

        routing:
            'App\Message\SmsNotification':  async_priority_low
            'App\Message\NewUserWelcomeEmail':  async_priority_high

Потім ви зможете запускати окремних робітників для кожного транспорту, або інструктувати одного робітника, щоб він обробляв повідомлення в порядку пріоритетності:

1
$ php bin/console messenger:consume async_priority_high async_priority_low

Робітник завжди спочатку шукатиме повідомлення, які очікують async_priority_high. Якщо таких немає, потім він споживатиме повідомлення з async_priority_low.

Обмежте споживання для конкретних черг

Деякий транспорт (особливо AMQP) має концепцію обміну та черг. Транспорт Symfony завжди пов'язаний з обміном. За замовчуванням, робітник споживає з усіх черг, з'єднаних з обміном вказаного транспорту. Однак є приклади використання, коли вам потрібно, щоб робітник споживав лише з конкретної черги.

Ви можете обмежити робітника, щоб він обробляв лише повідомлення з конкретних черг:

1
2
3
$ php bin/console messenger:consume my_transport --queues=fasttrack
# ви можете передати опцію --queues більше одного разу, щоб обробляти багато черг
$ php bin/console messenger:consume my_transport --queues=fasttrack1 --queues=fasttrack2

Щоб дозволити використання опції queues, отримувач має реалізовувати QueueReceiverInterface.

Перевірка кількості повідомлень у черзі у транспорті

Виконайте команду messenger:stats, щоб дізнатися, скільки повідомлень знаходяться у "чергах" якогось або всього транспорту:

1
2
3
4
5
# відображає кількість повідомлень у черзі у всьому транспорті
$ php bin/console messenger:stats

# показує статистику лише для деякого транспорту
$ php bin/console messenger:stats my_transport_name other_transport_name

Note

Для того, щоб ця команда працювала, сконфігурований отримувач транспорту має реалізовувати MessageCountAwareInterface.

Конфігурація Supervisor

Supervisor - це чудовий інструмент для гарантії того, що процес(и) ваших робітників завжди виконується (навіть якщо він закривається у зв'язку з помилкою, досягненням ліміту повідомлень або завдяки messenger:stop-workers). Ви можете встановити його на Ubuntu, наприклад, через:

1
$ sudo apt-get install supervisor

Файли конфігурації Supervisor зазвичай живуть в каталозі /etc/supervisor/conf.d. Наприклад, ви можете створити там новий файл messenger-worker.conf, щоб переконатися, що 2 екземпляри messenger:consume працюють завжди:

1
2
3
4
5
6
7
8
9
;/etc/supervisor/conf.d/messenger-worker.conf
[program:messenger-consume]
command=php /path/to/your/app/bin/console messenger:consume async --time-limit=3600
user=ubuntu
numprocs=2
startsecs=0
autostart=true
autorestart=true
process_name=%(program_name)s_%(process_num)02d

Змініть аргумент async, щоб він використовував назву вашого транспорту (або транспортів) та user на Unix-користувача на вашому сервері.

Caution

Під час розгортування, щось може бути недоступним (наприклад, база даних), що викличе помилку початку роботи споживача. У такій ситуації, Supervisor спробує startretries декілька разів, щоб перезапустити команду. Переконайтеся в тому, що змінили це налаштування, щоб уникнути отримання команди у стані FATAL, яка ніколи не запуститься знову.

Кожний перезапуск Supervisor збільшує затримку на 1 секунду. Наприклад, якщо значення - 10, він зачекає 1 с, 2 с, 3 с та далі. Це надає сервісу загалом 55 секунд, щоб знову стати доступним. Збільшіть налаштування startretries, щоб охопити максимум очікуваного часу простою.

Якщо ви використовуєте транспорт Redis, відмітьте, що кожному робітнику необхідне унікальне ім'я споживача, щоб уникнути обробки одного повідомлення декількома робітниками. Один зі способів досягнути цього - встановити змінну середовища в файлі конфігурації Супервізора, на яку ви потім зможете послатися в messenger.yaml (див. розділ Redis вище):

1
environment=MESSENGER_CONSUMER_NAME=%(program_name)s_%(process_num)02d

Далі, вкажіть Supervisor прочитати вашу конфігурацію та запустити ваших робітників:

1
2
3
4
5
6
7
8
9
$ sudo supervisorctl reread

$ sudo supervisorctl update

$ sudo supervisorctl start messenger-consume:*

# Якщо ви розгортаєте оновлення коду, не забудьте перезапустити ваших робітників
# для запуску нового коду
$ sudo supervisorctl restart messenger-consume:*

Див. документацію Supervisor, щоб дізнатися більше деталей.

Поступое вимкнення

Якщо ви встановили у своєму проекті PHP-розширення PCNTL, робітники будуть обробляти POSIX сигнал SIGTERM для завершення обробки поточного повідомлення перед виходом.

Втім, ви можете віддати перевагу використанню інших POSIX-сигналів для поступового завершення роботи. Ви можете перевизначити стандартні сигнали встановивши опцію конфігурації framework.messenger.stop_worker_on_signals.

У деяких випадках, сигнал SIGTERM відправляється самим Супервізором (наприклад, зупинка контейнера Docker в якому Супервізор - точка входу). В таких випадках, вам треба додати ключ stopwaitsecs до конфігурації програми (зі значенням бажаного періоду відстрочки в секундах) для того, щоб виконати граціозне вимкнення:

1
2
[program:x]
stopwaitsecs=20

Конфігурація Systemd

Хоча Supervisor є чудовим інструментом, його недолік в тому, що вам потрібен системний доступ, щоб його запустити. Systemd став стандартом у більшості розповсюджень Linux, і має гарну альтернативу під назвою сервіси користувача.

Файли конфігурації сервісу користувача Systemd зазвичай живуть у каталозі ~/.config/systemd/user. Наприклад, ви можете створити новий файл messenger-worker.service. Або файл messenger-worker@.service, якщо ви хочете, щоб більше екземплярів працювали одночасно:

1
2
3
4
5
6
7
8
9
10
[Unit]
Description=Symfony messenger-consume %i

[Service]
ExecStart=php /path/to/your/app/bin/console messenger:consume async --time-limit=3600
Restart=always
RestartSec=30

[Install]
WantedBy=default.target

Тепер, скажіть systemd підключити та запустити одного робітника:

1
2
3
4
5
6
$ systemctl --user enable messenger-worker@1.service
$ systemctl --user start messenger-worker@1.service

# щоб підключити та запустити 20 робітників
$ systemctl --user enable messenger-worker@{1..20}.service
$ systemctl --user start messenger-worker@{1..20}.service

Якщо ви зміните ваш файл конфігурації сервісу, вам потрібно перезавантажити daemon:

1
$ systemctl --user daemon-reload

Щоб перезапустити всіх ваших споживачів:

1
$ systemctl --user restart messenger-consume@*.service

Екземпляр користувача systemd запускається лише після першого входу у систему конкретним користувачем. Споживачу, натомість, часто потрібно запускатися під час початкового завантаження системи. Підключіть зволікання у користувачі, щоб активувати цю поведінку:

1
$ loginctl enable-linger <your-username>

Логами управляє journald і з ними можна працювати з використанням команди journalctl:

1
2
3
4
5
6
7
8
# слідувати логам споживача номер 11
$ journalctl -f --user-unit messenger-consume@11.service

# слідувати логам усіх споживачів
$ journalctl -f --user-unit messenger-consume@*

# слідувати всім логам з ваших сервісів користувача
$ journalctl -f _UID=$UID

Дивіться документацію systemd, щоб дізнатися більше.

Note

Вам потрібні або підвищені привілеї для команди journalctl, або додати вашого користувача до групи systemd-journal:

1
$ sudo usermod -a -G systemd-journal <your-username>

Робітник без стану

PHP створений бути без стану, різни запити не мають спільних джерел. В HTTP-контексті PHP очищує все перед відправленням відповіді, тому ви можете вирішити не турбуватися про сервіси, які можуть допускати витік пам'яті.

З іншої сторони, робітники зазвичай працюють у довгострокових CLI-процесах, які не завершуються після обробки повідомлення. Тому вам треба бути обережними зі станами сервісів, щоб вони не давали витік інформації та/або пам'яті з одного повідомлення у інше.

Однак, деякі сервіси Symfony, на кшталт обробника fingers crossed Monolog, допускають витік за своїм задумом. Symfony надає функцію перезавантаження сервісу, щоб вирішити цю проблему. При автоматично перезавантаженні контейнера між двома повідомленнями, Symfony шукає будь-які сервіси, що реалізують ResetInterface (включно з вашими власними сервісами) і викликає їх метод reset(), щоб вони могли очистити свій внутрішній стан.

Якщо сервіс не без стану і ви хочете перезавантажувати його властивості після кожного повідомлення, тоді сервіс має реалізовувати ResetInterface, де ви можете перезавантажувати властивості у методі reset().

Якщо ви не хочете перезавантажувати контейнер, додайте опцію --no-reset при виконанні команди messenger:consume.

Транспорт з обмеженням швидкості

Іноді вам може знадобитися обмежити швидкість роботи вашого працівника з повідомленнями. Ви можете налаштувати обмежувач швидкості транспорту (потрібен
компонент RateLimiter), встановивши його опцію rate_limiter:

1
2
3
4
5
6
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async:
                rate_limiter: your_rate_limiter_name

Caution

Якщо в транспорті сконфігуровано обмежувач швидкості, він заблокує всього працівника, коли буде досягнуто ліміту. Вам слід переконатися, що ви сконфігурували спеціального робітника для транспорту з обмеженням швидкості, щоб уникнути блокування інших транспортів.

Повторні спроби та помилки

Якщо під час споживання повідомлення з транспорту буде викликано виключення, воно буде автоматично повторно відправлене транспорту, щоб спробувати знову. За замовчуванням, повідомлення має 3 спроби перед скиданням або відправкою транспорту помилок . Кожна спроба буде також відкладена в часі, на випадок, якщо вона була викликана тимчасовою проблемою. Все це можна сконфігурувати для кожного транспорту:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'

                # конфігурація за замовчуванням
                retry_strategy:
                    max_retries: 3
                    # затримка в мілісекундах
                    delay: 1000
                    # робить так, щоб затримка була довшою перед кожною наступною спробою
                    # наприклад, затримка в 1 секунду, 2 секунди, 4 секунди
                    multiplier: 2
                    max_delay: 0
                    # перевизначити це все сервісом, який реалізує
                    # Symfony\Component\Messenger\Retry\RetryStrategyInterface
                    # service: null

7.1

Опція jitter була представлена в Symfony 7.1.

Tip

Symfony запускає WorkerMessageRetriedEvent, коли повідомлення має повторні спроби, тому ви можете виконати власну логіку.

Note

Завдяки SerializedMessageStamp, серіалізована форма повідомлення зберігається, що запобігає його повторній серіалізації, якщо відбувається повторна спроба повідомлення.

Уникання повторних спроб

Іноді обробка повідомлення може бути невдалою, і ви будете знати, що це перманентно і повторних спроб не потрібно. Якщо ви викличете UnrecoverableMessageHandlingException, повідомлення не матиме повторних спроб.

Note

Повідомлення, які не будуть повторно оброблені, все одно будуть показані у сконфігурованому транспорті невдач. Якщо ви хочете уникнути цього, подумайте про те, щоб обробити помилку самостійно і дозволити обробнику успішно завершитися.

Форсування повторних спроб

Іноді обробка повідомлення може бути невдалою, і ви будете знати, що це тимчасово, і необхідна повторна спроба. Якщо ви викличете RecoverableMessageHandlingException, повідомлення завжди матиме повторну спробу.

Збереження та повторні спроби невдалих повідомлень

Якщо повідомлення зазнає невдачі та має декілька повторних спроб (max_retries), воно потім буде відбраковане. Щоб уникнути цього, ви можете сконфігурувати failure_transport:

1
2
3
4
5
6
7
8
9
10
# config/packages/messenger.yaml
framework:
    messenger:
        # після повторних спроб, повідомлення будуть відправлені транспорту "failed"
        failure_transport: failed

        transports:
            # ... інший транспорт

            failed: 'doctrine://default?queue_name=failed'

У цьому прикладі, якщо обробка повідомлення зазнає невдачі 3 рази (значення за замовчуванням max_retries), воно потім буде відправлене транспорту failed. Хоча ви можете використати messenger:consume failed для споживання його, як звичайного транспорту, вам скоріше захочеться вручну переглянути повідомлення в транспорті помилок та вирішити щодо повторних спроб:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# побачити всі повідомлення в транспорті помилок
$ php bin/console messenger:failed:show

# побачити перших 10 повідомлень
$ php bin/console messenger:failed:show --max=10

# побачити лише повідомлення MyClass
$ php bin/console messenger:failed:show --class-filter='MyClass'

# побачити кількість повідомлень класу повідомлення
$ php bin/console messenger:failed:show --stats

# побачити деталі конкретної помилки
$ php bin/console messenger:failed:show 20 -vv

# побачити та повторно спробувати кожне повідомлення окремо
$ php bin/console messenger:failed:retry -vv

# повторна спроба конкретних повідомлень
$ php bin/console messenger:failed:retry 20 30 --force

# видалити повідомлення без повторної спроби
$ php bin/console messenger:failed:remove 20

# видалити повідомлення без повторних спроб та показати кожне повідомлення перед видаленням
$ php bin/console messenger:failed:remove 20 30 --show-messages

Якщо повідомлення знов зазнає невдачі, воно буде відправлене назад у транспорт помилок у відповідності зі звичайними правилами повторних спроб . Як тільки буде досягнено максимум повторних спроб, повідомлення буде скинуте перманентно.

Декілька помилкових транспортів

Іноді недостатньо мати один глобальний сконфігурований failed transport, тому що деякі повідомлення важливіші за інші. В таких випадках ви можете перевизначити транспорт помилок лише для певних транспортів:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# config/packages/messenger.yaml
framework:
    messenger:
        # після повторних спроб повідомлення будуть відправлені транспорту "failed"
        # за замовчуванням, якщо всередині транспорту не сконфігурований "failed_transport"
        failure_transport: failed_default

        transports:
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                failure_transport: failed_high_priority

            # так як транспорт помилок не сконфігурований, буде використовуватися встановлений
            # глобальний набір "failure_transport"
            async_priority_low:
                dsn: 'doctrine://default?queue_name=async_priority_low'

            failed_default: 'doctrine://default?queue_name=failed_default'
            failed_high_priority: 'doctrine://default?queue_name=failed_high_priority'

Якщо немає визначеного failure_transport глобально або на рівні транспорту, повідомлення буде скинуте після певної кількості спроб.

Невдалі команди мають необов'язкову опцію --transport, щоб вказати failure_transport, сконфігурований на рівні транспорту.

1
2
3
4
5
6
7
8
# побачити всі повідомлення в транспорті "failure_transport"
$ php bin/console messenger:failed:show --transport=failure_transport

# повторна спроба конкретних повідомлень з "failure_transport"
$ php bin/console messenger:failed:retry 20 30 --transport=failure_transport --force

# видалити повідомлення без повторної спроби з "failure_transport"
$ php bin/console messenger:failed:remove 20 --transport=failure_transport

Конфігурація транспорту

Messenger підтримує декілька різних типів транспорту, кожний зі своїми опціями. Опції можуть бути передані транспорту через рядок DSN або конфігурацію.

1
2
# .env
MESSENGER_TRANSPORT_DSN=amqp://localhost/%2f/messages?auto_setup=false
1
2
# .env
MESSENGER_TRANSPORT_DSN=amqp://localhost/%2f/messages?auto_setup=false
1
2
3
4
5
6
7
8
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            my_transport:
                dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
                options:
                    auto_setup: false

Опції, визначені під options, панують над визначеними в DSN.

Транспорт AMQP

Транспорт AMQP використовує PHP-розширення AMQP для відправлення повідомлень в чергу на кшталт RabbitMQ.

1
$ composer require symfony/amqp-messenger

DSN транспорту AMQP може виглядати так:

1
2
3
4
5
# .env
MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages

# або використайте протокол AMQPS
MESSENGER_TRANSPORT_DSN=amqps://guest:guest@localhost/%2f/messages

Якщо ви хочете виокристати AMQP, закодований за допомогою TLS/SSL, ви маєте також надати CA-сертифікат. Визначіть шлях сертифікату в налаштуванні PHP.ini amqp.cacert (наприклад, amqp.cacert = /etc/ssl/certs) або в параметрі DSN cacert (наприклад, amqps://localhost?cacert=/etc/ssl/certs/).

Порт, за замовчуванням використовуваний AMQP, закодованим за допомогою TLS/SSL, - 5671, але ви можете перевизначити його в параметрі DSN port (наприклад, amqps://localhost?cacert=/etc/ssl/certs/&port=12345).

Note

За замовчуванням, транспорт автоматично створюватиме будь-які необхідні обміни, черги та сполучні ключі. Це можна відключити, але деяків функції можуть працювати некоректно (на кшталт відкладених черг).

Щоб не автоматично створювати жодних черг повторно, ви можете сконфігурувати транспорт за допомогою queues: [].

Note

Ви можете обмежити споживача транспорту AMQP, щоб він обробляв лише повідомлення з деяких черг обміну. Див. .

Транспорт має інші опції, включно із способами конфігурації обміну, сполучними ключами, чергами і т.д. Дивіться документацію Connection.

Транспорт має ряд опцій:

????? ???? ?? ?????????????
auto_setup ?? ??? ??????? ???? ???????? ??????????? ??? ??? ???????? / ?????????. true
cacert ???? ?? ????? CA-??????????? ? PEM-???????.  
cert ???? ?? ??????????? ??????? ? PEM-???????.  
channel_max ?????? ????????? ????? ???????, ??? ???????? ??????. 0 ??????? ??????????? ????? ?????????.  
confirm_timeout ????-??? ??? ????????????? ? ????????; ???? ?? ??????? ????????? ?? ??????????? ????????????? ????????????. ????????: 0 ??? ?????? ??????. ???? ???? ????????.  
connect_timeout ????-??? ?'???????. ????????: 0 ??? ?????? ??????. ???? ???? ????????.  
frame_max ?????????? ?????? ??????, ???? ???????? ?????? ??? ?'???????, ??????? ?? ?????????? ????? ?? ???-??????. 0 ??????? ??????????? ????? ?????????? (???????? ??? ?????? ??????? ?????? librabbimq ?? ?????????????)  
heartbeat ???????? ????????? ?'??????? (? ????????), ??? ???? ??????. 0 ???????, ?? ?????? ?? ???? ?????????. ?????????, ?? librabbimq ??? ???????? ????????? ?????????, ?? ???????, ?? ????????? ????????????? ???? ??? ??? ?????????? ????????.  
host ??'? ???????? AMQP-???????  
key ???? ?? ????? ??????? ? PEM-???????.  
login ???? ???????????, ??? ???? ??????????????? ??? ??????????? ?? ??????? AMQP  
password ?????? ??? ???????????? ??? ?'??????? ? AMQP-????????  
persistent   'false'
port ???? AMQP-???????  
read_timeout ????-??? ??????? ??????????. ????????: 0 ??? ?????? ??????. ???? ???? ????????.  
retry    
sasl_method    
connection_name ??? ?????????????? ???? ???????? (??????? ?????????? ?????? ?????????? PHP AMQP 1.10)  
verify ?????? ??? ??????? ??????????? ?????. ???? ??????????? ????????, ?? ???????? ??'? ? ??????????? ??????? ??? ?????????? ? ??'?? ???????. ??????????? ???????? ?? ?????????????.  
vhost ??????????? ??????? ??? ???????????? ? AMQP-????????  
write_timeout ????-??? ???????? ??????????. ????????: 0 ??? ?????? ??????. ???? ???? ????????.  
delay[queue_name_pattern] ??????, ???????????????? ??? ????????? ???? delay_%exchange_name%_%routing_key%_%delay%
delay[exchange_name] ??'? ??????, ??????????????? ??? ???????????/????????? ??????????? delays
queues[name][arguments] ????????? ?????????  
queues[name][binding_arguments] ?????????, ??????????????? ??? ???'???????? ?????.  
queues[name][binding_keys] ???????? ????? (???? ?) ??? ???'???????? ? ??????  
queues[name][flags] ???????? ????? AMQP_DURABLE
exchange[arguments] ????????? ????????? ??? ?????? (?????????,alternate-exchange)  
exchange[default_publish_routing_key] ???? ?????????????, ???????????????? ??? ??????????, ???? ??? ?? ???????? ? ????????????  
exchange[flags] ???????? ?????? AMQP_DURABLE
exchange[name] ????? ??????  
exchange[type] ??? ?????? fanout

Ви можете також сконфігурувати налаштування спеціально для AMQP у вашому повідомленні, додавши AmqpStamp до вашого Конверту:

1
2
3
4
5
6
7
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpStamp;
// ...

$attributes = [];
$bus->dispatch(new SmsNotification(), [
    new AmqpStamp('custom-routing-key', AMQP_NOPARAM, $attributes),
]);

Caution

Споживачі не відображаються в панелі адміна, так як цей транспорт не покладається на \AmqpQueue::consume(), який блокує. Наявність блокуючого отримувача робить опції --time-limit/--memory-limit команди messenger:consume. а також команди messenger:stop-workers марними, так як вони покладаються на той факт, що отримувач повертається негайно, незалежно від того, знаходить він повідомлення, чи ні. Робітник споживання відповідає за ітерацію до отримання повідомлення для обробки та/або до того, як буде досягнена одна з умов зупинки. Таким чином, логіка зупинки робітника може бути досягнена, якщо він застряг на блокуючому виклику.

Tip

Якщо ваш додаток стикається з виключеннями сокетів або високим відтоком з'єднань (що проявляється у швидкому створенні і видаленні з'єднань), розгляньте можливість використання AMQProxy. Цей інструмент працює як шлюз між Symfony Messenger і сервером AMQP, підтримуючи стабільні з'єднання і мінімізуючи накладні витрати (що також покращує загальну продуктивність).

Транспорт Doctrine

Транспорт Doctrine може бути використаний для зберігання повідомлень в таблиці бази даних.

1
$ composer require symfony/doctrine-messenger

DSN транспорту Doctrine може виглядати так:

1
2
# .env
MESSENGER_TRANSPORT_DSN=doctrine://default

Формат doctrine://<connection_name>, у випадку, якщо у вас є декілька з'єднань і ви хочете використати якесь відмінне від "default". Транспорт буде автоматично створювати таблицю під назвою messenger_messages.

Якщо ви хочете змінити ім'я таблиці за замовчуванням, передайте власне ім'я таблиці в DSN за допомогою опції table_name:

1
2
# .env
MESSENGER_TRANSPORT_DSN=doctrine://default?table_name=your_custom_table_name

Або, для створення таблиці самостійно, встановіть опцію auto_setup як false, та згенеруйте міграцію .

Caution

Властивість повідомлень datetime, що зберігаються у базі даних, використовує часовий пояс поточної системи. Це може спричинити проблеми, якщо декілька машин з різними конфігураціями часових поясів використовують одне сховище.

Транспорт має такі опції:

????? ???? ?? ?????????????
table_name ????? ??????? messenger_messages
queue_name ????? ????? (???????? ? ???????, ??? ??????????????? ???? ??????? ??? ????????? ???????????) default
redeliver_timeout ????-??? ????? ????????? ??????? ???????????? ? ?????, ??? ?? ? ????? "???????" (???? ???????? ?? ?????? ??????? ?????????, ????????? ??, ? ??????? ?? ????? ????? ?????????? ????????????) - ? ????????. 3600
auto_setup ?? ??? ??????? ???? ???????? ??????????? ??? ??? ?????????/?????????. true

Note

Встановіть redeliver_timeout у більшому значенні, ніж тривалість вашого найповільнішого повідомлення. Інакше, деякі повідомлення починатимуться вдруге, поки перше все ще обробляється.

При використанні PostgreSQL, у вас є доступ до наступних опцій для отримання переваг функції LISTEN/NOTIFY. Це дозволяє більш продуктивний підхід, ніж поведінка голосування транспорту Doctrine за замовчуванням, так як PostgreSQL буде напряму повідомляти робітників, коли нове повідомлення з'являтиметься в таблиці.

????? ???? ?? ?????????????
use_notify ?? ??????????????? LISTEN/NOTIFY. true
check_delayed_interval ???????? ????????? ??????????? ??????????? ? ????????????. ?????????? ?? 0, ??? ?????????? ?????????. 1000
get_notify_timeout ?????????? ???? ?????????? ????????? ??? ??????? PDO::pgsqlGetNotify`, ? ????????????. 0

Транспорт Beanstalkd

Транспорт Beanstalkd відправляє повідомлення прямо в робочу чергу Beanstalkd. Встановіть його, виконавши:

1
$ composer require symfony/beanstalkd-messenger

DSN транспорту Beanstalkd може виглядати так:

1
2
3
4
5
# .env
MESSENGER_TRANSPORT_DSN=beanstalkd://localhost:11300?tube_name=foo&timeout=4&ttr=120

# Якщо порту немає, він за замовчуванням буде 11300
MESSENGER_TRANSPORT_DSN=beanstalkd://localhost

Транспорт має ряд опцій:

????? ???? ?? ?????????????
tube_name ????? ????? default
timeout ????-??? ??????? ???????????? - ? ????????. 0 (??????? ?????? ?????? ? ??? ????????? ?????????, ??? ????????? TransportException)
ttr ??? ????????? ???????????? ????? ??? ?? ????????? ???? ????? ? ????? ?????????? - ? ????????. 90

Транспорт Redis

Транспорт Redis використовує потоки для створення черги повідомлень. Цей транспорт вимагає PHP-розширення Redis (>=4.3) та працюючого серверу Redis (^5.0).

1
$ composer require symfony/redis-messenger

DSN транспорту Redis може виглядати так:

1
2
3
4
5
6
7
8
9
10
11
12
# .env
MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages
# Повний приклад DSN
MESSENGER_TRANSPORT_DSN=redis://password@localhost:6379/messages/symfony/consumer?auto_setup=true&serializer=1&stream_max_entries=0&dbindex=0&delete_after_ack=true
# Приклад кластеру Redis
MESSENGER_TRANSPORT_DSN=redis://host-01:6379,redis://host-02:6379,redis://host-03:6379,redis://host-04:6379
# Приклад Unix-сокету
MESSENGER_TRANSPORT_DSN=redis:///var/run/redis.sock
# Приклад TLS
MESSENGER_TRANSPORT_DSN=rediss://localhost:6379/messages
# Приклад багатьох хостів Redis Sentinel
MESSENGER_TRANSPORT_DSN=redis:?host[redis1:26379]&host[redis2:26379]&host[redis3:26379]&sentinel_master=db

Деякі опції можуть бути сконфігуровані через DSN або ключ options під транспортом в messenger.yaml:

????? ???? ?? ?????????????
stream ????? ?????? Redis messages
group ????? ????? ?????????? Redis symfony
consumer ????? ?????????, ????????????????? ? Redis consumer
auto_setup ???????? ????? Redis ???????????? true
auth ?????? Redis  
delete_after_ack ???? true, ???????????? ????????????? ??????????? ????? ??????? false
delete_after_reject ???? true, ???????????? ????????????? ???????????, ???? ???? ???????????? true
lazy ?'?????????? ???? ???? ?'??????? ?????? ????????? false
serializer ?? ????????????? ???????? ???????????? ? Redis (????? Redis::OPT_SERIALIZER ) Redis::SERIALIZER_PHP
stream_max_entries ??????????? ????????? ???????, ?? ???? ???? ??????? ?????. ?????????? ????????? ?????? ????????, ??? ???????? ?????? "??????????" ??????????? 0 (??? ???????? "?? ???????")
redeliver_timeout ????-??? ????? ????????? ??????? "???????????" ????????????, ??? ???????? ?????????? ????????? (???? ???????? ?? ?????? ??????? ?????, ????????? ??, ? ??????? ??? ????? ???? ???????? ???????? ???????????? ????????????) - ? ????????. 3600
claim_interval ???????? ? ???? ????? ?????????? "?????????"/???????? ???????????? - ? ???????????? 60000 (1 ??????)
persistent_id ?????, ???? null-????????? ?? ?????? non-persistent. null
retry_interval ???? ?????, ???????? ? ???????????? 0
read_timeout ????????, ???????? ? ????????, ?? ????????????? ??????? "??? ????????" 0
timeout ????????, ???????? ? ????????, ??. ????????????? ??????? "??? ????????" 0
sentinel_master ?????, ???? null ??? ??????, ????????????? ????????? Sentinel null
redis_sentinel ????????? ??????????  

7.1

Опція `redis_sentinel` як псевдонім `sentinel_master` була представлена в Symfony 7.1.

Caution

Ніколи не повинно бути більше однієї команди messenger:consume, виконуваної з однаковою комбінацією stream, group та consumer, інашке повідомлення можуть бути оброблені більше, ніж один раз. Якщо ви запускаєте декілька робітників черги, consumer може бути встановлено як змінну оточення (на кшталт %env(MESSENGER_CONSUMER_NAME)%), встановлено Супервізором (приклад нижче) або будь-яким іншим сервісом, використовуваним для управління процесами робітників. В середовищі контейнера, HOSTNAME може бути використано як ім'я споживача, так як там лише один робітник на контейнер/хостинг. Якщо ви використовуєте Kubernetes для управління контейнерами, розгляньте використання StatefulSet для стабілізації імен.

Tip

Встановіть delete_after_ack як true (якщо у вас одна група) або визначте stream_max_entries (якщо ви можете припустити, яка максимальна кількість записів припустима у вашому випадку), щоб уникнути витоків пам'яті. В іншому випадку, всі повідомлення назавжди залишаться в Redis.

Транспорт у пам'яті

Транспорт in-memory насправді не доставляє повідомлення. Замість цього, він тримає їх у пам'яті під час запиту, що може бути корисним для тестування. Наприклад, якщо у вас є транспорт async_priority_normal, ви можете перевизначати його в середовищі test, щоб використати цей транспорт:

1
2
3
4
5
# config/packages/test/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_normal: 'in-memory://'

Тоді під час тестування повідомлення не будуть відправлені реальному транспорту. Навіть краще, у тесті, ви можете перевірити, щоб лише одне повідомлення було відправлене під час запиту:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// tests/Controller/DefaultControllerTest.php
namespace App\Tests\Controller;

use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;
use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport;

class DefaultControllerTest extends WebTestCase
{
    public function testSomething(): void
    {
        $client = static::createClient();
        // ...

        $this->assertSame(200, $client->getResponse()->getStatusCode());

        /* @var InMemoryTransport $transport */
        $transport = $this->getContainer()->get('messenger.transport.async_priority_normal');
        $this->assertCount(1, $transport->getSent());
    }
}

Транспорт має ряд опцій:

serialize (бульову, за замовчуванням: false)
Чи сериалізувати повідомлення. Це корисно для тестування додатковго шару, особливо коли ви використовуєте власний сериалізатор повідомлень.

Note

Всі транспорти in-memory будуть автоматично скинуті після кожного тесту в класах тестів, що розширюють KernelTestCase або WebTestCase.

Amazon SQS

Транспорт Amazon SQS чудово підходить для додатку на AWS. Встановіть його, виконавши:

1
$ composer require symfony/amazon-sqs-messenger

DSN SQS транспорту виглядає так:

1
2
3
# .env
MESSENGER_TRANSPORT_DSN=https://sqs.eu-west-3.amazonaws.com/123456789012/messages?access_key=AKIAIOSFODNN7EXAMPLE&secret_key=j17M97ffSVoKI0briFoo9a
MESSENGER_TRANSPORT_DSN=sqs://localhost:9494/messages?sslmode=disable

Note

Транспорт автоматично створить необхідні черги. Це можна відключити, встановивши опцію auto_setup як false.

Tip

До відправлення або отримання повідомлення, Symfony необхідно конвертувати назву черги в URL черги AWS, викликавши API GetQueueUrl в AWS. Цього додаткового API-виклику можна уникнути, надавши DSN, яка є URL черги.

Транспорт має ряд опцій:

????? ???? ?? ?????????????
access_key ???? ??????? AWS  
account ????????????? AWS-???????? ??????? ????????? ?????
auto_setup ?? ???????? ??????????? ?????????? ????? ??? ??? ?????????/?????????. true
buffer_size ????????? ??????????? ??? ???????????? ????????? 9
debug ???? true, ???? ??? ???? HTTP ??????? ?? ?????????? (?? ??????? ?? ??????????????) false
endpoint ?????????? URL ?? SQS-??????? https://sqs.eu-west-1.amazonaws.com
poll_timeout ??? ?????????? ?????? ???????????? ? ???????? 0.1
queue_name ????? ????? messages
region ????? AWS-??????? eu-west-1
secret_key ????????? ???? AWS  
session_token ????? ?????  
visibility_timeout ????????? ??????, ???????? ???? ???????????? ?? ???? ??????? (Visibility Timeout) ???????????? ?????
wait_time ?????????? Long polling ? ???????? 20

Note

Параметр wait_time визначає максимальний час очікування для Amazon SQS до того, як повідомлення буде доступне в черзі, перед відправленням відповіді. Це допомагає знизити вартість використання Amazon SQS, усуваючи деяку кількість пустих відповідей.

Параметр poll_timeout визначає час очікування отримувача до повернення null. Він уникає блокування інших отримувачів від виклику.

Note

Якщо назва черги має суфікс .fifo, AWS створить чергу FIFO. Використайте марку AmazonSqsFifoStamp, щоб визначити Message group ID і Message deduplication ID.

Іншою можливістю є включення AddFifoStampMiddleware. Якщо у ваше повідомлення реалізує MessageDeduplicationAwareInterface, проміжне програмне забезпечення автоматично додасть AmazonSqsFifoStamp' і встановіть 23927af27945b87a5f0ed78f168f61668cea9b25. Крім того, якщо ваше повідомлення реалізує :class:Symfony\Component\Messenger\Bridge\AmazonSqs\MessageGroupAwareInterface`, проміжне програмне забезпечення автоматично встановить мітку Ідентифікатор групи повідомлень.

Ви можете дізнатися більше про проміжне програмне забезпечення у спеціальному розділі .

Черги FIFO не підтримують установки затримки окремих повідомлень, значення delay: 0 потребується в налаштуваннях стратегії повторних спроб.

Серіалізація повідомлень

Коли повідомлення відправляються (та отримуються) в транпсорт, вони сериалізуються з використанням нативних функцій PHP serialize() і unserialize(). Ви можете змінити це глобально (або для кожного транспорту) на сервіс, який реалізує SerializerInterface:

1
2
3
4
5
6
7
8
9
10
11
12
13
# config/packages/messenger.yaml
framework:
    messenger:
        serializer:
            default_serializer: messenger.transport.symfony_serializer
            symfony_serializer:
                format: json
                context: { }

        transports:
            async_priority_normal:
                dsn: # ...
                serializer: messenger.transport.symfony_serializer

messenger.transport.symfony_serializer - це вбудований сервіс, який використовує компонент Serializer і може бути сконфігурований декількома способами. Якщо ви оберете використовувати сериалізатор Symfony, ви зможете контролювати контекст для кожного випадку окремо через SerializerStamp (див. Конверти та марки).

Tip

При відправленні/отримаванні повідомлен в/з іншого транспорту, вам може знадобитися більше контролю над процесом сериалізації. Використання користувацького сериалізатору надає такий контроль. Див. Туторіал по сериалізації повідомлень SymfonyCasts, щоб дізнатися більше.

Запуск команд та зовнішніх процесів

Запуск команди

Запустити будь-яку команду можна за допомогою розгоратння RunCommandMessage. Symfony подбає про обробку цього повідомлення і виконає команду, передану до параметра повідомлення:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
use Symfony\Component\Console\Messenger\RunCommandMessage;
use Symfony\Component\Messenger\MessageBusInterface;

class CleanUpService
{
    public function __construct(private readonly MessageBusInterface $bus)
    {
    }

    public function cleanUp(): void
    {
        // Довге завдання з деяким кешуванням...

        // Після завершення, разгорніть деякі команди очищення
        $this->bus->dispatch(new RunCommandMessage('app:my-cache:clean-up --dir=var/temp'));
        $this->bus->dispatch(new RunCommandMessage('cache:clear'));
    }
}

Ви можете сконфігурувати поведінку на випадок, якщо щось піде не так, під час виконання команди. Для цього ви можете скористатися параметрами throwOnFailure та catchExceptions при створенні вашого екземпляру RunCommandMessage.

Після обробки, обробник поверне значення RunCommandContext, яке містить багато корисної інформації, наприклад, код завершення роботи або виведення процесу. Ви можете звернутися до сторінки, присвяченої результатам обробника для отримання додаткової інформації.

Запуск зовнішнього процесу

Messenger постачається зі зручним помічником для запуску зовнішніх процесів за допомогою розгортання повідомлення. Для цього використовуються переваги Компонента Process. Розгортаючи RunProcessMessage, Messenger подбає про створення нового процесу з параметрами, які ви передали:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Process\Messenger\RunProcessMessage;

class CleanUpService
{
    public function __construct(private readonly MessageBusInterface $bus)
    {
    }

    public function cleanUp(): void
    {
        $this->bus->dispatch(new RunProcessMessage(['rm', '-rf', 'var/log/temp/*'], cwd: '/my/custom/working-dir'));

        // ...
    }
}

Після обробки обробник поверне значення RunProcessContext, яке містить багато корисної інформації, наприклад, код завершення роботи або виведення процесу.
Ви можете звернутися до сторінки, присвяченої результатам обробника для отримання додаткової інформації.

Пінг веб-сервісу

Іноді вам може знадобитися регулярно пінгувати веб-сервіс, щоб дізнатися його статус, наприклад чи він працює. Це можна зробити, розгорнувши команду PingWebhookMessage:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
use Symfony\Component\HttpClient\Messenger\PingWebhookMessage;
use Symfony\Component\Messenger\MessageBusInterface;

class LivenessService
{
    public function __construct(private readonly MessageBusInterface $bus)
    {
    }

    public function ping(): void
    {
        // Викликано HttpExceptionInterface в 3xx/4xx/5xx
        $this->bus->dispatch(new PingWebhookMessage('GET', 'https://example.com/status'));

        // Пінг, але він не викликає 3xx/4xx/5xx
        $this->bus->dispatch(new PingWebhookMessage('GET', 'https://example.com/status', throw: false));

        // Може бути використана будь-яка валідна опція HttpClientInterface
        $this->bus->dispatch(new PingWebhookMessage('POST', 'https://example.com/status', [
            'headers' => [
                'Authorization' => 'Bearer ...'
            ],
            'json' => [
                'data' => 'some-data',
            ],
        ]));
    }
}

Обробник поверне значення ResponseInterface, що дозволяє вам збирати та обробляти інформацію, що повертається HTTP-запитом.

Отримання результатів від ваших обробників

Коли повідомлення обробляється, HandleMessageMiddleware додає HandledStamp для кожного об'єкта, який обробив повідомлення. Ви можете використовувати його для отримання значення, повернутого обробником(ами):

1
2
3
4
5
6
7
8
9
10
11
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\HandledStamp;

$envelope = $messageBus->dispatch(new SomeMessage());

// отримати значення, яке було повернуто останнім обробником повідомлення
$handledStamp = $envelope->last(HandledStamp::class);
$handledStamp->getResult();

// або отримаи інформацію про всі обробники
$handledStamps = $envelope->all(HandledStamp::class);

Отримання результатів при роботі з автобусами команди та запиту

Компонент Messenger можна використовувати в архітектурах CQRS, де автобуси команд і запитів є центральними частинами додатку. Прочитайте статтю Мартіна Фаулера про CQRS, щоб дізнатися більше і як сконфігурувати декілька автобусів .

Оскільки запити зазвичай синхронні і очікується, що вони будуть оброблені один раз, отримання результату від обробника є звичайною потребою.

Для отримання результату обробника при обробці синхронних запитів існує HandleTrait. Він також гарантує, що буде зареєстровано лише один обробник. HandleTrait може бути використаний у будь-якому класі, який має властивість $messageBus:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// src/Action/ListItems.php
namespace App\Action;

use App\Message\ListItemsQuery;
use App\MessageHandler\ListItemsQueryResult;
use Symfony\Component\Messenger\HandleTrait;
use Symfony\Component\Messenger\MessageBusInterface;

class ListItems
{
    use HandleTrait;

    public function __construct(
        private MessageBusInterface $messageBus,
    ) {
    }

    public function __invoke(): void
    {
        $result = $this->query(new ListItemsQuery(/* ... */));

        // Зробити щось з результатом
        // ...
    }

    // Створення такого методу не є обовʼязковим, але довзоляю додавати до результату підказки
    private function query(ListItemsQuery $query): ListItemsQueryResult
    {
        return $this->handle($query);
    }
}

Отже, ви можете використовувати рису для створення класів шини автобусів та запитів. Наприклад, ви можете створити спеціальний клас QueryBus і впроваджувати його всюди, де вам потрібна поведінка автобуса запитів замість MessageBusInterface:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// src/MessageBus/QueryBus.php
namespace App\MessageBus;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\HandleTrait;
use Symfony\Component\Messenger\MessageBusInterface;

class QueryBus
{
    use HandleTrait;

    public function __construct(
        private MessageBusInterface $messageBus,
    ) {
    }

    /**
     * @param object|Envelope $query
     *
     * @return mixed The handler returned value
     */
    public function query($query): mixed
    {
        return $this->handle($query);
    }
}

Налаштування обробників

Конфігурація обробників з використанням атрибутів

Ви можете сконфігурувати вашого обробника, передавши опції атриубуту:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;

use App\Message\OtherSmsNotification;
use App\Message\SmsNotification;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler(fromTransport: 'async', priority: 10)]
class SmsNotificationHandler
{
    public function __invoke(SmsNotification $message): void
    {
        // ...
    }
}

Можливі опції для конфігурації з атрибутом:

????? ????
bus ????? ????????, ? ????? ???????? ???? ?????????? ????????????, ?? ????????????? - ??? ????????.
fromTransport ????? ??????????, ? ????? ???????? ???? ?????????? ????????????, ?? ????????????? - ??? ??????????.
handles ??? ??????????? (FQCN), ??? ?????? ???? ????????? ??????????, ???????? ?????? ???? ?? ????? ??????? ?? ????????? ????????.
method ??'? ??????, ???? ???? ????????? ????????????, ?????? ???? ??'????? ? ????.
priority ????????? ?????????, ???? ???????? ?????????? ?????? ????????? ???? ? ?? ???? ????????????.

Конфігурація обробників вручну

Symfony зазвичай буде знаходити та реєструвати вашого обробника автоматично . Але ви також можете сконфігурувати його вручну - і передати йому додаткову конфігурацію - тегувавши сервіс обробника messenger.message_handler

1
2
3
4
5
6
7
8
9
10
11
# config/services.yaml
services:
    App\MessageHandler\SmsNotificationHandler:
        tags: [messenger.message_handler]

        # або сконфігурувати з опціями
        tags:
            -
                name: messenger.message_handler
                # небхідно лише якщо неможливо вгадати за підказкою
                handles: App\Message\SmsNotification

Можливі опції конфігурації з тегами:

????? ????
bus ????? ????????, ? ????? ???????? ???? ?????????? ????????????, ?? ????????????? - ??? ????????.
from_transport ????? ??????????, ? ????? ???????? ???? ?????????? ????????????, ?? ????????????? - ??? ??????????.
handles ??? ??????????? (FQCN), ??? ?????? ???? ????????? ??????????, ???????? ?????? ???? ?? ????? ??????? ?? ????????? ????????.
method ??'? ??????, ???? ???? ????????? ????????????, ?????? ???? ??'????? ? ????.
priority ????????? ?????????, ???? ???????? ?????????? ?????? ????????? ???? ? ?? ???? ????????????.

Обробка множини повідомлень

Один клас обробника може обробляти багато повідомлень. Для цього, додайте атрибут #AsMessageHandler до всіх методів обробки:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;

use App\Message\OtherSmsNotification;
use App\Message\SmsNotification;

class SmsNotificationHandler
{
    #[AsMessageHandler]
    public function handleSmsNotification(SmsNotification $message): void
    {
        // ...
    }

    #[AsMessageHandler]
    public function handleOtherSmsNotification(OtherSmsNotification $message): void
    {
        // ...
    }
}

Транзакційні повідомлення: Обробка нових повідомлень після завершення обробки

Обробник повідомлень може dispatch нові повідомлення, одночасно обробляючи інші, до того самого або іншого автобуса (якщо у додатку визначено декілька автобусів ). Будь-які помилки або винятки, які виникають під час цього процесу, можуть мати непередбачувані наслідки, такі як:

  1. Якщо при використанні DoctrineTransactionMiddleware розгорнуте повідомлення викликає виключення, то будь-які транзакції бази даних у початковому обробнику будуть відкочені.
  2. Якщо повідомлення відправляється на інший автобус, то відправлене повідомлення буде оброблено, навіть якщо якийсь код пізніше в поточному обробнику згенерує виключення.

Приклад проццесу RegisterUser

Розглянемо додаток з автобусами команд та подій. Додаток розгортає команду з іменем RegisterUser у команді автобуса. Команда обробляється RegisterUserHandler, який створює об'єкт User, зберігає цей об'єкт до бази даних і надсилає повідомлення UserRegistered на до автобуса подій.

Існує багато обробників повідомлення UserRegistered, один обробник може надсилати привітальний лист новому користувачеві. Ми використовуємо DoctrineTransactionMiddleware для обгортання всіх запитів до бази даних в одну транзакцію бази даних.

Проблема 1: Якщо під час надсилання привітального листа буде викликано виключення, то користувач не буде створений, оскільки DoctrineTransactionMiddleware
відкотить транзакцію Doctrine, в якій було створено користувача.

Проблема 2: Якщо при збереженні користувача до бази даних буде згенеровано виключення, привітальний лист все одно буде надіслано, оскільки він обробляється асинхронно.

Проміжкове ПО DispatchAfterCurrentBusMiddleware

Для багатьох додатків бажаною поведінкою є обробка лише тих повідомлень, які які розгортаються обробником після повного завершення роботи цього обробника. Це можна зробити за допомогою використання DispatchAfterCurrentBusMiddleware і додавання DispatchAfterCurrentBusStamp до конверта повідомлення :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// src/Messenger/CommandHandler/RegisterUserHandler.php
namespace App\Messenger\CommandHandler;

use App\Entity\User;
use App\Messenger\Command\RegisterUser;
use App\Messenger\Event\UserRegistered;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DispatchAfterCurrentBusStamp;

class RegisterUserHandler
{
    public function __construct(
        private MessageBusInterface $eventBus,
        private EntityManagerInterface $em,
    ) {
    }

    public function __invoke(RegisterUser $command): void
    {
        $user = new User($command->getUuid(), $command->getName(), $command->getEmail());
        $this->em->persist($user);

        // DispatchAfterCurrentBusStamp позначає, що повідомлення події має бути оброблено лише
        // якщо його обробник не викликає виключення

        $event = new UserRegistered($command->getUuid());
        $this->eventBus->dispatch(
            (new Envelope($event))
                ->with(new DispatchAfterCurrentBusStamp())
        );

        // ...
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// src/Messenger/EventSubscriber/WhenUserRegisteredThenSendWelcomeEmail.php
namespace App\Messenger\EventSubscriber;

use App\Entity\User;
use App\Messenger\Event\UserRegistered;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Mailer\MailerInterface;
use Symfony\Component\Mime\RawMessage;

class WhenUserRegisteredThenSendWelcomeEmail
{
    public function __construct(
        private MailerInterface $mailer,
        EntityManagerInterface $em,
    ) {
    }

    public function __invoke(UserRegistered $event): void
    {
        $user = $this->em->getRepository(User::class)->find($event->getUuid());

        $this->mailer->send(new RawMessage('Welcome '.$user->getFirstName()));
    }
}

Це означає, що повідомлення UserRegistered не буде оброблено лише після завершення роботи обробника RegisterUserHandler і збереження нового User в базі даних. Якщо RegisterUserHandler зустріне виключення, подія UserRegistered ніколи не буде оброблена. І якщо виключення буде викликано під час надсилання привітального листа, транзакція Doctrine не буде відкочена.

Note

Якщо WhenUserRegisteredThenSendWelcomeEmail викликає виключення, це виключення буде обгорнуто у виключення DelayedMessageHandlingException. Використання DelayedMessageHandlingException::getWrappedExceptions дасть вам всі виключення, які виникають при обробці повідомлення з DispatchAfterCurrentBusStamp.

Проміжне програмне забезпечення dispatch_after_current_bus увімкнено за замовчуванням. Якщо ви конфігуруєте проміжне ПЗ вручну, обов'язково зареєструйте dispatch_after_current_bus перед doctrine_transaction у ланцюжку проміжного програмного забезпечення. Крім того, проміжне програмне забезпечення dispatch_after_current_bus має бути завантажено для всіх використовуваних автобусів.

Пов'язування обробників з різними транспортами

Кожне повідомлення може мати декілька обробників, і коли повідомлення споживається, викликаються всі його оброники. Але ви можете також сконфігурувати обробника так, щоб він викликався лише тоді, коли повідомлення отримане з конкретного транспорту. Це дозволяє вам мати одне повідомлення, де кожний обробник викликається різними "робітниками", що споживають різний транспорт.

Уявіть, що у вас є повідомлення UploadedImage з двома обробниками:

  • ThumbnailUploadedImageHandler: ви хочете, щоб цей оброблялося транспортом під назвою image_transport
  • NotifyAboutNewUploadedImageHandler: ви хочете, щоб цей оброблялося транспортом під назвою async_priority_normal

Щоб зробити це, додайте опцію from_transport до кожного обробника. Наприклад:

1
2
3
4
5
6
7
8
9
10
11
12
13
// src/MessageHandler/ThumbnailUploadedImageHandler.php
namespace App\MessageHandler;

use App\Message\UploadedImage;

#[AsMessageHandler(fromTransport: 'image_transport')]
class ThumbnailUploadedImageHandler
{
    public function __invoke(UploadedImage $uploadedImage): void
    {
        // do some thumbnailing
    }
}

І, схожим чином:

1
2
3
4
5
6
7
8
// src/MessageHandler/NotifyAboutNewUploadedImageHandler.php
// ...

#[AsMessageHandler(fromTransport: 'async_priority_normal')]
class NotifyAboutNewUploadedImageHandler
{
    // ...
}

Потім, переконайтеся, що "маршрутизуєте" ваше повідомлення до обох транспортів:

1
2
3
4
5
6
7
8
9
10
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_normal: # ...
            image_transport: # ...

        routing:
            # ...
            'App\Message\UploadedImage': [image_transport, async_priority_normal]

Ось і все! Тепер ви можете споживати кожний транспорт:

1
2
3
4
# викличе ThumbnailUploadedImageHandler лише при обробці повідомлення
$ php bin/console messenger:consume image_transport -vv

$ php bin/console messenger:consume async_priority_normal -vv

Caution

Якщо обробник не має конфігурації from_transport, він буде виконаний в кожному транспорті, з якого буде отримано це повідомлення.

Обробка повідомлень партіями

Ви можете оголосити «спеціальні» обробники, які будуть обробляти повідомлення партіями. Таким чином, обробник буде чекати, поки певної кількості повідомлень, перш ніж почати їх обробку. Оголошення обробника партіями здійснюється за допомогою реалізації BatchHandlerInterface.
BatchHandlerTrait також надано для того, щоб полегшити оголошення цих спеціальних обробників:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
use Symfony\Component\Messenger\Handler\Acknowledger;
use Symfony\Component\Messenger\Handler\BatchHandlerInterface;
use Symfony\Component\Messenger\Handler\BatchHandlerTrait;

class MyBatchHandler implements BatchHandlerInterface
{
    use BatchHandlerTrait;

    public function __invoke(MyMessage $message, Acknowledger $ack = null): mixed
    {
        return $this->handle($message, $ack);
    }

    private function process(array $jobs): void
    {
        foreach ($jobs as [$message, $ack]) {
            try {
                // Обчислити $result з $message...

                // Визнати обробку цього повідомлення
                $ack->ack($result);
            } catch (\Throwable $e) {
                $ack->nack($e);
            }
        }
    }

    // Як варіант, ви можете або перевизначити метод `shouldFlush()` цієї
    // риси, щоб визначити ваш власний розмір партії...
    private function shouldFlush(): bool
    {
        return 100 <= \count($this->jobs);
    }

    // ... або перевизначити метод `getBatchSize()`, якщо поведінка за замовчуванням
    // відповідає вашим потребам
    private function getBatchSize(): int
    {
        return 100;
    }
}

Note

Коли аргумент $ack __invoke() дорівнює null, повідомлення буде оброблено синхронно. В іншому випадку, __invoke() поверне кількість повідомлень, що очікують на обробку. BatchHandlerTrait обробляє це за вас.

Note

За замовчуванням, відкладені партії змиваються, коли працівник простоює, а також так само, як і коли він зупинений.

Розширення Messenger

Конверти та марки

Повідомлення може бути будь-яким PHP-об'єктом. Інколи вам може знадобитися сконфігурувати щось додаткове в повідомленні - на кшталт того, як воно має бути оброблене всередині AMQP або додавання затримки перед обробкою повідомлення. Ви можете зробити це, додавши марку ("stamp") до вашого повідомлення:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;

public function index(MessageBusInterface $bus): void
{
    $bus->dispatch(new SmsNotification('...'), [
        // wait 5 seconds before processing
        new DelayStamp(5000),
    ]);

    // ябо чітко створіть Конверт
    $bus->dispatch(new Envelope(new SmsNotification('...'), [
        new DelayStamp(5000),
    ]));

    // ...
}

Внутрішньо, кожне повідомлення огортається в конверт (Envelope), який містить повідомлення та марки. Ви можете створити його вручну або дозволити автобусу повідомлень зробити це. Існує багато різних марок для різних цілей і вони використовуються внутрішньо для відслідковування інформації про повідомлення - на кшталт того, який автобус обробляє його і чи має воно повторні спроби після невдачі.

Проміжкове ПЗ

Те, що відбувається після запуску повідомлення в автобус повідомлень, залежить від його набору проміжкового ПЗ та його порядку. За замовчуванням, проміжкове ПЗ, сконфігуроване для кожного атрибуту, виглядає так:

  1. add_bus_name_stamp_middleware - додає марку для запису того, в якому автобусі було запущено це повідомлення;
  2. dispatch_after_current_bus - див. Транзакційні повідомлення: обробляйте повідомлення після того, як обробку завершено;
  3. failed_message_processing_middleware - обробляє повідомлення, які мають повторні спроби через транспорт помилок , щоб вони правильно функціонували, ніби-то вони були отримані з початкового транспорту;
  4. Ваша власна колекція middleware;
  5. send_message - якщо маршрутизація сконфігурована для транспорту, відправляє повідомлення цьому трнаспорту та зупиняє ланцюжок проміжкового ПЗ;
  6. handle_message - викликає обробника(ів) повідомлень для заданого повідомлення.

Note

Ці назви проміжкового ПЗ - насправді скорочення. Справжні id сервісів мають префікс messenger.middleware. (наприклад,messenger.middleware.handle_message).

Проміжкове ПЗ виконується після запуску повідомлення, а також ще раз, коли повідомлення отримане через робітника (для повідомлень, які були відправлені транспорту для асинхронної обробки). Пам'ятайте це, якщо ви створите власне проміжкове ПЗ.

Ви можете додати власне проміжкове ПЗ в список, або повністю відключити проміжкове ПЗ за замовчуванням і додати лише ваше власне:

Якщо сервіс проміжного програмного забезпечення є абстрактним, ви можете сконфігурувати аргументи його конструктора і для кожного автобуса буде створено окремий екземпляр.

1
2
3
4
5
6
7
8
9
10
11
12
13
# config/packages/messenger.yaml
framework:
    messenger:
        buses:
            messenger.bus.default:
                # відключити проміжкове ПЗ за замовчуванням
                default_middleware: false

                # та/або додати ваше власне
                middleware:
                    # id сервісів, що реалізують Symfony\Component\Messenger\Middleware\MiddlewareInterface
                    - 'App\Middleware\MyMiddleware'
                    - 'App\Middleware\AnotherMiddleware'

Проміжкове ПЗ для Doctrine

Якщо ви у своєму додатку використовуєте Doctrine, існує ряд необов'язкового проміжкового ПЗ, яке ви можете захотіти використати:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# config/packages/messenger.yaml
framework:
    messenger:
        buses:
            command_bus:
                middleware:
                    # кожний раз при обробці повідомлення, з'єднання Doctrine
                    # "пінгується" і повторно підключається, якщо воно закрите. Корисно,
                    # якщо ваші робітники працюють довгий час і з'єднання бази даних
                    # інколи втрачається
                    - doctrine_ping_connection

                    # Після обробки, з'єднання Doctrine закривається, що може
                    # звільнити з'єднання бази даних в робітнику, замість того,
                    # щоб утримувати їх відкритими завжди
                    - doctrine_close_connection

                    # огортає всіх обробників в одну транзакцію Doctrine
                    # обробникам не потрібно викликати flush(), а помилка в будь-якому
                    # обробнику викличе відкат
                    - doctrine_transaction

                    # або передати інший менеджер сутностей будь-якому
                    #- doctrine_transaction: ['custom']

Інше проміжкове ПЗ

Додайте проміжкове ПЗ router_context, якщо вам потрібно генерувати абсолютні URL у споживачі (наприклад, відображати шаблон з посиланнями). Це проміжкове ПЗ зберігає контекст початкового запиту (тобто хостинг, HTTP-порт і т.д.), що необхідно при створенні абсолютних URL.

Додайте проміжкове ПЗ validation, якщо вам потрібно валідувати обʼєкт повідомлення, використовуючи компонент Validator перед його обробкою. Якщо валідація невдала, буде викликано ValidationFailedException. ValidationStamp може бути використано для конфігурації груп валідації.

1
2
3
4
5
6
7
8
# config/packages/messenger.yaml
framework:
    messenger:
        buses:
            command_bus:
                middleware:
                    - router_context
                    - validation

Події Messenger

На додаток до проміжкового ПЗ, Messenger також запускає декілька подій. Ви можете створити слухача подій, щоб підключатися до різних частин процесу. Для кожного, клас подій буде назвою події:

Додаткові аргументи обробника

Існує можливість змусити месенджер передавати додаткові дані до обробника повідомлень за допомогою HandlerArgumentsStamp. Додайте цей штамп до конверта у проміжному програмному забезпеченні і заповніть його будь-якими додатковими даними, які ви хочете мати у обробнику:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// src/Messenger/AdditionalArgumentMiddleware.php
namespace App\Messenger;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\HandlerArgumentsStamp;

final class AdditionalArgumentMiddleware implements MiddlewareInterface
{
    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        $envelope = $envelope->with(new HandlerArgumentsStamp([
            $this->resolveAdditionalArgument($envelope->getMessage()),
        ]));

        return $stack->next()->handle($envelope, $stack);
    }

    private function resolveAdditionalArgument(object $message): mixed
    {
        // ...
    }
}

Тоді ваш обробник виглядатиме так:

1
2
3
4
5
6
7
8
9
10
11
12
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;

use App\Message\SmsNotification;

final class SmsNotificationHandler
{
    public function __invoke(SmsNotification $message, mixed $additionalArgument)
    {
        // ...
    }
}

Серіалізатор повідомлень для користувацьких форматів даних

Якщо ви отримуєте повідомлення з інших додатків, можливо, вони не у тому форматі, який вам потрібен. Не всі додатки повертатимуть повідомлення у форматі JSON з полями body і headers. У таких випадках вам потрібно створити новий серіалізатор повідомлень, що реалізує SerializerInterface. Припустимо, ви хочете створити дешифратор повідомлень:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
namespace App\Messenger\Serializer;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

class MessageWithTokenDecoder implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        $envelope = \json_decode($encodedEnvelope, true);

        try {
            // аналізувати дані, які ви отримали з вашими користувацькими полями
            $data = $envelope['data'];
            $data['token'] = $envelope['token'];

            // інші операції, на кшталт отримання інформації зі штампів
        } catch (\Throwable $throwable) {
            // обгорнути будь-яке виключення, яке може виникнути в конверті, щоб відравити його до транспорту невдач
            return new Envelope($throwable);
        }

        return new Envelope($data);
    }

    public function encode(Envelope $envelope): array
    {
        // цей дешифратор не шифрує повідомлення, але ви можете реалізувати це, повернувши
        // масив з сериалізованими штампами, якщо вам потрібно відправляти повідомлення у користувацькому форматі
        throw new \LogicException('This serializer is only used for decoding messages.');
    }
}

Наступним кроком буде вказати Symfony використовувати цей серіалізатор в одному або декількох ваших транспортах:

1
2
3
4
5
6
7
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            my_transport:
                dsn: '%env(MY_TRANSPORT_DSN)%'
                serializer: 'App\Messenger\Serializer\MessageWithTokenDecoder'

Декілька автобусів, автобуси команд та подій

Messenger надає вам один сервіс автобусу повідомлень за замовчуванням. Але ви можете сконфігурувати стільки, скільки ви хочете, створивши автобуси "команд", "запитів" або "подій", та контролюючи їх проміжкове ПЗ. Див. Декілька автобусів.

Поширеною архітектурою при створенні додатків є відокремлення команд від запитів. Команди - це дії, які щось роблять, а запити отримують дані. Це називається CQRS (Command Query Responsibility Segregation). Дивіться статтю Мартіна Фаулера про CQRS, щоб дізнатися більше. Цю архітектуру можна використовувати разом з компонентом Messenger, визначивши декілька автобусів.

Автобус команд дещо відрізняється від автобуса запитів. Наприклад, автобуси команд зазвичай не надають жодних результатів, а автобуси запитів рідко бувають асинхронними. Ви можете сконфігурувати ці автобуси та їхні правила за допомогою проміжного програмного забезпечення.

Також може бути гарною ідеєю відокремити дії від реакцій, ввівши автобус подій. Автобус подій може мати нуль або більше підписників.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
framework:
    messenger:
        # Автобус, яакий буде впроваджено при впровадженні MessageBusInterface
        default_bus: command.bus
        buses:
            command.bus:
                middleware:
                    - validation
                    - doctrine_transaction
            query.bus:
                middleware:
                    - validation
            event.bus:
                default_middleware:
                    enabled: true
                    # встановити "allow_no_handlers" як (за замовчуванням - false), щоб дозволити
                    # відсутність сконфігурованого обробника для цього автобуса, без виклику виключення
                    allow_no_handlers: false
                    # встановити "allow_no_senders" як false (за замовчуванням - true), щоб викликати виключення,
                    # якщо для цього автобуса не сконфігуровано відправника
                    allow_no_senders: true
                middleware:
                    - validation

У результаті буде створено три нові сервіси:

  • command.bus: автоматично монтується з підказкою MessageBusInterface (оскільки це default_bus);
  • query.bus: автоматично монтується з MessageBusInterface $queryBus;
  • event.bus: автоматично монтується з MessageBusInterface $eventBus.

Обмеження обробників за автобусом

За замовчуванням, кожен обробник буде доступний для обробки повідомлень в усіх ваших автобусах. Щоб запобігти відправленню повідомлення у неправильний автобус без помилки, ви можете обмежити кожен обробник певним автобусом за допомогою тегу messenger.message_handler:

1
2
3
4
# config/services.yaml
services:
    App\MessageHandler\SomeCommandHandler:
        tags: [{ name: messenger.message_handler, bus: command.bus }]

Таким чином, обробник App\MessageHandler\SomeCommandHandler буде відомий лише автобусу command.bus.

Ви також можете автоматично додати цей тег до низки класів за допомогою _конфігурації сервісу instanceof . Використовуючи цей тег, ви можете визначити автобус повідомлень на основі реалізованого інтерфейсу:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# config/services.yaml
services:
    # ...

    _instanceof:
        # всі сервіси, що реалізують CommandHandlerInterface, будуть
        # зареєстровані в автобусі command.bus
        App\MessageHandler\CommandHandlerInterface:
            tags:
                - { name: messenger.message_handler, bus: command.bus }

        # в той час як ті, що реалізують QueryHandlerInterface, будуть
        # зареєстровані в автобусі query.bus
        App\MessageHandler\QueryHandlerInterface:
            tags:
                - { name: messenger.message_handler, bus: query.bus }

Налагодження автобусів

Команда debug:messenger показує список доступних повідомлень та обробників для кожного автобуса. Ви також можете обмежити список певним автобусом, вказавши його назву як аргумент.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
$ php bin/console debug:messenger

  Messenger
  =========

  command.bus
  -----------

   Наступні повідомлення можуть бути розгорнуті:

   ---------------------------------------------------------------------------------------
    App\Message\DummyCommand
        handled by App\MessageHandler\DummyCommandHandler
    App\Message\MultipleBusesMessage
        handled by App\MessageHandler\MultipleBusesMessageHandler
   ---------------------------------------------------------------------------------------

  query.bus
  ---------

   Наступні повідомлення можуть бути розгорнуті:

   ---------------------------------------------------------------------------------------
    App\Message\DummyQuery
        handled by App\MessageHandler\DummyQueryHandler
    App\Message\MultipleBusesMessage
        handled by App\MessageHandler\MultipleBusesMessageHandler
   ---------------------------------------------------------------------------------------

Tip

Команда також покаже PHPDoc опис класів повідомлення та обробника.

Розгортання повідомлення

Якщо ви хочете повторно розгорнути повідомлення (використовуючи той самий транспорт і конверт), створіть новий RedispatchMessage і відправте його через ваш автобус. Повторне використання того самого прикладу SmsNotification, показаного раніше:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;

use App\Message\SmsNotification;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\Message\RedispatchMessage;
use Symfony\Component\Messenger\MessageBusInterface;

#[AsMessageHandler]
class SmsNotificationHandler
{
    public function __construct(private MessageBusInterface $bus)
    {
    }

    public function __invoke(SmsNotification $message): void
    {
        // зробити щось з повідомленням
        // потім повторно розгорнути його, засновуючись на вашій власній логіці

        if ($needsRedispatch) {
            $this->bus->dispatch(new RedispatchMessage($message));
        }
    }
}

Вбудований RedispatchMessageHandler подбає про те, щоб це повідомлення було повторно відправлено по тому самому автобусу, по якому воно було відправлено спочатку. Ви також можете використовувати другий аргумент конструктора RedispatchMessage, щоб вказати транспорт, який буде використано при повторному розгортанні повідомлення.