Messenger: робота з синхронізованими повідомленнями та повідомленнями у черзі

Дата оновлення перекладу 2022-05-23

Messenger: робота з синхронізованими повідомленнями та повідомленнями у черзі

Messenger надає автобус повідомлень з можливістю відправлення повідомлень, а потім роботи з ними одразу ж у вашому додатку, або їх відправлення через транспорт (наприклад, чергу) для обробки пізніше. Щоб дізнатися про це більше, прочитайте документацію компонента Messenger.

Установка

У додатках, що використовують Symfony Flex , виконайте цю команду, щоб встановити Messenger:

1
$ composer require symfony/messenger

Створення повідомлення та обробника

Messenger будується на двох різних класах, які ви створите: (1) класі повідомлення, який містить дані, та (2) класі обробника(ів), який буде викликано після запуску повідомлення. Клас обробника читатиме клас повідомлення та виконувати якусь дію.

До класу повідомлення немає особливих вимог, окрім того, що його можна серіалізувати:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// src/Message/SmsNotification.php
namespace App\Message;

class SmsNotification
{
    private $content;

    public function __construct(string $content)
    {
        $this->content = $content;
    }

    public function getContent(): string
    {
        return $this->content;
    }
}

Обробник повідомлень - це PHP-викликане, рекомендований спосіб його створення - створити клас, який реалізує атрибут AsMessageHandler та має метод __invoke(), який має підказки класу повідомлення (або інтерфейс повідомлення):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;

use App\Message\SmsNotification;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
class SmsNotificationHandler
{
    public function __invoke(SmsNotification $message)
    {
        // ... зробіть щось - на кшталт відправки SMS!
    }
}

Tip

Ви також можете використати атрибут #[AsMessageHandler] в окремих методах класу. Ви можете використовувати атрибут у стількох методах одного класу, скількох ви захочете, що дозволяє вам групувати обробку багатьох повʼязаних типів повідомлень.

6.1

Підтримка для #[AsMessageHandler] у методах була представлена в Symfony 6.1.

Завдяки автоконфігурації та підказці SmsNotification, Symfony знає, що цей обробник має бути викликаний, коли запускається повідомлення SmsNotification. У більшості випадків, це все, що вам треба буде зробити. Але ви можете також сконфігурувати обробники повідомлень вручну . Щоб побачити всі сконфігуровані обробники, виконайте:

1
$ php bin/console debug:messenger

Оголошення повідомлення

Ви готові! Для запуску повідомлення (та виклику обробника), впровадьте сервіс messenger.default_bus (через MessageBusInterface), наприклад, у контролер:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// src/Controller/DefaultController.php
namespace App\Controller;

use App\Message\SmsNotification;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\Messenger\MessageBusInterface;

class DefaultController extends AbstractController
{
    public function index(MessageBusInterface $bus)
    {
        // призведе до виклику SmsNotificationHandler
        $bus->dispatch(new SmsNotification('Look! I created a message!'));

        // ...
    }
}

Транспорт: асинхронні повідомлення/повідомлення в черзі

За замовчуванням, повідомлення обробляються одразу ж після запуску. Якщо ви хочете обробити повідомлення асинхронно, ви можете сконфігурувати транспорт. Транспорт поже відправляти повідомлення (наприклад, в систему черги), а потім отримувати через робітника . Messenger підтримує декілька транспортів .

Note

Якщо ви хочете використати транспорт, який не підтримується, подивіться на транспорт Enqueue, який підтримує речі на кшталт Kafka і Google Pub/Sub.

Транспорт реєструється з використанням "DSN". Завдяки рецепту Flex для Messenger, ваш файл .env вже має декілька прикладів.

1
2
3
# MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages
# MESSENGER_TRANSPORT_DSN=doctrine://default
# MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages

Розкоментуйте той транспорт, який ви хочете (або встановіть його в .env.local). Див. , щоб дізнатися більше деталей.

Далі, в config/packages/messenger.yaml, давайте визначимо транспорт під назвою async, який використовує цю конфігурацію:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
10
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async: "%env(MESSENGER_TRANSPORT_DSN)%"

            # або розширено для конфігурації більшої кількості опцій
            #async:
            #    dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
            #    options: []

Маршрутизація повідомлень до транспорту

Тепер, коли у вас є сконфігурований транспорт, замість негайної обробки повідомлень, ви можете сконфігурувати їх так, щоб вони були відправлені транспорту:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async: "%env(MESSENGER_TRANSPORT_DSN)%"

        routing:
            # async - це те ім'я, яке ви дали вашому транспорту вище
            'App\Message\SmsNotification': async

Завдяки цьому, App\Message\SmsNotification буде відправлено транспорту async, і його обробник(и) не будуть викликані одразу ж. Всі повідомлення, не співпавші з routing будуть оброблені негайно.

Note

Ви можете використати '*' як клас повідомлення. Це діятиме як правило маршрутизації за замовчуванням для будь-якого повідомлення, яке не співпало з routing. Це корисно для того, щоб гарантувати, що жодне повідомлення не буде оброблено синхронно за замовчуванням.

Єдиним недоліком є те, що '*' буде також застосовано до електронних листів, відправлених за допомогою Symfony Mailer (який використовує SendEmailMessage, якщо доступний Messenger). Це може призвести до проблем, якщо ваші листи не серіалзовувані (наприклад, якщо вони містять вкладення файлів в якості PHP джерел/потоків).

Ви також можете маршрутизувати класи за їхнім батьківським класом або інтерфейсом. Або відправляти повідомлення декільком транспортам:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
# config/packages/messenger.yaml
framework:
    messenger:
        routing:
            # маршрутизуйте всі повідомлення, що розширюють цей приклад базового класу або інтерфейс
            'App\Message\AbstractAsyncMessage': async
            'App\Message\AsyncMessageInterface': async

            'My\Message\ToBeSentToTwoSenders': [async, audit]

Note

Якщо ви сконфігуруєте маршрутизація і для дочірнього, і для батьківського класу, використовуються обидва правила. Наприклад, якщо у вас є об'єкт SmsNotification, що розширюється з Notification, будуть використані маршрутизації і для Notification, і для SmsNotification.

Сутності Doctrine у повідомленнях

Якщо вам потрібно передати сутність Doctrine у повідомленні, краще за все передати основний ключ сутності (або будь-яку релевантну інформацію, необхідну обробнику, на кшталт email, та ін.) заміть об'єкта:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// src/Message/NewUserWelcomeEmail.php
namespace App\Message;

class NewUserWelcomeEmail
{
    private $userId;

    public function __construct(int $userId)
    {
        $this->userId = $userId;
    }

    public function getUserId(): int
    {
        return $this->userId;
    }
}

Потім, у вашому обробнику, ви можете зробити запит свіжого об'єкта:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// src/MessageHandler/NewUserWelcomeEmailHandler.php
namespace App\MessageHandler;

use App\Message\NewUserWelcomeEmail;
use App\Repository\UserRepository;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
class NewUserWelcomeEmailHandler
{
    private $userRepository;

    public function __construct(UserRepository $userRepository)
    {
        $this->userRepository = $userRepository;
    }

    public function __invoke(NewUserWelcomeEmail $welcomeEmail)
    {
        $user = $this->userRepository->find($welcomeEmail->getUserId());

        // ... відправити електронний лист!
    }
}

Це гарантує, що сутність містить свіжі дані.

Синхронна обробка повідомлень

Якщо повідомлення не співпадає з жодним правилом маршрутизації , воно не буде відправлене жодному транспорту, і буде оброблене негайно. В деяких випадках (наприклад, при пов'язуванні обробників з різними транспортами), легше та гнучкіше обробити їх чітко: створивши транспорт sync та "відправивши" повідомлення у нього для негайної обробки:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
10
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            # ... інші транспорти

            sync: 'sync://'

        routing:
            App\Message\SmsNotification: sync

Створення вашого власного транспорту

Ви також можете створити власний транспорт, якщо вам потрібно відправляти або отримувати повідомлення через щось, що не підтримується. Див. Как создать ваш собственный транспорт сообщений.

Споживання повідомлень (запуск робітника)

Коли ваші повідомлення маршрутизовані, в більшості випадків, вам треба буде "споживати" їх. Ви можете зробити це за допомогою команди messenger:consume:

1
2
3
4
$ php bin/console messenger:consume async

# використайте -vv, щоб побачити деталі того що відбувається
$ php bin/console messenger:consume async -vv

Перший аргумент - це ім'я отримувача (або id сервісу, якщо ви маршрутизували до користувацького сервісу). За замовчуванням, команда буде виконуватися нескінченно: шукати нові повідомлення у вашому транспорті та обробляти їх. Ця команда називається вашим "робітником".

Tip

Щоб правильно зупинити робітника, викликайте екземпляр StopWorkerException.

Розгортання у виробництві

У виробництві є декілька важливих речей, про які варто подумати:

Використовуйте менеджер процесів на кшталт Supervisor або systemd, щоб ваш(і) робітник(и) працювали
Ви хочете, щоб один або більше "робітників" працювали весь час. Щоб зробити це, використайте систему контролю процесу на кшталт Supervisor або systemd .
Не дозволяйте робітникам працювати нескінченно
Деякі сервіси (на кшталт EntityManager Doctrine) будуть споживати все більше пам'яті з часом. Тому замість того, щоб дозволяти вашому робітнику працювати завжди, використовуйте прапорець на кшталт messenger:consume --limit=10, щоб вказати робітнику, що він має обробити лише 10 повідомлень до припинення роботи (потім Супервізор створить новий процес). Також є інші опції на кшталт --memory-limit=128M и --time-limit=3600.
Зупиняйте робітників, які стикаються з помилками
Якщо залежність робітника типу ваш сервер бази даних не працює, або досягнуто тайм-ауту, ви можете спробувати додати логіку повторного зʼєднання , або просто зупинити робітника, якщо він отримує забагато помилок, за допомогою опції --failure-limit команди messenger:consume.
Перезавантажуйте робітників при розгортанні
Кожний раз при розгортанні вам треба буде перезавантажити всі процеси ваших робітників, щоб вони бачили новозапущений код. Щоб зробити це, виконайте messenger:stop-workers при запуску. Це сигналізує кожному робітнику, що він має завершити обробку поточного повідомлення та граціозно завершити роботу. Потім, Супервізор створить нові процеси робітників. Команда використовує кеш app внутрішньо - тому переконайтеся в тому, що він сконфігурований для використання адаптеру за вашим смаком.
Використовуйте одиин кеш між розгортаннями
Якщо ваша стратегія розгортання полягає у створенні нових цільових каталогів, вам необхідно встановити значення опції конфігурації cache.prefix.seed , щоб використовувати один і теж простір імен кешу між запусками. Інакше, пул cache.app буде використовувати значення параметру kernel.project_dir в якості бази для простору імен, що призведе до різних просторів імен кожний раз при запуску.

Пріоритизований транспорт

Іноді певні типи повідомлень повинні мати вищий пріоритет та бути оброблені до інших. Щоб зробити це можливим, ви можете створити декілька транспортів та маршрутизувати різні повідомлення до них. Наприклад:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    # queue_name відповідає транспорту doctrine
                    queue_name: high

                    # для AMQP відправте окремий обмін, а потім поставте у чергу
                    #exchange:
                    #    name: high
                    #queues:
                    #    messages_high: ~
                    # або спробуйте redis "group"
            async_priority_low:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: low

        routing:
            'App\Message\SmsNotification':  async_priority_low
            'App\Message\NewUserWelcomeEmail':  async_priority_high

Потім ви зможете запускати окремних робітників для кожного транспорту, або інструктувати одного робітника, щоб він обробляв повідомлення в порядку пріоритетності:

1
$ php bin/console messenger:consume async_priority_high async_priority_low

Робітник завжди спочатку шукатиме повідомлення, які очікують async_priority_high. Якщо таких немає, потім він споживатиме повідомлення з async_priority_low.

Обмежте споживання для конкретних черг

Деякий транспорт (особливо AMQP) має концепцію обміну та черг. Транспорт Symfony завжди пов'язаний з обміном. За замовчуванням, робітник споживає з усіх черг, з'єднаних з обміном вказаного транспорту. Однак є приклади використання, коли вам потрібно, щоб робітник споживав лише з конкретної черги.

Ви можете обмежити робітника, щоб він обробляв лише повідомлення з конкретних черг:

1
2
3
$ php bin/console messenger:consume my_transport --queues=fasttrack
# ви можете передати опцію --queues більше одного разу, щоб обробляти багато черг
$ php bin/console messenger:consume my_transport --queues=fasttrack1 --queues=fasttrack2

Щоб дозволити використання опції queues, отримувач має реалізовувати QueueReceiverInterface.

Перевірка кількості повідомлень у черзі у транспорті

Виконайте команду messenger:stats, щоб дізнатися, скільки повідомлень знаходяться у "чергах" якогось або всього транспорту:

1
2
3
4
5
# відображає кількість повідомлень у черзі у всьому транспорті
$ php bin/console messenger:stats

# показує статистику лише для деякого транспорту
$ php bin/console messenger:stats my_transport_name other_transport_name

Note

Для того, щоб ця команда працювала, сконфігурований отримувач транспорту має реалізовувати MessageCountAwareInterface.

6.2

Команда messenger:stats була представлена в Symfony 6.2.

Конфігурація Supervisor

Supervisor - це чудовий інструмент для гарантії того, що процес(и) ваших робітників завжди виконується (навіть якщо він закривається у зв'язку з помилкою, досягненням ліміту повідомлень або завдяки messenger:stop-workers). Ви можете встановити його на Ubuntu, наприклад, через:

1
$ sudo apt-get install supervisor

Файли конфігурації Supervisor зазвичай живуть в каталозі /etc/supervisor/conf.d. Наприклад, ви можете створити там новий файл messenger-worker.conf, щоб переконатися, що 2 екземпляри messenger:consume працюють завжди:

1
2
3
4
5
6
7
8
9
;/etc/supervisor/conf.d/messenger-worker.conf
[program:messenger-consume]
command=php /path/to/your/app/bin/console messenger:consume async --time-limit=3600
user=ubuntu
numprocs=2
startsecs=0
autostart=true
autorestart=true
process_name=%(program_name)s_%(process_num)02d

Змініть аргумент async, щоб він використовував назву вашого транспорту (або транспортів) та user на Unix-користувача на вашому сервері.

Caution

Під час розгортання, щось може бути недоступним (наприклад, база даних), що викличе помилку початку роботи споживача. У такій ситуації, Supervisor спробує startretries декілька разів, щоб перезапустити команду. Переконайтеся в тому, що змінили це налаштування, щоб уникнути отримання команди у стані FATAL, яка ніколи не запуститься знову.

Кожний перезапуск Supervisor збільшує затримку на 1 секунду. Наприклад, якщо значення - 10, він зачекає 1 с, 2 с, 3 с та далі. Це надає сервісу загалом 55 секунд, щоб знову стати доступним. Збільшіть налаштування startretries, щоб охопити максимум очікуваного часу простою.

Якщо ви використовуєте транспорт Redis, відмітьте, що кожному робітнику необхідне унікальне ім'я споживача, щоб уникнути обробки одного повідомлення декількома робітниками. Один зі способів досягнути цього - встановити змінну середовища в файлі конфігурації Супервізора, на яку ви потім зможете послатися в messenger.yaml (див. розділ Redis вище):

1
environment=MESSENGER_CONSUMER_NAME=%(program_name)s_%(process_num)02d

Далі, вкажіть Supervisor прочитати вашу конфігурацію та запустити ваших робітників:

1
2
3
4
5
$ sudo supervisorctl reread

$ sudo supervisorctl update

$ sudo supervisorctl start messenger-consume:*

Див. документацію Supervisor, щоб дізнатися більше деталей.

Граціозне вимкнення

Якщо ви встановили у своєму проекті PHP-розширення PCNTL, робітники будуть обробляти POSIX сигнал SIGTERM для завершення обробки поточного повідомлення перед виходом.

У деяких випадках, сигнал SIGTERM відправляється самим Супервізором (наприклад, зупинка контейнера Docker в якому Супервізор - точка входу). В таких випадках, вам треба додати ключ stopwaitsecs до конфігурації програми (зі значенням бажаного періоду відстрочки в секундах) для того, щоб виконати граціозне вимкнення:

1
2
[program:x]
stopwaitsecs=20

Конфігурація Systemd

Хоча Supervisor є чудовим інструментом, його недолік в тому, що вам потрібен системний доступ, щоб його запустити. Systemd став стандартом у більшості розповсюджень Linux, і має гарну альтернативу під назвою сервіси користувача.

Файли конфігурації сервісу користувача Systemd зазвичай живуть у каталозі ~/.config/systemd/user. Наприклад, ви можете створити новий файл messenger-worker.service. Або файл messenger-worker@.service, якщо ви хочете, щоб більше екземплярів працювали одночасно:

1
2
3
4
5
6
7
8
9
10
[Unit]
Description=Symfony messenger-consume %i

[Service]
ExecStart=php /path/to/your/app/bin/console messenger:consume async --time-limit=3600
Restart=always
RestartSec=30

[Install]
WantedBy=default.target

Тепер, скажіть systemd підключити та запустити одного робітника:

1
2
3
4
5
6
$ systemctl --user enable messenger-worker@1.service
$ systemctl --user start messenger-worker@1.service

# щоб підключити та запустити 20 робітників
$ systemctl --user enable messenger-worker@{1..20}.service
$ systemctl --user start messenger-worker@{1..20}.service

Якщо ви зміните ваш файл конфігурації сервісу, вам потрібно перезавантажити daemon:

1
$ systemctl --user daemon-reload

Щоб перезапустити всіх ваших споживачів:

1
$ systemctl --user restart messenger-consume@*.service

Екземпляр користувача systemd запускається лише після першого входу у систему конкретним користувачем. Споживачу, натомість, часто потрібно запускатися під час початкового завантаження системи. Підключіть зволікання у користувачі, щоб активувати цю поведінку:

1
$ loginctl enable-linger <your-username>

Логами управляє journald і з ними можна працювати з використанням команди journalctl:

1
2
3
4
5
6
7
8
# слідувати логам споживача номер 11
$ journalctl -f --user-unit messenger-consume@11.service

# слідувати логам усіх споживачів
$ journalctl -f --user-unit messenger-consume@*

# слідувати всім логам з ваших сервісів користувача
$ journalctl -f _UID=$UID

Дивіться документацію systemd, щоб дізнатися більше.

Note

Вам потрібні або підвищені привілеї для команди journalctl, або додати вашого користувача до групи systemd-journal:

1
$ sudo usermod -a -G systemd-journal <your-username>

Робітник без стану

PHP створений бути без стану, різни запити не мають спільних джерел. В HTTP-контексті PHP очищує все перед відправленням відповіді, тому ви можете вирішити не турбуватися про сервіси, які можуть допускати витік пам'яті.

З іншої сторони, робітники зазвичай працюють у довгострокових CLI-процесах, які не завершуються після обробки повідомлення. Тому вам треба бути обережними зі станами сервісів, щоб вони не давали витік інформації та/або пам'яті з одного повідомлення у інше.

Однак, деякі сервіси Symfony, на кшталт обробника fingers crossed Monolog, допускають витік за своїм задумом. Symfony надає функцію перезавантаження сервісу, щоб вирішити цю проблему. При автоматично перезавантаженні контейнера між двома повідомленнями, Symfony шукає будь-які сервіси, що реалізують ResetInterface (включно з вашими власними сервісами) і викликає їх метод reset(), щоб вони могли очистити свій внутрішній стан.

Якщо сервіс не без стану і ви хочете перезавантажувати його властивості після кожного повідомлення, тоді сервіс має реалізовувати ResetInterface, де ви можете перезавантажувати властивості у методі reset().

Якщо ви не хочете перезавантажувати контейнер, додайте опцію --no-reset при виконанні команди messenger:consume.

6.1

У версіях Symfony до 6.1, сервіс-контейнер автоматично не перезавантажувався між повідомленнями і вам потрібно було встановлювати опцію framework.messenger.reset_on_message як true.

Повторні спроби та помилки

Якщо під час споживання повідомлення з транспорту буде викликано виключення, воно буде автоматично повторно відправлене транспорту, щоб спробувати знову. За замовчуванням, повідомлення має 3 спроби перед скиданням або відправкою транспорту помилок . Кожна спроба буде також відкладена в часі, на випадок, якщо вона була викликана тимчасовою проблемою. Все це можна сконфігурувати для кожного транспорту:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'

                # конфігурація за замовчуванням
                retry_strategy:
                    max_retries: 3
                    # затримка в мілісекундах
                    delay: 1000
                    # робить так, щоб затримка була довшою перед кожною наступною спробою
                    # наприклад, затримка в 1 секунду, 2 секунди, 4 секунди
                    multiplier: 2
                    max_delay: 0
                    # перевизначити це все сервісом, який реалізує
                    # Symfony\Component\Messenger\Retry\RetryStrategyInterface
                    # service: null

Tip

Symfony запускає WorkerMessageRetriedEvent, коли повідомлення має повторні спроби, тому ви можете виконати власну логіку.

Note

Завдяки SerializedMessageStamp, серіалізована форма повідомлення зберігається, що запобігає його повторній серіалізації, якщо відбувається повторна спроба повідомлення.

6.1

Клас SerializedMessageStamp було представлено в Symfony 6.1.

Уникання повторних спроб

Іноді обробка повідомлення може бути невдалою, і ви будете знати, що це перманентно і повторних спроб не потрібно. Якщо ви викличете UnrecoverableMessageHandlingException, повідомлення не матиме повторних спроб.

Форсування повторних спроб

Іноді обробка повідомлення може бути невдалою, і ви будете знати, що це тимчасово, і необхідна повторна спроба. Якщо ви викличете RecoverableMessageHandlingException, повідомлення завжди матиме повторну спробу.

Збереження та повторні спроби невдалих повідомлень

Якщо повідомлення зазнає невдачі та має декілька повторних спроб (max_retries), воно потім буде відбраковане. Щоб уникнути цього, ви можете сконфігурувати failure_transport:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
10
# config/packages/messenger.yaml
framework:
    messenger:
        # після повторних спроб, повідомлення будуть відправлені транспорту "failed"
        failure_transport: failed

        transports:
            # ... інший транспорт

            failed: 'doctrine://default?queue_name=failed'

У цьому прикладі, якщо обробка повідомлення зазнає невдачі 3 рази (значення за замовчуванням max_retries), воно потім буде відправлене транспорту failed. Хоча ви можете використати messenger:consume failed для споживання його, як звичайного транспорту, вам скоріше захочеться вручну переглянути повідомлення в транспорті помилок та вирішити щодо повторних спроб:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# побачити всі повідомлення в транспорті помилок
$ php bin/console messenger:failed:show

# побачити перших 10 повідомлень
$ php bin/console messenger:failed:show --max=10

# побачити лише повідомлення MyClass
$ php bin/console messenger:failed:show --class-filter='MyClass'

# побачити кількість повідомлень класу повідомлення
$ php bin/console messenger:failed:show --stats

# побачити деталі конкретної помилки
$ php bin/console messenger:failed:show 20 -vv

# побачити та повторно спробувати кожне повідомлення окремо
$ php bin/console messenger:failed:retry -vv

# повторна спроба конкретних повідомлень
$ php bin/console messenger:failed:retry 20 30 --force

# видалити повідомлення без повторної спроби
$ php bin/console messenger:failed:remove 20

# видалити повідомлення без повторних спроб та показати кожне повідомлення перед видаленням
$ php bin/console messenger:failed:remove 20 30 --show-messages

6.2

Опції --class-filter та --stats були представлені в Symfony 6.2.

Якщо повідомлення знов зазнає невдачі, воно буде відправлене назад у транспорт помилок у відповідності зі звичайними правилами повторних спроб . Як тільки буде досягнено максимум повторних спроб, повідомлення буде скинуте перманентно.

Декілька помилкових транспортів

Іноді недостатньо мати один глобальний сконфігурований failed transport, тому що деякі повідомлення важливіші за інші. В таких випадках ви можете перевизначити транспорт помилок лише для певних транспортів:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# config/packages/messenger.yaml
framework:
    messenger:
        # після повторних спроб повідомлення будуть відправлені транспорту "failed"
        # за замовчуванням, якщо всередині транспорту не сконфігурований "failed_transport"
        failure_transport: failed_default

        transports:
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                failure_transport: failed_high_priority

            # так як транспорт помилок не сконфігурований, буде використовуватися встановлений
            # глобальний набір "failure_transport"
            async_priority_low:
                dsn: 'doctrine://default?queue_name=async_priority_low'

            failed_default: 'doctrine://default?queue_name=failed_default'
            failed_high_priority: 'doctrine://default?queue_name=failed_high_priority'

Якщо немає визначеного failure_transport глобально або на рівні транспорту, повідомлення буде скинуте після певної кількості спроб.

Невдалі команди мають необов'язкову опцію --transport, щоб вказати failure_transport, сконфігурований на рівні транспорту.

1
2
3
4
5
6
7
8
# побачити всі повідомлення в транспорті "failure_transport"
$ php bin/console messenger:failed:show --transport=failure_transport

# повторна спроба конкретних повідомлень з "failure_transport"
$ php bin/console messenger:failed:retry 20 30 --transport=failure_transport --force

# видалити повідомлення без повторної спроби з "failure_transport"
$ php bin/console messenger:failed:remove 20 --transport=failure_transport

Конфігурація транспорту

Messenger підтримує декілька різних типів транспорту, кожний зі своїми опціями. Опції можуть бути передані транспорту через рядок DSN або конфігурацію.

1
2
# .env
MESSENGER_TRANSPORT_DSN=amqp://localhost/%2f/messages?auto_setup=false
  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            my_transport:
                dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
                options:
                    auto_setup: false

Опції, визначені під options, панують над визначеними в DSN.

Транспорт AMQP

Транспорт AMQP використовує PHP-розширення AMQP для відправлення повідомлень в чергу на кшталт RabbitMQ.

1
$ composer require symfony/amqp-messenger

DSN транспорту AMQP може виглядати так:

1
2
3
4
5
# .env
MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages

# або використайте протокол AMQPS
MESSENGER_TRANSPORT_DSN=amqps://guest:guest@localhost/%2f/messages

Якщо ви хочете виокристати AMQP, закодований за допомогою TLS/SSL, ви маєте також надати CA-сертифікат. Визначіть шлях сертифікату в налаштуванні PHP.ini amqp.cacert (наприклад, amqp.cacert = /etc/ssl/certs) або в параметрі DSN cacert (наприклад, amqps://localhost?cacert=/etc/ssl/certs/).

Порт, за замовчуванням використовуваний AMQP, закодованим за допомогою TLS/SSL, - 5671, але ви можете перевизначити його в параметрі DSN port (наприклад, amqps://localhost?cacert=/etc/ssl/certs/&port=12345).

Note

За замовчуванням, транспорт автоматично створюватиме будь-які необхідні обміни, черги та сполучні ключі. Це можна відключити, але деяків функції можуть працювати некоректно (на кшталт відкладених черг).

Щоб не автоматично створювати жодних черг повторно, ви можете сконфігурувати транспорт за допомогою queues: [].

Note

Ви можете обмежити споживача транспорту AMQP, щоб він обробляв лише повідомлення з деяких черг обміну. Див. .

Транспорт має інші опції, включно із способами конфігурації обміну, сполучними ключами, чергами і т.д. Дивіться документацію Connection.

Транспорт має ряд опцій:

????? ???? ?? ?????????????
auto_setup ?? ??? ??????? ???? ???????? ??????????? ??? ??? ???????? / ?????????. true
cacert ???? ?? ????? CA-??????????? ? PEM-???????.  
cert ???? ?? ??????????? ??????? ? PEM-???????.  
channel_max ?????? ????????? ????? ???????, ??? ???????? ??????. 0 ??????? ??????????? ????? ?????????.  
confirm_timeout ????-??? ??? ????????????? ? ????????; ???? ?? ??????? ????????? ?? ??????????? ????????????? ????????????. ????????: 0 ??? ?????? ??????. ???? ???? ????????.  
connect_timeout ????-??? ?'???????. ????????: 0 ??? ?????? ??????. ???? ???? ????????.  
frame_max ?????????? ?????? ??????, ???? ???????? ?????? ??? ?'???????, ??????? ?? ?????????? ????? ?? ???-??????. 0 ??????? ??????????? ????? ?????????? (???????? ??? ?????? ??????? ?????? librabbimq ?? ?????????????)  
heartbeat ???????? ????????? ?'??????? (? ????????), ??? ???? ??????. 0 ???????, ?? ?????? ?? ???? ?????????. ?????????, ?? librabbimq ??? ???????? ????????? ?????????, ?? ???????, ?? ????????? ????????????? ???? ??? ??? ?????????? ????????.  
host ??'? ???????? AMQP-???????  
key ???? ?? ????? ??????? ? PEM-???????.  
password ?????? ??? ???????????? ??? ?'??????? ? AMQP-????????  
persistent   'false'
port ???? AMQP-???????  
prefetch_count    
read_timeout ????-??? ??????? ??????????. ????????: 0 ??? ?????? ??????. ???? ???? ????????.  
retry    
sasl_method    
connection_name ??? ?????????????? ???? ???????? (??????? ?????????? ?????? ?????????? PHP AMQP 1.10)  
verify ?????? ??? ??????? ??????????? ?????. ???? ??????????? ????????, ?? ???????? ??'? ? ??????????? ??????? ??? ?????????? ? ??'?? ???????. ??????????? ???????? ?? ?????????????.  
vhost ??????????? ??????? ??? ???????????? ? AMQP-????????  
write_timeout ????-??? ???????? ??????????. ????????: 0 ??? ?????? ??????. ???? ???? ????????.  
delay[queue_name_pattern] ??????, ???????????????? ??? ????????? ???? delay_%exchange_name%_%routing_key%_%delay%
delay[exchange_name] ??'? ??????, ??????????????? ??? ???????????/????????? ??????????? delays
queues[name][arguments] ????????? ?????????  
queues[name][binding_arguments] ?????????, ??????????????? ??? ???'???????? ?????.  
queues[name][binding_keys] ???????? ????? (???? ?) ??? ???'???????? ? ??????  
queues[name][flags] ???????? ????? AMQP_DURABLE
exchange[arguments] ????????? ????????? ??? ?????? (?????????,alternate-exchange)  
exchange[default_publish_routing_key] ???? ?????????????, ???????????????? ??? ??????????, ???? ??? ?? ???????? ? ????????????  
exchange[flags] ???????? ?????? AMQP_DURABLE
exchange[name] ????? ??????  
exchange[type] ??? ?????? fanout

6.1

Опція connection_name була представлена в Symfony 6.1.

Ви можете також сконфігурувати налаштування спеціально для AMQP у вашому повідомленні, додавши AmqpStamp до вашого Конверту:

1
2
3
4
5
6
7
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpStamp;
// ...

$attributes = [];
$bus->dispatch(new SmsNotification(), [
    new AmqpStamp('custom-routing-key', AMQP_NOPARAM, $attributes),
]);

Caution

Споживачі не відображаються в панелі адміна, так як цей транспорт не покладається на \AmqpQueue::consume(), який блокує. Наявність блокуючого отримувача робить опції --time-limit/--memory-limit команди messenger:consume. а також команди messenger:stop-workers марними, так як вони покладаються на той факт, що отримувач повертається негайно, незалежно від того, знаходить він повідомлення, чи ні. Робітник споживання відповідає за ітерацію до отримання повідомлення для обробки та/або до того, як буде досягнена одна з умов зупинки. Таким чином, логіка зупинки робітника може бути досягнена, якщо він застряг на блокуючому виклику.

Транспорт Doctrine

Транспорт Doctrine може бути використаний для зберігання повідомлень в таблиці бази даних.

1
$ composer require symfony/doctrine-messenger

DSN транспорту Doctrine може виглядати так:

1
2
# .env
MESSENGER_TRANSPORT_DSN=doctrine://default

Формат doctrine://<connection_name>, у випадку, якщо у вас є декілька з'єднань і ви хочете використати якесь відмінне від "default". Транспорт буде автоматично створювати таблицю під назвою messenger_messages.

Або, для створення таблиці самостійно, встановіть опцію auto_setup як false, та згенеруйте міграцію .

Tip

Щоб уникнути того, що інструменти типу Doctrine Migrations намагаються видалити цю таблицю через те, що вона не є частиною вашої нормальної схеми, ви можете встановити опцію schema_filter:

  • YAML
  • XML
  • PHP
1
2
3
4
# config/packages/doctrine.yaml
doctrine:
    dbal:
        schema_filter: '~^(?!messenger_messages)~'

Caution

Властивість повідомлень datetime, яка зберігається в базі даних, використовує часову зону поточної системи. Це може викликати проблеми, якщо декілька машин з різними конфігураціями часових зон використовують одне сховище.

Транспорт має такі опції:

????? ???? ?? ?????????????
table_name ????? ??????? messenger_messages
queue_name ????? ????? (???????? ? ???????, ??? ??????????????? ???? ??????? ??? ????????? ???????????) default
redeliver_timeout ????-??? ????? ????????? ??????? ???????????? ? ?????, ??? ?? ? ????? "???????" (???? ???????? ?? ?????? ??????? ?????????, ????????? ??, ? ??????? ?? ????? ????? ?????????? ????????????) - ? ????????. 3600
auto_setup ?? ??? ??????? ???? ???????? ??????????? ??? ??? ?????????/?????????. true

Note

Встановіть redeliver_timeout у більшому значенні, ніж тривалість вашого найповільнішого повідомлення. Інакше, деякі повідомлення починатимуться вдруге, поки перше все ще обробляється.

При використанні PostgreSQL, у вас є доступ до наступних опцій для отримання переваг функції LISTEN/NOTIFY. Це дозволяє більш продуктивний підхід, ніж поведінка голосування транспорту Doctrine за замовчуванням, так як PostgreSQL буде напряму повідомляти робітників, коли нове повідомлення з'являтиметься в таблиці.

????? ???? ?? ?????????????
use_notify ?? ??????????????? LISTEN/NOTIFY. true
check_delayed_interval ???????? ????????? ??????????? ??????????? ? ????????????. ?????????? ?? 0, ??? ?????????? ?????????. 1000
get_notify_timeout ?????????? ???? ?????????? ????????? ??? ??????? PDO::pgsqlGetNotify`, ? ????????????. 0

Транспорт Beanstalkd

Транспорт Beanstalkd відправляє повідомлення прямо в робочу чергу Beanstalkd. Встановіть його, виконавши:

1
$ composer require symfony/beanstalkd-messenger

DSN транспорту Beanstalkd може виглядати так:

1
2
3
4
5
# .env
MESSENGER_TRANSPORT_DSN=beanstalkd://localhost:11300?tube_name=foo&timeout=4&ttr=120

# Якщо порту немає, він за замовчуванням буде 11300
MESSENGER_TRANSPORT_DSN=beanstalkd://localhost

Транспорт має ряд опцій:

????? ???? ?? ?????????????
tube_name ????? ????? default
timeout ????-??? ??????? ???????????? - ? ????????. 0 (??????? ?????? ?????? ? ??? ????????? ?????????, ??? ????????? TransportException)
ttr ??? ????????? ???????????? ????? ??? ?? ????????? ???? ????? ? ????? ?????????? - ? ????????. 90

Транспорт Redis

Транспорт Redis використовує потоки для створення черги повідомлень. Цей транспорт вимагає PHP-розширення Redis (>=4.3) та працюючого серверу Redis (^5.0).

1
$ composer require symfony/redis-messenger

DSN транспорту Redis може виглядати так:

1
2
3
4
5
6
7
8
# .env
MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages
# Повний приклад DSN
MESSENGER_TRANSPORT_DSN=redis://password@localhost:6379/messages/symfony/consumer?auto_setup=true&serializer=1&stream_max_entries=0&dbindex=0&delete_after_ack=true
# Приклад кластеру Redis
MESSENGER_TRANSPORT_DSN=redis://host-01:6379,redis://host-02:6379,redis://host-03:6379,redis://host-04:6379
# Приклад Unix-сокету
MESSENGER_TRANSPORT_DSN=redis:///var/run/redis.sock

Деякі опції можуть бути сконфігуровані через DSN або ключ options під транспортом в messenger.yaml:

????? ???? ?? ?????????????
stream ????? ?????? Redis messages
group ????? ????? ?????????? Redis symfony
consumer ????? ?????????, ????????????????? ? Redis consumer
auto_setup ???????? ????? Redis ???????????? true
auth ?????? Redis  
delete_after_ack ???? true, ???????????? ????????????? ??????????? ????? ??????? false
delete_after_reject ???? true, ???????????? ????????????? ???????????, ???? ???? ???????????? true
lazy ?'?????????? ???? ???? ?'??????? ?????? ????????? false
serializer ?? ????????????? ???????? ???????????? ? Redis (????? Redis::OPT_SERIALIZER ) Redis::SERIALIZER_PHP
stream_max_entries ??????????? ????????? ???????, ?? ???? ???? ??????? ?????. ?????????? ????????? ?????? ????????, ??? ???????? ?????? "??????????" ??????????? 0 (??? ???????? "?? ???????")
tls ???????? TLS-????????? ?'??????? false
redeliver_timeout ????-??? ????? ????????? ??????? "???????????" ????????????, ??? ???????? ?????????? ????????? (???? ???????? ?? ?????? ??????? ?????, ????????? ??, ? ??????? ??? ????? ???? ???????? ???????? ???????????? ????????????) - ? ????????. 3600
claim_interval ???????? ? ???? ????? ?????????? "?????????"/???????? ???????????? - ? ???????????? 60000 (1 ??????)
persistent_id ?????, ???? null-????????? ?? ?????? non-persistent. null
retry_interval ???? ?????, ???????? ? ???????????? 0
read_timeout ????????, ???????? ? ????????, ?? ????????????? ??????? "??? ????????" 0
timeout ????????, ???????? ? ????????, ??. ????????????? ??????? "??? ????????" 0
sentinel_master ?????, ???? null ??? ??????, ????????????? ????????? Sentinel null

6.1

Опції persistent_id, retry_interval, read_timeout, timeout, та sentinel_master були представлені в Symfony 6.1.

Caution

Ніколи не повинно бути більше однієї команди messenger:consume, виконуваної з однаковою комбінацією stream, group та consumer, інашке повідомлення можуть бути оброблені більше, ніж один раз. Якщо ви запускаєте декілька робітників черги, consumer може бути встановлено як змінну оточення (на кшталт %env(MESSENGER_CONSUMER_NAME)%), встановлено Супервізором (приклад нижче) або будь-яким іншим сервісом, використовуваним для управління процесами робітників. В середовищі контейнера, HOSTNAME може бути використано як ім'я споживача, так як там лише один робітник на контейнер/хостинг. Якщо ви використовуєте Kubernetes для управління контейнерами, розгляньте використання StatefulSet для стабілізації імен.

Tip

Встановіть delete_after_ack як true (якщо у вас одна група) або визначте stream_max_entries (якщо ви можете припустити, яка максимальна кількість записів припустима у вашому випадку), щоб уникнути витоків пам'яті. В іншому випадку, всі повідомлення назавжди залишаться в Redis.

Tip

Встановіть delete_after_ack як true (якщо ви використовуєте одну групу) або визначіть stream_max_entries (якщо ви можете припустити, яка максимальна кількість запитів припустима у вашому випадку), щоб уникнути витоку памʼяті. Інаккше, всі повідомлення назавжди залишаться в Redis.

Транспорт у пам'яті

Транспорт in-memory насправді не доставляє повідомлення. Замість цього, він тримає їх у пам'яті під час запиту, що може бути корисним для тестування. Наприклад, якщо у вас є транспорт async_priority_normal, ви можете перевизначати його в середовищі test, щоб використати цей транспорт:

  • YAML
  • XML
  • PHP
1
2
3
4
5
# config/packages/test/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_normal: 'in-memory://'

Тоді під час тестування повідомлення не будуть відправлені реальному транспорту. Навіть краще, у тесті, ви можете перевірити, щоб лише одне повідомлення було відправлене під час запиту:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// tests/Controller/DefaultControllerTest.php
namespace App\Tests\Controller;

use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;
use Symfony\Component\Messenger\Transport\InMemoryTransport;

class DefaultControllerTest extends WebTestCase
{
    public function testSomething()
    {
        $client = static::createClient();
        // ...

        $this->assertSame(200, $client->getResponse()->getStatusCode());

        /* @var InMemoryTransport $transport */
        $transport = self::$container->get('messenger.transport.async_priority_normal');
        $this->assertCount(1, $transport->getSent());
    }
}

Транспорт має ряд опцій:

serialize (бульову, за замовчуванням: false)
Чи сериалізувати повідомлення. Це корисно для тестування додатковго шару, особливо коли ви використовуєте власний сериалізатор повідомлень.

Note

Всі транспорти in-memory будуть автоматично скинуті після кожного тесту в класах тестів, що розширюють KernelTestCase або WebTestCase.

Amazon SQS

Транспорт Amazon SQS чудово підходить для додатку на AWS. Встановіть його, виконавши:

1
$ composer require symfony/amazon-sqs-messenger

DSN SQS транспорту виглядає так:

1
2
3
# .env
MESSENGER_TRANSPORT_DSN=https://sqs.eu-west-3.amazonaws.com/123456789012/messages?access_key=AKIAIOSFODNN7EXAMPLE&secret_key=j17M97ffSVoKI0briFoo9a
MESSENGER_TRANSPORT_DSN=sqs://localhost:9494/messages?sslmode=disable

Note

Транспорт автоматично створить необхідні черги. Це можна відключити, встановивши опцію auto_setup як false.

Tip

До відправлення або отримання повідомлення, Symfony необхідно конвертувати назву черги в URL черги AWS, викликавши API GetQueueUrl в AWS. Цього додаткового API-виклику можна уникнути, надавши DSN, яка є URL черги.

Транспорт має ряд опцій:

????? ???? ?? ?????????????
access_key ???? ??????? AWS  
account ????????????? AWS-???????? ??????? ????????? ?????
auto_setup ?? ???????? ??????????? ?????????? ????? ??? ??? ?????????/?????????. true
buffer_size ????????? ??????????? ??? ???????????? ????????? 9
debug ???? true, ???? ??? ???? HTTP ??????? ?? ?????????? (?? ??????? ?? ??????????????) false
endpoint ?????????? URL ?? SQS-??????? https://sqs.eu-west-1.amazonaws.com
poll_timeout ??? ?????????? ?????? ???????????? ? ???????? 0.1
queue_name ????? ????? messages
region ????? AWS-??????? eu-west-1
secret_key ????????? ???? AWS  
session_token ????? ?????  
visibility_timeout ????????? ??????, ???????? ???? ???????????? ?? ???? ??????? (Visibility Timeout) ???????????? ?????
wait_time ?????????? Long polling ? ???????? 20

6.1

Опція session_token була представлена в Symfony 6.1.

Note

Параметр wait_time визначає максимальний час очікування для Amazon SQS до того, як повідомлення буде доступне в черзі, перед відправленням відповіді. Це допомагає знизити вартість використання Amazon SQS, усуваючи деяку кількість пустих відповідей.

Параметр poll_timeout визначає час очікування отримувача до повернення null. Він уникає блокування інших отримувачів від виклику.

Note

Якщо назва черги має суфікс .fifo, AWS створить чергу FIFO. Використайте марку AmazonSqsFifoStamp, щоб визначити Message group ID і Message deduplication ID.

Черги FIFO не підтримують установки затримки окремих повідомлень, значення delay: 0 потребується в налаштуваннях стратегії повторних спроб.

Серіалізація повідомлень

Коли повідомлення відправляються (та отримуються) в транпсорт, вони сериалізуються з використанням нативних функцій PHP serialize() і unserialize(). Ви можете змінити це глобально (або для кожного транспорту) на сервіс, який реалізує SerializerInterface:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
10
11
12
13
# config/packages/messenger.yaml
framework:
    messenger:
        serializer:
            default_serializer: messenger.transport.symfony_serializer
            symfony_serializer:
                format: json
                context: { }

        transports:
            async_priority_normal:
                dsn: # ...
                serializer: messenger.transport.symfony_serializer

messenger.transport.symfony_serializer - це вбудований сервіс, який використовує компонент Serializer і може бути сконфігурований декількома способами. Якщо ви оберете використовувати сериалізатор Symfony, ви зможете контролювати контекст для кожного випадку окремо через SerializerStamp (див. Конверти та марки).

Tip

При відправленні/отримаванні повідомлен в/з іншого транспорту, вам може знадобитися більше контролю над процесом сериалізації. Використання користувацького сериалізатору надає такий контроль. Див. Туторіал по сериалізації повідомлень SymfonyCasts, щоб дізнатися більше.

Налаштування обробників

Конфігурація обробників з використанням атрибутів

Ви можете сконфігурувати вашого обробника, передавши опції атриубуту:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;

use App\Message\OtherSmsNotification;
use App\Message\SmsNotification;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler(fromTransport: 'async', priority: 10)]
class SmsNotificationHandler
{
    public function __invoke(SmsNotification $message)
    {
        // ...
    }
}

Можливі опції для конфігурації з атрибутом:

  • bus
  • fromTransport
  • handles
  • method
  • priority

Конфігурація обробників вручну

Symfony зазвичай буде знаходити та реєструвати вашого обробника автоматично . Але ви також можете сконфігурувати його вручну - і передати йому додаткову конфігурацію - тегувавши сервіс обробника messenger.message_handler

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
10
11
# config/services.yaml
services:
    App\MessageHandler\SmsNotificationHandler:
        tags: [messenger.message_handler]

        # або сконфігурувати з опціями
        tags:
            -
                name: messenger.message_handler
                # небхідно лише якщо неможливо вгадати за підказкою
                handles: App\Message\SmsNotification

Можливі опції конфігурації з тегами:

  • bus
  • from_transport
  • handles
  • method
  • priority

Обробка множини повідомлень

Один клас обробника може обробляти багато повідомлень. Для цього, додайте атрибут #AsMessageHandler до всіх методів обробки:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;

use App\Message\OtherSmsNotification;
use App\Message\SmsNotification;

class SmsNotificationHandler
{
    #[AsMessageHandler]
    public function handleSmsNotification(SmsNotification $message)
    {
        // ...
    }

    #[AsMessageHandler]
    public function handleOtherSmsNotification(OtherSmsNotification $message)
    {
        // ...
    }
}

6.2

Реалізація MessageSubscriberInterface - це ще один спосіб обробляти багато повідомлень за допомогою одного класу обробника. Цей інтерфейс застарів у Symfony 6.2.

Пов'язування обробників з різними транспортами

Кожне повідомлення може мати декілька обробників, і коли повідомлення споживається, викликаються всі його оброники. Але ви можете також сконфігурувати обробника так, щоб він викликався лише тоді, коли повідомлення отримане з конкретного транспорту. Це дозволяє вам мати одне повідомлення, де кожний обробник викликається різними "робітниками", що споживають різний транспорт.

Уявіть, що у вас є повідомлення UploadedImage з двома обробниками:

  • ThumbnailUploadedImageHandler: ви хочете, щоб цей оброблялося транспортом під назвою image_transport
  • NotifyAboutNewUploadedImageHandler: ви хочете, щоб цей оброблялося транспортом під назвою async_priority_normal

Щоб зробити це, додайте опцію from_transport до кожного обробника. Наприклад:

1
2
3
4
5
6
7
8
9
10
11
12
13
// src/MessageHandler/ThumbnailUploadedImageHandler.php
namespace App\MessageHandler;

use App\Message\UploadedImage;

#[AsMessageHandler(from_transport: 'image_transport')]
class ThumbnailUploadedImageHandler
{
    public function __invoke(UploadedImage $uploadedImage)
    {
        // створити мініатюри
    }
}

І, схожим чином:

1
2
3
4
5
6
7
8
// src/MessageHandler/NotifyAboutNewUploadedImageHandler.php
// ...

#[AsMessageHandler(from_transport: 'async_priority_normal')]
class NotifyAboutNewUploadedImageHandler
{
    // ...
}

Потім, переконайтеся, що "маршрутизуєте" ваше повідомлення до обох транспортів:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
10
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_normal: # ...
            image_transport: # ...

        routing:
            # ...
            'App\Message\UploadedImage': [image_transport, async_priority_normal]

Ось і все! Тепер ви можете споживати кожний транспорт:

1
2
3
4
# викличе ThumbnailUploadedImageHandler лише при обробці повідомлення
$ php bin/console messenger:consume image_transport -vv

$ php bin/console messenger:consume async_priority_normal -vv

Caution

Якщо обробник не має конфігурації from_transport, він буде виконаний в кожному транспорті, з якого буде отримано це повідомлення.

Розширення Messenger

Конверти та марки

Повідомлення може бути будь-яким PHP-об'єктом. Інколи вам може знадобитися сконфігурувати щось додаткове в повідомленні - на кшталт того, як воно має бути оброблене всередині AMQP або додавання затримки перед обробкою повідомлення. Ви можете зробити це, додавши марку ("stamp") до вашого повідомлення:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;

public function index(MessageBusInterface $bus)
{
    $bus->dispatch(new SmsNotification('...'), [
        // почекати 5 секунд перед обробкою
        new DelayStamp(5000),
    ]);

    // ябо чітко створіть Конверт
    $bus->dispatch(new Envelope(new SmsNotification('...'), [
        new DelayStamp(5000),
    ]));

    // ...
}

Внутрішньо, кожне повідомлення огортається в конверт (Envelope), який містить повідомлення та марки. Ви можете створити його вручну або дозволити автобусу повідомлень зробити це. Існує багато різних марок для різних цілей і вони використовуються внутрішньо для відслідковування інформації про повідомлення - на кшталт того, який автобус обробляє його і чи має воно повторні спроби після невдачі.

Проміжкове ПЗ

Те, що відбувається після запуску повідомлення в автобус повідомлень, залежить від його набору проміжкового ПЗ та його порядку. За замовчуванням, проміжкове ПЗ, сконфігуроване для кожного атрибуту, виглядає так:

  1. add_bus_name_stamp_middleware - додає марку для запису того, в якому автобусі було запущено це повідомлення;
  2. dispatch_after_current_bus - див. Транзакційні повідомлення: обробляйте повідомлення після того, як обробку завершено;
  3. failed_message_processing_middleware - обробляє повідомлення, які мають повторні спроби через транспорт помилок , щоб вони правильно функціонували, ніби-то вони були отримані з початкового транспорту;
  4. Ваша власна колекція middleware;
  5. send_message - якщо маршрутизація сконфігурована для транспорту, відправляє повідомлення цьому трнаспорту та зупиняє ланцюжок проміжкового ПЗ;
  6. handle_message - викликає обробника(ів) повідомлень для заданого повідомлення.

Note

Ці назви проміжкового ПЗ - насправді скорочення. Справжні id сервісів мають префікс messenger.middleware. (наприклад,messenger.middleware.handle_message).

Проміжкове ПЗ виконується після запуску повідомлення, а також ще раз, коли повідомлення отримане через робітника (для повідомлень, які були відправлені транспорту для асинхронної обробки). Пам'ятайте це, якщо ви створите власне проміжкове ПЗ.

Ви можете додати власне проміжкове ПЗ в список, або повністю відключити проміжкове ПЗ за замовчуванням і додати лише ваше власне:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
10
11
12
13
# config/packages/messenger.yaml
framework:
    messenger:
        buses:
            messenger.bus.default:
                # відключити проміжкове ПЗ за замовчуванням
                default_middleware: false

                # та/або додати ваше власне
                middleware:
                    # id сервісів, що реалізують Symfony\Component\Messenger\Middleware\MiddlewareInterface
                    - 'App\Middleware\MyMiddleware'
                    - 'App\Middleware\AnotherMiddleware'

Note

Якщо сервіс проміжкового ПЗ абстрактний, буде створено другий екземпляр сервісу для кожного автобусу.

Проміжкове ПЗ для Doctrine

1.11

Наступне проміжкове ПЗ для Doctrine було представлено в DoctrineBundle 1.11.

Якщо ви у своєму додатку використовуєте Doctrine, існує ряд необов'язкового проміжкового ПЗ, яке ви можете захотіти використати:

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# config/packages/messenger.yaml
framework:
    messenger:
        buses:
            command_bus:
                middleware:
                    # кожний раз при обробці повідомлення, з'єднання Doctrine
                    # "пінгується" і повторно підключається, якщо воно закрите. Корисно,
                    # якщо ваші робітники працюють довгий час і з'єднання бази даних
                    # інколи втрачається
                    - doctrine_ping_connection

                    # Після обробки, з'єднання Doctrine закривається, що може
                    # звільнити з'єднання бази даних в робітнику, замість того,
                    # щоб утримувати їх відкритими завжди
                    - doctrine_close_connection

                    # огортає всіх обробників в одну транзакцію Doctrine
                    # обробникам не потрібно викликати flush(), а помилка в будь-якому
                    # обробнику викличе відкат
                    - doctrine_transaction

                    # або передати інший менеджер сутностей будь-якому
                    #- doctrine_transaction: ['custom']

Інше проміжкове ПЗ

Додайте проміжкове ПЗ router_context, якщо вам потрібно генерувати абсолютні URL у споживачі (наприклад, відображати шаблон з посиланнями). Це проміжкове ПЗ зберігає контекст початкового запиту (тобто хостинг, HTTP-порт і т.д.), що необхідно при створенні абсолютних URL.

Додайте проміжкове ПЗ validation, якщо вам потрібно валідувати обʼєкт повідомлення, використовуючи компонент Validator перед його обробкою. Якщо валідація невдала, буде викликано ValidationFailedException. ValidationStamp може бути використано для конфігурації груп валідації.

  • YAML
  • XML
  • PHP
1
2
3
4
5
6
7
8
# config/packages/messenger.yaml
framework:
    messenger:
        buses:
            command_bus:
                middleware:
                    - router_context
                    - validation

Події Messenger

На додаток до проміжкового ПЗ, Messenger також запускає декілька подій. Ви можете створити слухача подій, щоб підключатися до різних частин процесу. Для кожного, клас подій буде назвою події:

Декілька автобусів, автобуси команд та подій

Messenger надає вам один сервіс автобусу повідомлень за замовчуванням. Але ви можете сконфігурувати стільки, скільки ви хочете, створивши автобуси "команд", "запитів" або "подій", та контролюючи їх проміжкове ПЗ. Див. Декілька автобусів.