Messenger: робота з синхронізованими повідомленнями та повідомленнями у черзі
Дата оновлення перекладу 2024-06-09
Messenger: робота з синхронізованими повідомленнями та повідомленнями у черзі
Messenger надає автобус повідомлень з можливістю відправлення повідомлень, а потім роботи з ними одразу ж у вашому додатку, або їх відправлення через транспорт (наприклад, чергу) для обробки пізніше. Щоб дізнатися про це більше, прочитайте документацію компонента Messenger.
Установка
У додатках, що використовують Symfony Flex , виконайте цю команду, щоб встановити Messenger:
1
$ composer require symfony/messenger
Створення повідомлення та обробника
Messenger будується на двох різних класах, які ви створите: (1) класі повідомлення, який містить дані, та (2) класі обробника(ів), який буде викликано після запуску повідомлення. Клас обробника читатиме клас повідомлення та виконувати якусь дію.
До класу повідомлення немає особливих вимог, окрім того, що його можна серіалізувати:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// src/Message/SmsNotification.php
namespace App\Message;
class SmsNotification
{
public function __construct(
private string $content,
) {
}
public function getContent(): string
{
return $this->content;
}
}
Обробник повідомлень - це PHP-викликане, рекомендований спосіб його створення - створити клас,
який реалізує атрибут AsMessageHandler та має
метод __invoke()
, який має підказки класу повідомлення (або інтерфейс повідомлення):
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;
use App\Message\SmsNotification;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
#[AsMessageHandler]
class SmsNotificationHandler
{
public function __invoke(SmsNotification $message)
{
// ... зробіть щось - на кшталт відправки SMS!
}
}
Tip
Ви також можете використати атрибут #[AsMessageHandler]
в окремих методах класу.
Ви можете використовувати атрибут у стількох методах одного класу, скількох ви захочете,
що дозволяє вам групувати обробку багатьох повʼязаних типів повідомлень.
Завдяки автоконфігурації та підказці SmsNotification
,
Symfony знає, що цей обробник має бути викликаний, коли запускається повідомлення
SmsNotification
. У більшості випадків, це все, що вам треба буде зробити. Але ви
можете також сконфігурувати обробники повідомлень вручну .
Щоб побачити всі сконфігуровані обробники, виконайте:
1
$ php bin/console debug:messenger
Оголошення повідомлення
Ви готові! Для запуску повідомлення (та виклику обробника), впровадьте сервіс
messenger.default_bus
(через MessageBusInterface
), наприклад, у контролер:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
// src/Controller/DefaultController.php
namespace App\Controller;
use App\Message\SmsNotification;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Messenger\MessageBusInterface;
class DefaultController extends AbstractController
{
public function index(MessageBusInterface $bus): Response
{
// призведе до виклику SmsNotificationHandler
$bus->dispatch(new SmsNotification('Look! I created a message!'));
// ...
}
}
Транспорт: асинхронні повідомлення/повідомлення в черзі
За замовчуванням, повідомлення обробляються одразу ж після запуску. Якщо ви хочете обробити повідомлення асинхронно, ви можете сконфігурувати транспорт. Транспорт поже відправляти повідомлення (наприклад, в систему черги), а потім отримувати через робітника . Messenger підтримує декілька транспортів .
Note
Якщо ви хочете використати транспорт, який не підтримується, подивіться на транспорт Enqueue, який підтримує речі на кшталт Kafka і Google Pub/Sub.
Транспорт реєструється з використанням "DSN". Завдяки рецепту Flex для Messenger,
ваш файл .env
вже має декілька прикладів.
1 2 3
# MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages
# MESSENGER_TRANSPORT_DSN=doctrine://default
# MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages
Розкоментуйте той транспорт, який ви хочете (або встановіть його в .env.local
).
Див. , щоб дізнатися більше деталей.
Далі, в config/packages/messenger.yaml
, давайте визначимо транспорт під назвою
async
, який використовує цю конфігурацію:
1 2 3 4 5 6 7 8 9 10
# config/packages/messenger.yaml
framework:
messenger:
transports:
async: "%env(MESSENGER_TRANSPORT_DSN)%"
# або розширено для конфігурації більшої кількості опцій
#async:
# dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
# options: []
Маршрутизація повідомлень до транспорту
Тепер, коли у вас є сконфігурований транспорт, замість негайної обробки повідомлень, ви можете сконфігурувати їх так, щоб вони були відправлені транспорту:
1 2 3 4 5 6 7 8 9
# config/packages/messenger.yaml
framework:
messenger:
transports:
async: "%env(MESSENGER_TRANSPORT_DSN)%"
routing:
# async - це те ім'я, яке ви дали вашому транспорту вище
'App\Message\SmsNotification': async
Завдяки цьому, App\Message\SmsNotification
буде відправлено транспорту async
,
і його обробник(и) не будуть викликані одразу ж. Всі повідомлення, не співпавші з
routing
будуть оброблені негайно.
Note
Ви можете використовувати частковий простір імен PHP, наприклад 'App\Message\*'
, щоб зіставити всі
повідомлення у відповідному просторі імен. Єдина вимога полягає в тому, що підстановочний символ
'*'
має бути розміщено в кінці простору імен.
Ви можете використати '*'
як клас повідомлення. Це діятиме як правило маршрутизації
за замовчуванням для будь-якого повідомлення, яке не співпало з routing
. Це корисно
для того, щоб гарантувати, що жодне повідомлення не буде оброблено синхронно за
замовчуванням.
Єдиним недоліком є те, що '*'
буде також застосовано до електронних листів, відправлених
за допомогою Symfony Mailer (який використовує SendEmailMessage
, якщо доступний Messenger).
Це може призвести до проблем, якщо ваші листи не серіалзовувані (наприклад, якщо вони містять
вкладення файлів в якості PHP джерел/потоків).
Ви також можете маршрутизувати класи за їхнім батьківським класом або інтерфейсом. Або відправляти повідомлення декільком транспортам:
1 2 3 4 5 6 7 8 9
# config/packages/messenger.yaml
framework:
messenger:
routing:
# маршрутизуйте всі повідомлення, що розширюють цей приклад базового класу або інтерфейс
'App\Message\AbstractAsyncMessage': async
'App\Message\AsyncMessageInterface': async
'My\Message\ToBeSentToTwoSenders': [async, audit]
Note
Якщо ви сконфігуруєте маршрутизація і для дочірнього, і для батьківського класу,
використовуються обидва правила. Наприклад, якщо у вас є об'єкт SmsNotification
,
що розширюється з Notification
, будуть використані маршрутизації і для
Notification
, і для SmsNotification
.
Tip
Ви можете визначати та перевизначати транспорт, який використовує повідомлення, під час рантайму, використовуючи TransportNamesStamp в конверті повідомлення. Цей штамп бере масив імені транспорту як єдиний аргумент. Щоб дізнатися більше про штампи, див. Конвети і штампи.
Сутності Doctrine у повідомленнях
Якщо вам потрібно передати сутність Doctrine у повідомленні, краще за все передати основний
ключ сутності (або будь-яку релевантну інформацію, необхідну обробнику, на кшталт email
,
та ін.) заміть об'єкта:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// src/Message/NewUserWelcomeEmail.php
namespace App\Message;
class NewUserWelcomeEmail
{
public function __construct(
private int $userId,
) {
}
public function getUserId(): int
{
return $this->userId;
}
}
Потім, у вашому обробнику, ви можете зробити запит свіжого об'єкта:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
// src/MessageHandler/NewUserWelcomeEmailHandler.php
namespace App\MessageHandler;
use App\Message\NewUserWelcomeEmail;
use App\Repository\UserRepository;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
#[AsMessageHandler]
class NewUserWelcomeEmailHandler
{
public function __construct(
private UserRepository $userRepository,
) {
}
public function __invoke(NewUserWelcomeEmail $welcomeEmail): void
{
$user = $this->userRepository->find($welcomeEmail->getUserId());
// ... відправити електронний лист!
}
}
Це гарантує, що сутність містить свіжі дані.
Синхронна обробка повідомлень
Якщо повідомлення не співпадає з жодним правилом маршрутизації , воно
не буде відправлене жодному транспорту, і буде оброблене негайно. В деяких випадках (наприклад,
при пов'язуванні обробників з різними транспортами), легше та гнучкіше обробити їх чітко:
створивши транспорт sync
та "відправивши" повідомлення у нього для негайної обробки:
1 2 3 4 5 6 7 8 9 10
# config/packages/messenger.yaml
framework:
messenger:
transports:
# ... інші транспорти
sync: 'sync://'
routing:
App\Message\SmsNotification: sync
Створення вашого власного транспорту
Ви також можете створити власний транспорт, якщо вам потрібно відправляти або отримувати повідомлення через щось, що не підтримується. Див. Як створити ваш власний транспорт повідомлень.
Споживання повідомлень (запуск робітника)
Коли ваші повідомлення маршрутизовані, в більшості випадків, вам треба буде
"споживати" їх. Ви можете зробити це за допомогою команди messenger:consume
:
1 2 3 4
$ php bin/console messenger:consume async
# використайте -vv, щоб побачити деталі того що відбувається
$ php bin/console messenger:consume async -vv
Перший аргумент - це ім'я отримувача (або id сервісу, якщо ви маршрутизували до користувацького сервісу). За замовчуванням, команда буде виконуватися нескінченно: шукати нові повідомлення у вашому транспорті та обробляти їх. Ця команда називається вашим "робітником".
Якщо ви хочете споживати повідомлення від усіх доступних отримувачів, ви можете скористатися командою
з опцією --all
:
1
$ php bin/console messenger:consume --all
7.1
Опція --all
була представлена в Symfony 7.1.
Tip
У середовищі розробки, якщо ви використовуєте інструмент Symfony CLI,
ви можете сконфігурувати автоматичний запуск робітників разом з веб-сервером.
Ви можете знайти більше інформації в розділі документації
Робітники Symfony CLI Workers .
Tip
Щоб правильно зупинити робітника, викликайте екземпляр StopWorkerException.
Розгортування у виробництві
У виробництві є декілька важливих речей, про які варто подумати:
- Використовуйте менеджер процесів на кшталт Supervisor або systemd, щоб ваш(і) робітник(и) працювали
- Ви хочете, щоб один або більше "робітників" працювали весь час. Щоб зробити це, використайте систему контролю процесу на кшталт Supervisor або systemd .
- Не дозволяйте робітникам працювати нескінченно
-
Деякі сервіси (на кшталт
EntityManager
Doctrine) будуть споживати все більше пам'яті з часом. Тому замість того, щоб дозволяти вашому робітнику працювати завжди, використовуйте прапорець на кшталтmessenger:consume --limit=10
, щоб вказати робітнику, що він має обробити лише 10 повідомлень до припинення роботи (потім Супервізор створить новий процес). Також є інші опції на кшталт--memory-limit=128M
и--time-limit=3600
. - Зупиняйте робітників, які стикаються з помилками
-
Якщо залежність робітника типу ваш сервер бази даних не працює, або досягнуто тайм-ауту,
ви можете спробувати додати логіку повторного зʼєднання , або
просто зупинити робітника, якщо він отримує забагато помилок, за допомогою опції
--failure-limit
командиmessenger:consume
. - Перезавантажуйте робітників при розгортуванні
-
Кожний раз при розгортуванні вам треба буде перезавантажити всі процеси ваших робітників,
щоб вони бачили новозапущений код. Щоб зробити це, виконайте
messenger:stop-workers
при запуску. Це сигналізує кожному робітнику, що він має завершити обробку поточного повідомлення та граціозно завершити роботу. Потім, Супервізор створить нові процеси робітників. Команда використовує кеш app внутрішньо - тому переконайтеся в тому, що він сконфігурований для використання адаптеру за вашим смаком. - Використовуйте одиин кеш між розгортуваннями
-
Якщо ваша стратегія розгортування полягає у створенні нових цільових каталогів, вам
необхідно встановити значення опції конфігурації cache.prefix.seed ,
щоб використовувати один і теж простір імен кешу між запусками. Інакше, пул
cache.app
буде використовувати значення параметруkernel.project_dir
в якості бази для простору імен, що призведе до різних просторів імен кожний раз при запуску.
Пріоритизований транспорт
Іноді певні типи повідомлень повинні мати вищий пріоритет та бути оброблені до інших. Щоб зробити це можливим, ви можете створити декілька транспортів та маршрутизувати різні повідомлення до них. Наприклад:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
# config/packages/messenger.yaml
framework:
messenger:
transports:
async_priority_high:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
# queue_name відповідає транспорту doctrine
queue_name: high
# для AMQP відправте окремий обмін, а потім поставте у чергу
#exchange:
# name: high
#queues:
# messages_high: ~
# або спробуйте redis "group"
async_priority_low:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
queue_name: low
routing:
'App\Message\SmsNotification': async_priority_low
'App\Message\NewUserWelcomeEmail': async_priority_high
Потім ви зможете запускати окремних робітників для кожного транспорту, або інструктувати одного робітника, щоб він обробляв повідомлення в порядку пріоритетності:
1
$ php bin/console messenger:consume async_priority_high async_priority_low
Робітник завжди спочатку шукатиме повідомлення, які очікують async_priority_high
.
Якщо таких немає, потім він споживатиме повідомлення з async_priority_low
.
Обмежте споживання для конкретних черг
Деякий транспорт (особливо AMQP) має концепцію обміну та черг. Транспорт Symfony завжди пов'язаний з обміном. За замовчуванням, робітник споживає з усіх черг, з'єднаних з обміном вказаного транспорту. Однак є приклади використання, коли вам потрібно, щоб робітник споживав лише з конкретної черги.
Ви можете обмежити робітника, щоб він обробляв лише повідомлення з конкретних черг:
1 2 3
$ php bin/console messenger:consume my_transport --queues=fasttrack
# ви можете передати опцію --queues більше одного разу, щоб обробляти багато черг
$ php bin/console messenger:consume my_transport --queues=fasttrack1 --queues=fasttrack2
Щоб дозволити використання опції queues
, отримувач має реалізовувати
QueueReceiverInterface.
Перевірка кількості повідомлень у черзі у транспорті
Виконайте команду messenger:stats
, щоб дізнатися, скільки повідомлень знаходяться у
"чергах" якогось або всього транспорту:
1 2 3 4 5
# відображає кількість повідомлень у черзі у всьому транспорті
$ php bin/console messenger:stats
# показує статистику лише для деякого транспорту
$ php bin/console messenger:stats my_transport_name other_transport_name
Note
Для того, щоб ця команда працювала, сконфігурований отримувач транспорту має реалізовувати MessageCountAwareInterface.
Конфігурація Supervisor
Supervisor - це чудовий інструмент для гарантії того, що процес(и) ваших робітників
завжди виконується (навіть якщо він закривається у зв'язку з помилкою, досягненням
ліміту повідомлень або завдяки messenger:stop-workers
). Ви можете встановити його
на Ubuntu, наприклад, через:
1
$ sudo apt-get install supervisor
Файли конфігурації Supervisor зазвичай живуть в каталозі /etc/supervisor/conf.d
.
Наприклад, ви можете створити там новий файл messenger-worker.conf
, щоб переконатися,
що 2 екземпляри messenger:consume
працюють завжди:
1 2 3 4 5 6 7 8 9
;/etc/supervisor/conf.d/messenger-worker.conf
[program:messenger-consume]
command=php /path/to/your/app/bin/console messenger:consume async --time-limit=3600
user=ubuntu
numprocs=2
startsecs=0
autostart=true
autorestart=true
process_name=%(program_name)s_%(process_num)02d
Змініть аргумент async
, щоб він використовував назву вашого транспорту (або
транспортів) та user
на Unix-користувача на вашому сервері.
Caution
Під час розгортування, щось може бути недоступним (наприклад, база даних), що
викличе помилку початку роботи споживача. У такій ситуації, Supervisor спробує
startretries
декілька разів, щоб перезапустити команду. Переконайтеся в
тому, що змінили це налаштування, щоб уникнути отримання команди у стані FATAL,
яка ніколи не запуститься знову.
Кожний перезапуск Supervisor збільшує затримку на 1 секунду. Наприклад, якщо
значення - 10
, він зачекає 1 с, 2 с, 3 с та далі. Це надає сервісу загалом
55 секунд, щоб знову стати доступним. Збільшіть налаштування startretries
,
щоб охопити максимум очікуваного часу простою.
Якщо ви використовуєте транспорт Redis, відмітьте, що кожному робітнику необхідне
унікальне ім'я споживача, щоб уникнути обробки одного повідомлення декількома
робітниками. Один зі способів досягнути цього - встановити змінну середовища в
файлі конфігурації Супервізора, на яку ви потім зможете послатися в messenger.yaml
(див. розділ Redis вище):
1
environment=MESSENGER_CONSUMER_NAME=%(program_name)s_%(process_num)02d
Далі, вкажіть Supervisor прочитати вашу конфігурацію та запустити ваших робітників:
1 2 3 4 5 6 7 8 9
$ sudo supervisorctl reread
$ sudo supervisorctl update
$ sudo supervisorctl start messenger-consume:*
# Якщо ви розгортаєте оновлення коду, не забудьте перезапустити ваших робітників
# для запуску нового коду
$ sudo supervisorctl restart messenger-consume:*
Див. документацію Supervisor, щоб дізнатися більше деталей.
Поступое вимкнення
Якщо ви встановили у своєму проекті PHP-розширення PCNTL, робітники будуть
обробляти POSIX сигнал SIGTERM
для завершення обробки поточного повідомлення
перед виходом.
Втім, ви можете віддати перевагу використанню інших POSIX-сигналів для поступового завершення роботи.
Ви можете перевизначити стандартні сигнали встановивши опцію конфігурації
framework.messenger.stop_worker_on_signals
.
У деяких випадках, сигнал SIGTERM
відправляється самим Супервізором (наприклад,
зупинка контейнера Docker в якому Супервізор - точка входу). В таких випадках, вам
треба додати ключ stopwaitsecs
до конфігурації програми (зі значенням бажаного
періоду відстрочки в секундах) для того, щоб виконати граціозне вимкнення:
1 2
[program:x]
stopwaitsecs=20
Конфігурація Systemd
Хоча Supervisor є чудовим інструментом, його недолік в тому, що вам потрібен системний доступ, щоб його запустити. Systemd став стандартом у більшості розповсюджень Linux, і має гарну альтернативу під назвою сервіси користувача.
Файли конфігурації сервісу користувача Systemd зазвичай живуть у каталозі ~/.config/systemd/user
.
Наприклад, ви можете створити новий файл messenger-worker.service
. Або файл messenger-worker@.service
,
якщо ви хочете, щоб більше екземплярів працювали одночасно:
1 2 3 4 5 6 7 8 9 10
[Unit]
Description=Symfony messenger-consume %i
[Service]
ExecStart=php /path/to/your/app/bin/console messenger:consume async --time-limit=3600
Restart=always
RestartSec=30
[Install]
WantedBy=default.target
Тепер, скажіть systemd підключити та запустити одного робітника:
1 2 3 4 5 6
$ systemctl --user enable messenger-worker@1.service
$ systemctl --user start messenger-worker@1.service
# щоб підключити та запустити 20 робітників
$ systemctl --user enable messenger-worker@{1..20}.service
$ systemctl --user start messenger-worker@{1..20}.service
Якщо ви зміните ваш файл конфігурації сервісу, вам потрібно перезавантажити daemon:
1
$ systemctl --user daemon-reload
Щоб перезапустити всіх ваших споживачів:
1
$ systemctl --user restart messenger-consume@*.service
Екземпляр користувача systemd запускається лише після першого входу у систему конкретним користувачем. Споживачу, натомість, часто потрібно запускатися під час початкового завантаження системи. Підключіть зволікання у користувачі, щоб активувати цю поведінку:
1
$ loginctl enable-linger <your-username>
Логами управляє journald і з ними можна працювати з використанням команди journalctl:
1 2 3 4 5 6 7 8
# слідувати логам споживача номер 11
$ journalctl -f --user-unit messenger-consume@11.service
# слідувати логам усіх споживачів
$ journalctl -f --user-unit messenger-consume@*
# слідувати всім логам з ваших сервісів користувача
$ journalctl -f _UID=$UID
Дивіться документацію systemd, щоб дізнатися більше.
Note
Вам потрібні або підвищені привілеї для команди journalctl
, або додати
вашого користувача до групи systemd-journal:
1
$ sudo usermod -a -G systemd-journal <your-username>
Робітник без стану
PHP створений бути без стану, різни запити не мають спільних джерел. В HTTP-контексті PHP очищує все перед відправленням відповіді, тому ви можете вирішити не турбуватися про сервіси, які можуть допускати витік пам'яті.
З іншої сторони, робітники зазвичай працюють у довгострокових CLI-процесах, які не завершуються після обробки повідомлення. Тому вам треба бути обережними зі станами сервісів, щоб вони не давали витік інформації та/або пам'яті з одного повідомлення у інше.
Однак, деякі сервіси Symfony, на кшталт обробника fingers crossed
Monolog, допускають витік за своїм задумом. Symfony надає функцію перезавантаження сервісу,
щоб вирішити цю проблему. При автоматично перезавантаженні контейнера між двома повідомленнями,
Symfony шукає будь-які сервіси, що реалізують ResetInterface
(включно з вашими власними сервісами) і викликає їх метод reset()
, щоб вони могли очистити
свій внутрішній стан.
Якщо сервіс не без стану і ви хочете перезавантажувати його властивості після кожного повідомлення,
тоді сервіс має реалізовувати ResetInterface, де ви можете
перезавантажувати властивості у методі reset()
.
Якщо ви не хочете перезавантажувати контейнер, додайте опцію --no-reset
при виконанні
команди messenger:consume
.
Транспорт з обмеженням швидкості
Іноді вам може знадобитися обмежити швидкість роботи вашого працівника з повідомленнями.
Ви можете налаштувати обмежувач швидкості транспорту (потрібен
компонент RateLimiter), встановивши його опцію rate_limiter
:
1 2 3 4 5 6
# config/packages/messenger.yaml
framework:
messenger:
transports:
async:
rate_limiter: your_rate_limiter_name
Caution
Якщо в транспорті сконфігуровано обмежувач швидкості, він заблокує всього працівника, коли буде досягнуто ліміту. Вам слід переконатися, що ви сконфігурували спеціального робітника для транспорту з обмеженням швидкості, щоб уникнути блокування інших транспортів.
Повторні спроби та помилки
Якщо під час споживання повідомлення з транспорту буде викликано виключення, воно буде автоматично повторно відправлене транспорту, щоб спробувати знову. За замовчуванням, повідомлення має 3 спроби перед скиданням або відправкою транспорту помилок . Кожна спроба буде також відкладена в часі, на випадок, якщо вона була викликана тимчасовою проблемою. Все це можна сконфігурувати для кожного транспорту:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
# config/packages/messenger.yaml
framework:
messenger:
transports:
async_priority_high:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
# конфігурація за замовчуванням
retry_strategy:
max_retries: 3
# затримка в мілісекундах
delay: 1000
# робить так, щоб затримка була довшою перед кожною наступною спробою
# наприклад, затримка в 1 секунду, 2 секунди, 4 секунди
multiplier: 2
max_delay: 0
# перевизначити це все сервісом, який реалізує
# Symfony\Component\Messenger\Retry\RetryStrategyInterface
# service: null
7.1
Опція jitter
була представлена в Symfony 7.1.
Tip
Symfony запускає WorkerMessageRetriedEvent, коли повідомлення має повторні спроби, тому ви можете виконати власну логіку.
Note
Завдяки SerializedMessageStamp, серіалізована форма повідомлення зберігається, що запобігає його повторній серіалізації, якщо відбувається повторна спроба повідомлення.
Уникання повторних спроб
Іноді обробка повідомлення може бути невдалою, і ви будете знати, що це перманентно і повторних спроб не потрібно. Якщо ви викличете UnrecoverableMessageHandlingException, повідомлення не матиме повторних спроб.
Note
Повідомлення, які не будуть повторно оброблені, все одно будуть показані у сконфігурованому транспорті невдач. Якщо ви хочете уникнути цього, подумайте про те, щоб обробити помилку самостійно і дозволити обробнику успішно завершитися.
Форсування повторних спроб
Іноді обробка повідомлення може бути невдалою, і ви будете знати, що це тимчасово, і необхідна повторна спроба. Якщо ви викличете RecoverableMessageHandlingException, повідомлення завжди матиме повторну спробу.
Збереження та повторні спроби невдалих повідомлень
Якщо повідомлення зазнає невдачі та має декілька повторних спроб (max_retries
), воно
потім буде відбраковане. Щоб уникнути цього, ви можете сконфігурувати failure_transport
:
1 2 3 4 5 6 7 8 9 10
# config/packages/messenger.yaml
framework:
messenger:
# після повторних спроб, повідомлення будуть відправлені транспорту "failed"
failure_transport: failed
transports:
# ... інший транспорт
failed: 'doctrine://default?queue_name=failed'
У цьому прикладі, якщо обробка повідомлення зазнає невдачі 3 рази (значення за
замовчуванням max_retries
), воно потім буде відправлене транспорту failed
.
Хоча ви можете використати messenger:consume failed
для споживання його, як
звичайного транспорту, вам скоріше захочеться вручну переглянути повідомлення в
транспорті помилок та вирішити щодо повторних спроб:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
# побачити всі повідомлення в транспорті помилок
$ php bin/console messenger:failed:show
# побачити перших 10 повідомлень
$ php bin/console messenger:failed:show --max=10
# побачити лише повідомлення MyClass
$ php bin/console messenger:failed:show --class-filter='MyClass'
# побачити кількість повідомлень класу повідомлення
$ php bin/console messenger:failed:show --stats
# побачити деталі конкретної помилки
$ php bin/console messenger:failed:show 20 -vv
# побачити та повторно спробувати кожне повідомлення окремо
$ php bin/console messenger:failed:retry -vv
# повторна спроба конкретних повідомлень
$ php bin/console messenger:failed:retry 20 30 --force
# видалити повідомлення без повторної спроби
$ php bin/console messenger:failed:remove 20
# видалити повідомлення без повторних спроб та показати кожне повідомлення перед видаленням
$ php bin/console messenger:failed:remove 20 30 --show-messages
Якщо повідомлення знов зазнає невдачі, воно буде відправлене назад у транспорт помилок у відповідності зі звичайними правилами повторних спроб . Як тільки буде досягнено максимум повторних спроб, повідомлення буде скинуте перманентно.
Декілька помилкових транспортів
Іноді недостатньо мати один глобальний сконфігурований failed transport
, тому що деякі
повідомлення важливіші за інші. В таких випадках ви можете перевизначити транспорт помилок
лише для певних транспортів:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
# config/packages/messenger.yaml
framework:
messenger:
# після повторних спроб повідомлення будуть відправлені транспорту "failed"
# за замовчуванням, якщо всередині транспорту не сконфігурований "failed_transport"
failure_transport: failed_default
transports:
async_priority_high:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
failure_transport: failed_high_priority
# так як транспорт помилок не сконфігурований, буде використовуватися встановлений
# глобальний набір "failure_transport"
async_priority_low:
dsn: 'doctrine://default?queue_name=async_priority_low'
failed_default: 'doctrine://default?queue_name=failed_default'
failed_high_priority: 'doctrine://default?queue_name=failed_high_priority'
Якщо немає визначеного failure_transport
глобально або на рівні транспорту, повідомлення
буде скинуте після певної кількості спроб.
Невдалі команди мають необов'язкову опцію --transport
, щоб вказати
failure_transport
, сконфігурований на рівні транспорту.
1 2 3 4 5 6 7 8
# побачити всі повідомлення в транспорті "failure_transport"
$ php bin/console messenger:failed:show --transport=failure_transport
# повторна спроба конкретних повідомлень з "failure_transport"
$ php bin/console messenger:failed:retry 20 30 --transport=failure_transport --force
# видалити повідомлення без повторної спроби з "failure_transport"
$ php bin/console messenger:failed:remove 20 --transport=failure_transport
Конфігурація транспорту
Messenger підтримує декілька різних типів транспорту, кожний зі своїми опціями. Опції можуть бути передані транспорту через рядок DSN або конфігурацію.
1 2
# .env
MESSENGER_TRANSPORT_DSN=amqp://localhost/%2f/messages?auto_setup=false
1 2
# .env
MESSENGER_TRANSPORT_DSN=amqp://localhost/%2f/messages?auto_setup=false
1 2 3 4 5 6 7 8
# config/packages/messenger.yaml
framework:
messenger:
transports:
my_transport:
dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
options:
auto_setup: false
Опції, визначені під options
, панують над визначеними в DSN.
Транспорт AMQP
Транспорт AMQP використовує PHP-розширення AMQP для відправлення повідомлень в чергу на кшталт RabbitMQ.
1
$ composer require symfony/amqp-messenger
DSN транспорту AMQP може виглядати так:
1 2 3 4 5
# .env
MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages
# або використайте протокол AMQPS
MESSENGER_TRANSPORT_DSN=amqps://guest:guest@localhost/%2f/messages
Якщо ви хочете виокристати AMQP, закодований за допомогою TLS/SSL, ви маєте також
надати CA-сертифікат. Визначіть шлях сертифікату в налаштуванні PHP.ini amqp.cacert
(наприклад, amqp.cacert = /etc/ssl/certs
) або в параметрі DSN cacert
(наприклад, amqps://localhost?cacert=/etc/ssl/certs/
).
Порт, за замовчуванням використовуваний AMQP, закодованим за допомогою TLS/SSL, - 5671,
але ви можете перевизначити його в параметрі DSN port
(наприклад,
amqps://localhost?cacert=/etc/ssl/certs/&port=12345
).
Note
За замовчуванням, транспорт автоматично створюватиме будь-які необхідні обміни, черги та сполучні ключі. Це можна відключити, але деяків функції можуть працювати некоректно (на кшталт відкладених черг).
Щоб не автоматично створювати жодних черг повторно, ви можете сконфігурувати транспорт
за допомогою queues: []
.
Note
Ви можете обмежити споживача транспорту AMQP, щоб він обробляв лише повідомлення з деяких черг обміну. Див. .
Транспорт має інші опції, включно із способами конфігурації обміну, сполучними ключами, чергами і т.д. Дивіться документацію Connection.
Транспорт має ряд опцій:
????? | ???? | ?? ????????????? |
---|---|---|
auto_setup |
?? ??? ??????? ???? ???????? ??????????? ??? ??? ???????? / ?????????. | true |
cacert |
???? ?? ????? CA-??????????? ? PEM-???????. | |
cert |
???? ?? ??????????? ??????? ? PEM-???????. | |
channel_max |
?????? ????????? ????? ???????, ??? ???????? ??????. 0 ??????? ??????????? ????? ?????????. | |
confirm_timeout |
????-??? ??? ????????????? ? ????????; ???? ?? ??????? ????????? ?? ??????????? ????????????? ????????????. ????????: 0 ??? ?????? ??????. ???? ???? ????????. | |
connect_timeout |
????-??? ?'???????. ????????: 0 ??? ?????? ??????. ???? ???? ????????. | |
frame_max |
?????????? ?????? ??????, ???? ???????? ?????? ??? ?'???????, ??????? ?? ?????????? ????? ?? ???-??????. 0 ??????? ??????????? ????? ?????????? (???????? ??? ?????? ??????? ?????? librabbimq ?? ?????????????) | |
heartbeat |
???????? ????????? ?'??????? (? ????????), ??? ???? ??????. 0 ???????, ?? ?????? ?? ???? ?????????. ?????????, ?? librabbimq ??? ???????? ????????? ?????????, ?? ???????, ?? ????????? ????????????? ???? ??? ??? ?????????? ????????. | |
host |
??'? ???????? AMQP-??????? | |
key |
???? ?? ????? ??????? ? PEM-???????. | |
login |
???? ???????????, ??? ???? ??????????????? ??? ??????????? ?? ??????? AMQP | |
password |
?????? ??? ???????????? ??? ?'??????? ? AMQP-???????? | |
persistent |
'false' |
|
port |
???? AMQP-??????? | |
read_timeout |
????-??? ??????? ??????????. ????????: 0 ??? ?????? ??????. ???? ???? ????????. | |
retry |
||
sasl_method |
||
connection_name |
??? ?????????????? ???? ???????? (??????? ?????????? ?????? ?????????? PHP AMQP 1.10) | |
verify |
?????? ??? ??????? ??????????? ?????. ???? ??????????? ????????, ?? ???????? ??'? ? ??????????? ??????? ??? ?????????? ? ??'?? ???????. ??????????? ???????? ?? ?????????????. | |
vhost |
??????????? ??????? ??? ???????????? ? AMQP-???????? | |
write_timeout |
????-??? ???????? ??????????. ????????: 0 ??? ?????? ??????. ???? ???? ????????. | |
delay[queue_name_pattern] |
??????, ???????????????? ??? ????????? ???? | delay_%exchange_name%_%routing_key%_%delay% |
delay[exchange_name] |
??'? ??????, ??????????????? ??? ???????????/????????? ??????????? | delays |
queues[name][arguments] |
????????? ????????? | |
queues[name][binding_arguments] |
?????????, ??????????????? ??? ???'???????? ?????. | |
queues[name][binding_keys] |
???????? ????? (???? ?) ??? ???'???????? ? ?????? | |
queues[name][flags] |
???????? ????? | AMQP_DURABLE |
exchange[arguments] |
????????? ????????? ??? ??????
(?????????,alternate-exchange ) |
|
exchange[default_publish_routing_key] |
???? ?????????????, ???????????????? ??? ??????????, ???? ??? ?? ???????? ? ???????????? | |
exchange[flags] |
???????? ?????? | AMQP_DURABLE |
exchange[name] |
????? ?????? | |
exchange[type] |
??? ?????? | fanout |
Ви можете також сконфігурувати налаштування спеціально для AMQP у вашому повідомленні, додавши AmqpStamp до вашого Конверту:
1 2 3 4 5 6 7
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpStamp;
// ...
$attributes = [];
$bus->dispatch(new SmsNotification(), [
new AmqpStamp('custom-routing-key', AMQP_NOPARAM, $attributes),
]);
Caution
Споживачі не відображаються в панелі адміна, так як цей транспорт не покладається на
\AmqpQueue::consume()
, який блокує. Наявність блокуючого отримувача робить опції
--time-limit/--memory-limit
команди messenger:consume
. а також команди
messenger:stop-workers
марними, так як вони покладаються на той факт, що отримувач
повертається негайно, незалежно від того, знаходить він повідомлення, чи ні. Робітник
споживання відповідає за ітерацію до отримання повідомлення для обробки та/або до того,
як буде досягнена одна з умов зупинки. Таким чином, логіка зупинки робітника може бути
досягнена, якщо він застряг на блокуючому виклику.
Tip
Якщо ваш додаток стикається з виключеннями сокетів або високим відтоком з'єднань (що проявляється у швидкому створенні і видаленні з'єднань), розгляньте можливість використання AMQProxy. Цей інструмент працює як шлюз між Symfony Messenger і сервером AMQP, підтримуючи стабільні з'єднання і мінімізуючи накладні витрати (що також покращує загальну продуктивність).
Транспорт Doctrine
Транспорт Doctrine може бути використаний для зберігання повідомлень в таблиці бази даних.
1
$ composer require symfony/doctrine-messenger
DSN транспорту Doctrine може виглядати так:
1 2
# .env
MESSENGER_TRANSPORT_DSN=doctrine://default
Формат doctrine://<connection_name>
, у випадку, якщо у вас є декілька з'єднань і ви
хочете використати якесь відмінне від "default". Транспорт буде автоматично створювати
таблицю під назвою messenger_messages
.
Якщо ви хочете змінити ім'я таблиці за замовчуванням, передайте власне ім'я таблиці в
DSN за допомогою опції table_name
:
1 2
# .env
MESSENGER_TRANSPORT_DSN=doctrine://default?table_name=your_custom_table_name
Або, для створення таблиці самостійно, встановіть опцію auto_setup
як false
,
та згенеруйте міграцію .
Caution
Властивість повідомлень datetime, що зберігаються у базі даних, використовує часовий пояс поточної системи. Це може спричинити проблеми, якщо декілька машин з різними конфігураціями часових поясів використовують одне сховище.
Транспорт має такі опції:
????? | ???? | ?? ????????????? |
---|---|---|
table_name | ????? ??????? | messenger_messages |
queue_name | ????? ????? (???????? ? ???????, ??? ??????????????? ???? ??????? ??? ????????? ???????????) | default |
redeliver_timeout | ????-??? ????? ????????? ??????? ???????????? ? ?????, ??? ?? ? ????? "???????" (???? ???????? ?? ?????? ??????? ?????????, ????????? ??, ? ??????? ?? ????? ????? ?????????? ????????????) - ? ????????. | 3600 |
auto_setup | ?? ??? ??????? ???? ???????? ??????????? ??? ??? ?????????/?????????. | true |
Note
Встановіть redeliver_timeout
у більшому значенні, ніж тривалість вашого
найповільнішого повідомлення. Інакше, деякі повідомлення починатимуться вдруге,
поки перше все ще обробляється.
При використанні PostgreSQL, у вас є доступ до наступних опцій для отримання переваг функції LISTEN/NOTIFY. Це дозволяє більш продуктивний підхід, ніж поведінка голосування транспорту Doctrine за замовчуванням, так як PostgreSQL буде напряму повідомляти робітників, коли нове повідомлення з'являтиметься в таблиці.
????? | ???? | ?? ????????????? |
---|---|---|
use_notify | ?? ??????????????? LISTEN/NOTIFY. | true |
check_delayed_interval | ???????? ????????? ??????????? ??????????? ? ????????????. ?????????? ?? 0, ??? ?????????? ?????????. | 1000 |
get_notify_timeout | ?????????? ???? ?????????? ????????? ???
??????? PDO::pgsqlGetNotify` , ?
????????????. |
0 |
Транспорт Beanstalkd
Транспорт Beanstalkd відправляє повідомлення прямо в робочу чергу Beanstalkd. Встановіть його, виконавши:
1
$ composer require symfony/beanstalkd-messenger
DSN транспорту Beanstalkd може виглядати так:
1 2 3 4 5
# .env
MESSENGER_TRANSPORT_DSN=beanstalkd://localhost:11300?tube_name=foo&timeout=4&ttr=120
# Якщо порту немає, він за замовчуванням буде 11300
MESSENGER_TRANSPORT_DSN=beanstalkd://localhost
Транспорт має ряд опцій:
????? | ???? | ?? ????????????? |
---|---|---|
tube_name | ????? ????? | default |
timeout | ????-??? ??????? ???????????? - ? ????????. | 0 (??????? ?????? ?????? ? ??? ????????? ?????????, ??? ????????? TransportException) |
ttr | ??? ????????? ???????????? ????? ??? ?? ????????? ???? ????? ? ????? ?????????? - ? ????????. | 90 |
Транспорт Redis
Транспорт Redis використовує потоки для створення черги повідомлень. Цей транспорт вимагає PHP-розширення Redis (>=4.3) та працюючого серверу Redis (^5.0).
1
$ composer require symfony/redis-messenger
DSN транспорту Redis може виглядати так:
1 2 3 4 5 6 7 8 9 10 11 12
# .env
MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages
# Повний приклад DSN
MESSENGER_TRANSPORT_DSN=redis://password@localhost:6379/messages/symfony/consumer?auto_setup=true&serializer=1&stream_max_entries=0&dbindex=0&delete_after_ack=true
# Приклад кластеру Redis
MESSENGER_TRANSPORT_DSN=redis://host-01:6379,redis://host-02:6379,redis://host-03:6379,redis://host-04:6379
# Приклад Unix-сокету
MESSENGER_TRANSPORT_DSN=redis:///var/run/redis.sock
# Приклад TLS
MESSENGER_TRANSPORT_DSN=rediss://localhost:6379/messages
# Приклад багатьох хостів Redis Sentinel
MESSENGER_TRANSPORT_DSN=redis:?host[redis1:26379]&host[redis2:26379]&host[redis3:26379]&sentinel_master=db
Деякі опції можуть бути сконфігуровані через DSN або ключ options
під транспортом в messenger.yaml
:
????? | ???? | ?? ????????????? |
---|---|---|
stream | ????? ?????? Redis | messages |
group | ????? ????? ?????????? Redis | symfony |
consumer | ????? ?????????, ????????????????? ? Redis | consumer |
auto_setup | ???????? ????? Redis ???????????? | true |
auth | ?????? Redis | |
delete_after_ack | ???? true , ????????????
????????????? ??????????? ?????
??????? |
false |
delete_after_reject | ???? true , ????????????
????????????? ???????????, ???? ????
???????????? |
true |
lazy | ?'?????????? ???? ???? ?'??????? ?????? ????????? | false |
serializer | ?? ????????????? ???????? ????????????
? Redis (?????
Redis::OPT_SERIALIZER ) |
Redis::SERIALIZER_PHP |
stream_max_entries | ??????????? ????????? ???????, ?? ???? ???? ??????? ?????. ?????????? ????????? ?????? ????????, ??? ???????? ?????? "??????????" ??????????? | 0 (??? ???????? "?? ???????") |
redeliver_timeout | ????-??? ????? ????????? ??????? "???????????" ????????????, ??? ???????? ?????????? ????????? (???? ???????? ?? ?????? ??????? ?????, ????????? ??, ? ??????? ??? ????? ???? ???????? ???????? ???????????? ????????????) - ? ????????. | 3600 |
claim_interval | ???????? ? ???? ????? ?????????? "?????????"/???????? ???????????? - ? ???????????? | 60000 (1 ??????) |
persistent_id | ?????, ???? null-????????? ?? ?????? non-persistent. | null |
retry_interval | ???? ?????, ???????? ? ???????????? | 0 |
read_timeout | ????????, ???????? ? ????????, ?? ????????????? ??????? "??? ????????" | 0 |
timeout | ????????, ???????? ? ????????, ??. ????????????? ??????? "??? ????????" | 0 |
sentinel_master | ?????, ???? null ??? ??????, ????????????? ????????? Sentinel | null |
redis_sentinel | ????????? ?????????? |
7.1
Опція `redis_sentinel` як псевдонім `sentinel_master` була представлена в Symfony 7.1.
Caution
Ніколи не повинно бути більше однієї команди messenger:consume
, виконуваної з однаковою
комбінацією stream
, group
та consumer
, інашке повідомлення можуть бути оброблені
більше, ніж один раз. Якщо ви запускаєте декілька робітників черги, consumer
може бути
встановлено як змінну оточення (на кшталт %env(MESSENGER_CONSUMER_NAME)%
), встановлено
Супервізором (приклад нижче) або будь-яким іншим сервісом, використовуваним для управління
процесами робітників.
В середовищі контейнера, HOSTNAME
може бути використано як ім'я споживача, так як там
лише один робітник на контейнер/хостинг. Якщо ви використовуєте Kubernetes для управління
контейнерами, розгляньте використання StatefulSet
для стабілізації імен.
Tip
Встановіть delete_after_ack
як true
(якщо у вас одна група) або визначте
stream_max_entries
(якщо ви можете припустити, яка максимальна кількість записів
припустима у вашому випадку), щоб уникнути витоків пам'яті. В іншому випадку, всі
повідомлення назавжди залишаться в Redis.
Транспорт у пам'яті
Транспорт in-memory
насправді не доставляє повідомлення. Замість цього, він тримає
їх у пам'яті під час запиту, що може бути корисним для тестування. Наприклад, якщо у вас
є транспорт async_priority_normal
, ви можете перевизначати його в середовищі test
,
щоб використати цей транспорт:
1 2 3 4 5
# config/packages/test/messenger.yaml
framework:
messenger:
transports:
async_priority_normal: 'in-memory://'
Тоді під час тестування повідомлення не будуть відправлені реальному транспорту. Навіть краще, у тесті, ви можете перевірити, щоб лише одне повідомлення було відправлене під час запиту:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
// tests/Controller/DefaultControllerTest.php
namespace App\Tests\Controller;
use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;
use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport;
class DefaultControllerTest extends WebTestCase
{
public function testSomething(): void
{
$client = static::createClient();
// ...
$this->assertSame(200, $client->getResponse()->getStatusCode());
/* @var InMemoryTransport $transport */
$transport = $this->getContainer()->get('messenger.transport.async_priority_normal');
$this->assertCount(1, $transport->getSent());
}
}
Транспорт має ряд опцій:
serialize
(бульову, за замовчуванням:false
)- Чи сериалізувати повідомлення. Це корисно для тестування додатковго шару, особливо коли ви використовуєте власний сериалізатор повідомлень.
Note
Всі транспорти in-memory
будуть автоматично скинуті після кожного тесту
в класах тестів, що розширюють
KernelTestCase
або WebTestCase.
Amazon SQS
Транспорт Amazon SQS чудово підходить для додатку на AWS. Встановіть його, виконавши:
1
$ composer require symfony/amazon-sqs-messenger
DSN SQS транспорту виглядає так:
1 2 3
# .env
MESSENGER_TRANSPORT_DSN=https://sqs.eu-west-3.amazonaws.com/123456789012/messages?access_key=AKIAIOSFODNN7EXAMPLE&secret_key=j17M97ffSVoKI0briFoo9a
MESSENGER_TRANSPORT_DSN=sqs://localhost:9494/messages?sslmode=disable
Note
Транспорт автоматично створить необхідні черги. Це можна відключити, встановивши
опцію auto_setup
як false
.
Tip
До відправлення або отримання повідомлення, Symfony необхідно конвертувати назву
черги в URL черги AWS, викликавши API GetQueueUrl
в AWS. Цього додаткового
API-виклику можна уникнути, надавши DSN, яка є URL черги.
Транспорт має ряд опцій:
????? | ???? | ?? ????????????? |
---|---|---|
access_key |
???? ??????? AWS | |
account |
????????????? AWS-???????? | ??????? ????????? ????? |
auto_setup |
?? ???????? ??????????? ?????????? ????? ??? ??? ?????????/?????????. | true |
buffer_size |
????????? ??????????? ??? ???????????? ????????? | 9 |
debug |
???? true , ???? ??? ???? HTTP
??????? ?? ?????????? (?? ??????? ??
??????????????) |
false |
endpoint |
?????????? URL ?? SQS-??????? | https://sqs.eu-west-1.amazonaws.com |
poll_timeout |
??? ?????????? ?????? ???????????? ? ???????? | 0.1 |
queue_name |
????? ????? | messages |
region |
????? AWS-??????? | eu-west-1 |
secret_key |
????????? ???? AWS | |
session_token |
????? ????? | |
visibility_timeout |
????????? ??????, ???????? ???? ???????????? ?? ???? ??????? (Visibility Timeout) | ???????????? ????? |
wait_time |
?????????? Long polling ? ???????? | 20 |
Note
Параметр wait_time
визначає максимальний час очікування для Amazon SQS до того, як
повідомлення буде доступне в черзі, перед відправленням відповіді. Це допомагає знизити
вартість використання Amazon SQS, усуваючи деяку кількість пустих відповідей.
Параметр poll_timeout
визначає час очікування отримувача до повернення null.
Він уникає блокування інших отримувачів від виклику.
Note
Якщо назва черги має суфікс .fifo
, AWS створить чергу FIFO. Використайте марку
AmazonSqsFifoStamp,
щоб визначити Message group ID
і Message deduplication ID
.
Іншою можливістю є включення AddFifoStampMiddleware.
Якщо у ваше повідомлення реалізує
MessageDeduplicationAwareInterface,
проміжне програмне забезпечення автоматично додасть
AmazonSqsFifoStamp'
і встановіть 941aa285c3d9b5c2de7bfba085dcef5148fa7d92. Крім того, якщо ваше повідомлення реалізує
:class:Symfony\Component\Messenger\Bridge\AmazonSqs\MessageGroupAwareInterface`,
проміжне програмне забезпечення автоматично встановить мітку Ідентифікатор групи повідомлень
.
Ви можете дізнатися більше про проміжне програмне забезпечення у спеціальному розділі .
Черги FIFO не підтримують установки затримки окремих повідомлень, значення delay: 0
потребується в налаштуваннях стратегії повторних спроб.
Серіалізація повідомлень
Коли повідомлення відправляються (та отримуються) в транпсорт, вони сериалізуються з
використанням нативних функцій PHP serialize()
і unserialize()
. Ви можете змінити
це глобально (або для кожного транспорту) на сервіс, який реалізує
SerializerInterface:
1 2 3 4 5 6 7 8 9 10 11 12 13
# config/packages/messenger.yaml
framework:
messenger:
serializer:
default_serializer: messenger.transport.symfony_serializer
symfony_serializer:
format: json
context: { }
transports:
async_priority_normal:
dsn: # ...
serializer: messenger.transport.symfony_serializer
messenger.transport.symfony_serializer
- це вбудований сервіс, який використовує
компонент Serializer і може бути сконфігурований декількома способами.
Якщо ви оберете використовувати сериалізатор Symfony, ви зможете контролювати контекст для
кожного випадку окремо через SerializerStamp
(див. Конверти та марки).
Tip
При відправленні/отримаванні повідомлен в/з іншого транспорту, вам може знадобитися більше контролю над процесом сериалізації. Використання користувацького сериалізатору надає такий контроль. Див. Туторіал по сериалізації повідомлень SymfonyCasts, щоб дізнатися більше.
Запуск команд та зовнішніх процесів
Запуск команди
Запустити будь-яку команду можна за допомогою розгоратння RunCommandMessage. Symfony подбає про обробку цього повідомлення і виконає команду, передану до параметра повідомлення:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
use Symfony\Component\Console\Messenger\RunCommandMessage;
use Symfony\Component\Messenger\MessageBusInterface;
class CleanUpService
{
public function __construct(private readonly MessageBusInterface $bus)
{
}
public function cleanUp(): void
{
// Довге завдання з деяким кешуванням...
// Після завершення, разгорніть деякі команди очищення
$this->bus->dispatch(new RunCommandMessage('app:my-cache:clean-up --dir=var/temp'));
$this->bus->dispatch(new RunCommandMessage('cache:clear'));
}
}
Ви можете сконфігурувати поведінку на випадок, якщо щось піде не так, під час виконання команди.
Для цього ви можете скористатися параметрами throwOnFailure
та catchExceptions
при створенні
вашого екземпляру RunCommandMessage.
Після обробки, обробник поверне значення RunCommandContext, яке містить багато корисної інформації, наприклад, код завершення роботи або виведення процесу. Ви можете звернутися до сторінки, присвяченої результатам обробника для отримання додаткової інформації.
Запуск зовнішнього процесу
Messenger постачається зі зручним помічником для запуску зовнішніх процесів за допомогою розгортання повідомлення. Для цього використовуються переваги Компонента Process. Розгортаючи RunProcessMessage, Messenger подбає про створення нового процесу з параметрами, які ви передали:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Process\Messenger\RunProcessMessage;
class CleanUpService
{
public function __construct(private readonly MessageBusInterface $bus)
{
}
public function cleanUp(): void
{
$this->bus->dispatch(new RunProcessMessage(['rm', '-rf', 'var/log/temp/*'], cwd: '/my/custom/working-dir'));
// ...
}
}
Після обробки обробник поверне значення RunProcessContext, яке
містить багато корисної інформації, наприклад, код завершення роботи або виведення процесу.
Ви можете звернутися до сторінки, присвяченої
результатам обробника для отримання додаткової інформації.
Пінг веб-сервісу
Іноді вам може знадобитися регулярно пінгувати веб-сервіс, щоб дізнатися його статус, наприклад чи він працює. Це можна зробити, розгорнувши команду PingWebhookMessage:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
use Symfony\Component\HttpClient\Messenger\PingWebhookMessage;
use Symfony\Component\Messenger\MessageBusInterface;
class LivenessService
{
public function __construct(private readonly MessageBusInterface $bus)
{
}
public function ping(): void
{
// Викликано HttpExceptionInterface в 3xx/4xx/5xx
$this->bus->dispatch(new PingWebhookMessage('GET', 'https://example.com/status'));
// Пінг, але він не викликає 3xx/4xx/5xx
$this->bus->dispatch(new PingWebhookMessage('GET', 'https://example.com/status', throw: false));
// Може бути використана будь-яка валідна опція HttpClientInterface
$this->bus->dispatch(new PingWebhookMessage('POST', 'https://example.com/status', [
'headers' => [
'Authorization' => 'Bearer ...'
],
'json' => [
'data' => 'some-data',
],
]));
}
}
Обробник поверне значення ResponseInterface, що дозволяє вам збирати та обробляти інформацію, що повертається HTTP-запитом.
Отримання результатів від ваших обробників
Коли повідомлення обробляється, HandleMessageMiddleware додає HandledStamp для кожного об'єкта, який обробив повідомлення. Ви можете використовувати його для отримання значення, повернутого обробником(ами):
1 2 3 4 5 6 7 8 9 10 11
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\HandledStamp;
$envelope = $messageBus->dispatch(new SomeMessage());
// отримати значення, яке було повернуто останнім обробником повідомлення
$handledStamp = $envelope->last(HandledStamp::class);
$handledStamp->getResult();
// або отримаи інформацію про всі обробники
$handledStamps = $envelope->all(HandledStamp::class);
Отримання результатів при роботі з автобусами команди та запиту
Компонент Messenger можна використовувати в архітектурах CQRS, де автобуси команд і запитів є центральними частинами додатку. Прочитайте статтю Мартіна Фаулера про CQRS, щоб дізнатися більше і як сконфігурувати декілька автобусів .
Оскільки запити зазвичай синхронні і очікується, що вони будуть оброблені один раз, отримання результату від обробника є звичайною потребою.
Для отримання результату обробника при обробці синхронних запитів існує
HandleTrait. Він також гарантує, що буде
зареєстровано лише один обробник. HandleTrait
може бути використаний у будь-якому
класі, який має властивість $messageBus
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
// src/Action/ListItems.php
namespace App\Action;
use App\Message\ListItemsQuery;
use App\MessageHandler\ListItemsQueryResult;
use Symfony\Component\Messenger\HandleTrait;
use Symfony\Component\Messenger\MessageBusInterface;
class ListItems
{
use HandleTrait;
public function __construct(
private MessageBusInterface $messageBus,
) {
}
public function __invoke(): void
{
$result = $this->query(new ListItemsQuery(/* ... */));
// Зробити щось з результатом
// ...
}
// Створення такого методу не є обовʼязковим, але довзоляю додавати до результату підказки
private function query(ListItemsQuery $query): ListItemsQueryResult
{
return $this->handle($query);
}
}
Отже, ви можете використовувати рису для створення класів шини автобусів та запитів.
Наприклад, ви можете створити спеціальний клас QueryBus
і впроваджувати його
всюди, де вам потрібна поведінка автобуса запитів замість MessageBusInterface
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
// src/MessageBus/QueryBus.php
namespace App\MessageBus;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\HandleTrait;
use Symfony\Component\Messenger\MessageBusInterface;
class QueryBus
{
use HandleTrait;
public function __construct(
private MessageBusInterface $messageBus,
) {
}
/**
* @param object|Envelope $query
*
* @return mixed The handler returned value
*/
public function query($query): mixed
{
return $this->handle($query);
}
}
Налаштування обробників
Конфігурація обробників з використанням атрибутів
Ви можете сконфігурувати вашого обробника, передавши опції атриубуту:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;
use App\Message\OtherSmsNotification;
use App\Message\SmsNotification;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
#[AsMessageHandler(fromTransport: 'async', priority: 10)]
class SmsNotificationHandler
{
public function __invoke(SmsNotification $message): void
{
// ...
}
}
Можливі опції для конфігурації з атрибутом:
????? | ???? |
---|---|
bus |
????? ????????, ? ????? ???????? ???? ?????????? ????????????, ?? ????????????? - ??? ????????. |
fromTransport |
????? ??????????, ? ????? ???????? ???? ?????????? ????????????, ?? ????????????? - ??? ??????????. |
handles |
??? ??????????? (FQCN), ??? ?????? ???? ????????? ??????????, ???????? ?????? ???? ?? ????? ??????? ?? ????????? ????????. |
method |
??'? ??????, ???? ???? ????????? ????????????, ?????? ???? ??'????? ? ????. |
priority |
????????? ?????????, ???? ???????? ?????????? ?????? ????????? ???? ? ?? ???? ????????????. |
Конфігурація обробників вручну
Symfony зазвичай буде знаходити та реєструвати вашого обробника автоматично .
Але ви також можете сконфігурувати його вручну - і передати йому додаткову конфігурацію - тегувавши
сервіс обробника messenger.message_handler
1 2 3 4 5 6 7 8 9 10 11
# config/services.yaml
services:
App\MessageHandler\SmsNotificationHandler:
tags: [messenger.message_handler]
# або сконфігурувати з опціями
tags:
-
name: messenger.message_handler
# небхідно лише якщо неможливо вгадати за підказкою
handles: App\Message\SmsNotification
Можливі опції конфігурації з тегами:
????? | ???? |
---|---|
bus |
????? ????????, ? ????? ???????? ???? ?????????? ????????????, ?? ????????????? - ??? ????????. |
from_transport |
????? ??????????, ? ????? ???????? ???? ?????????? ????????????, ?? ????????????? - ??? ??????????. |
handles |
??? ??????????? (FQCN), ??? ?????? ???? ????????? ??????????, ???????? ?????? ???? ?? ????? ??????? ?? ????????? ????????. |
method |
??'? ??????, ???? ???? ????????? ????????????, ?????? ???? ??'????? ? ????. |
priority |
????????? ?????????, ???? ???????? ?????????? ?????? ????????? ???? ? ?? ???? ????????????. |
Обробка множини повідомлень
Один клас обробника може обробляти багато повідомлень. Для цього, додайте атрибут
#AsMessageHandler
до всіх методів обробки:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;
use App\Message\OtherSmsNotification;
use App\Message\SmsNotification;
class SmsNotificationHandler
{
#[AsMessageHandler]
public function handleSmsNotification(SmsNotification $message): void
{
// ...
}
#[AsMessageHandler]
public function handleOtherSmsNotification(OtherSmsNotification $message): void
{
// ...
}
}
Транзакційні повідомлення: Обробка нових повідомлень після завершення обробки
Обробник повідомлень може dispatch
нові повідомлення, одночасно обробляючи інші, до
того самого або іншого автобуса (якщо у додатку визначено
декілька автобусів ). Будь-які помилки або винятки, які
виникають під час цього процесу, можуть мати непередбачувані наслідки, такі як:
- Якщо при використанні
DoctrineTransactionMiddleware
розгорнуте повідомлення викликає виключення, то будь-які транзакції бази даних у початковому обробнику будуть відкочені. - Якщо повідомлення відправляється на інший автобус, то відправлене повідомлення буде оброблено, навіть якщо якийсь код пізніше в поточному обробнику згенерує виключення.
Приклад проццесу RegisterUser
Розглянемо додаток з автобусами команд та подій. Додаток
розгортає команду з іменем RegisterUser
у команді автобуса. Команда обробляється
RegisterUserHandler
, який створює об'єкт User
, зберігає
цей об'єкт до бази даних і надсилає повідомлення UserRegistered
на до автобуса подій.
Існує багато обробників повідомлення UserRegistered
, один обробник може надсилати
привітальний лист новому користувачеві. Ми використовуємо DoctrineTransactionMiddleware
для обгортання всіх запитів до бази даних в одну транзакцію бази даних.
Проблема 1: Якщо під час надсилання привітального листа буде викликано виключення, то
користувач не буде створений, оскільки DoctrineTransactionMiddleware
відкотить транзакцію Doctrine, в якій було створено користувача.
Проблема 2: Якщо при збереженні користувача до бази даних буде згенеровано виключення, привітальний лист все одно буде надіслано, оскільки він обробляється асинхронно.
Проміжкове ПО DispatchAfterCurrentBusMiddleware
Для багатьох додатків бажаною поведінкою є обробка лише тих повідомлень, які
які розгортаються обробником після повного завершення роботи цього обробника. Це можна зробити за допомогою
використання DispatchAfterCurrentBusMiddleware
і додавання DispatchAfterCurrentBusStamp
до конверта повідомлення :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
// src/Messenger/CommandHandler/RegisterUserHandler.php
namespace App\Messenger\CommandHandler;
use App\Entity\User;
use App\Messenger\Command\RegisterUser;
use App\Messenger\Event\UserRegistered;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DispatchAfterCurrentBusStamp;
class RegisterUserHandler
{
public function __construct(
private MessageBusInterface $eventBus,
private EntityManagerInterface $em,
) {
}
public function __invoke(RegisterUser $command): void
{
$user = new User($command->getUuid(), $command->getName(), $command->getEmail());
$this->em->persist($user);
// DispatchAfterCurrentBusStamp позначає, що повідомлення події має бути оброблено лише
// якщо його обробник не викликає виключення
$event = new UserRegistered($command->getUuid());
$this->eventBus->dispatch(
(new Envelope($event))
->with(new DispatchAfterCurrentBusStamp())
);
// ...
}
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
// src/Messenger/EventSubscriber/WhenUserRegisteredThenSendWelcomeEmail.php
namespace App\Messenger\EventSubscriber;
use App\Entity\User;
use App\Messenger\Event\UserRegistered;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Mailer\MailerInterface;
use Symfony\Component\Mime\RawMessage;
class WhenUserRegisteredThenSendWelcomeEmail
{
public function __construct(
private MailerInterface $mailer,
EntityManagerInterface $em,
) {
}
public function __invoke(UserRegistered $event): void
{
$user = $this->em->getRepository(User::class)->find($event->getUuid());
$this->mailer->send(new RawMessage('Welcome '.$user->getFirstName()));
}
}
Це означає, що повідомлення UserRegistered
не буде оброблено лише
після завершення роботи обробника RegisterUserHandler
і збереження нового User
в базі даних. Якщо RegisterUserHandler
зустріне виключення, подія UserRegistered
ніколи не буде оброблена. І якщо виключення буде викликано під час надсилання привітального
листа, транзакція Doctrine не буде відкочена.
Note
Якщо WhenUserRegisteredThenSendWelcomeEmail
викликає виключення, це
виключення буде обгорнуто у виключення DelayedMessageHandlingException
. Використання
DelayedMessageHandlingException::getWrappedExceptions
дасть вам всі виключення,
які виникають при обробці повідомлення з DispatchAfterCurrentBusStamp
.
Проміжне програмне забезпечення dispatch_after_current_bus
увімкнено за замовчуванням. Якщо ви
конфігуруєте проміжне ПЗ вручну, обов'язково зареєструйте dispatch_after_current_bus
перед
doctrine_transaction
у ланцюжку проміжного програмного забезпечення. Крім того, проміжне програмне
забезпечення dispatch_after_current_bus
має бути завантажено для всіх використовуваних автобусів.
Пов'язування обробників з різними транспортами
Кожне повідомлення може мати декілька обробників, і коли повідомлення споживається, викликаються всі його оброники. Але ви можете також сконфігурувати обробника так, щоб він викликався лише тоді, коли повідомлення отримане з конкретного транспорту. Це дозволяє вам мати одне повідомлення, де кожний обробник викликається різними "робітниками", що споживають різний транспорт.
Уявіть, що у вас є повідомлення UploadedImage
з двома обробниками:
ThumbnailUploadedImageHandler
: ви хочете, щоб цей оброблялося транспортом під назвоюimage_transport
NotifyAboutNewUploadedImageHandler
: ви хочете, щоб цей оброблялося транспортом під назвоюasync_priority_normal
Щоб зробити це, додайте опцію from_transport
до кожного обробника. Наприклад:
1 2 3 4 5 6 7 8 9 10 11 12 13
// src/MessageHandler/ThumbnailUploadedImageHandler.php
namespace App\MessageHandler;
use App\Message\UploadedImage;
#[AsMessageHandler(fromTransport: 'image_transport')]
class ThumbnailUploadedImageHandler
{
public function __invoke(UploadedImage $uploadedImage): void
{
// do some thumbnailing
}
}
І, схожим чином:
1 2 3 4 5 6 7 8
// src/MessageHandler/NotifyAboutNewUploadedImageHandler.php
// ...
#[AsMessageHandler(fromTransport: 'async_priority_normal')]
class NotifyAboutNewUploadedImageHandler
{
// ...
}
Потім, переконайтеся, що "маршрутизуєте" ваше повідомлення до обох транспортів:
1 2 3 4 5 6 7 8 9 10
# config/packages/messenger.yaml
framework:
messenger:
transports:
async_priority_normal: # ...
image_transport: # ...
routing:
# ...
'App\Message\UploadedImage': [image_transport, async_priority_normal]
Ось і все! Тепер ви можете споживати кожний транспорт:
1 2 3 4
# викличе ThumbnailUploadedImageHandler лише при обробці повідомлення
$ php bin/console messenger:consume image_transport -vv
$ php bin/console messenger:consume async_priority_normal -vv
Caution
Якщо обробник не має конфігурації from_transport
, він буде виконаний
в кожному транспорті, з якого буде отримано це повідомлення.
Обробка повідомлень партіями
Ви можете оголосити «спеціальні» обробники, які будуть обробляти повідомлення партіями.
Таким чином, обробник буде чекати, поки певної кількості повідомлень, перш ніж почати їх
обробку. Оголошення обробника партіями здійснюється за допомогою реалізації
BatchHandlerInterface.
BatchHandlerTrait також
надано для того, щоб полегшити оголошення цих спеціальних обробників:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
use Symfony\Component\Messenger\Handler\Acknowledger;
use Symfony\Component\Messenger\Handler\BatchHandlerInterface;
use Symfony\Component\Messenger\Handler\BatchHandlerTrait;
class MyBatchHandler implements BatchHandlerInterface
{
use BatchHandlerTrait;
public function __invoke(MyMessage $message, Acknowledger $ack = null): mixed
{
return $this->handle($message, $ack);
}
private function process(array $jobs): void
{
foreach ($jobs as [$message, $ack]) {
try {
// Обчислити $result з $message...
// Визнати обробку цього повідомлення
$ack->ack($result);
} catch (\Throwable $e) {
$ack->nack($e);
}
}
}
// Як варіант, ви можете або перевизначити метод `shouldFlush()` цієї
// риси, щоб визначити ваш власний розмір партії...
private function shouldFlush(): bool
{
return 100 <= \count($this->jobs);
}
// ... або перевизначити метод `getBatchSize()`, якщо поведінка за замовчуванням
// відповідає вашим потребам
private function getBatchSize(): int
{
return 100;
}
}
Note
Коли аргумент $ack
__invoke()
дорівнює null
, повідомлення
буде оброблено синхронно. В іншому випадку, __invoke()
поверне кількість повідомлень, що очікують на обробку.
BatchHandlerTrait обробляє це
за вас.
Note
За замовчуванням, відкладені партії змиваються, коли працівник простоює, а також так само, як і коли він зупинений.
Розширення Messenger
Конверти та марки
Повідомлення може бути будь-яким PHP-об'єктом. Інколи вам може знадобитися сконфігурувати щось додаткове в повідомленні - на кшталт того, як воно має бути оброблене всередині AMQP або додавання затримки перед обробкою повідомлення. Ви можете зробити це, додавши марку ("stamp") до вашого повідомлення:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
public function index(MessageBusInterface $bus): void
{
$bus->dispatch(new SmsNotification('...'), [
// wait 5 seconds before processing
new DelayStamp(5000),
]);
// ябо чітко створіть Конверт
$bus->dispatch(new Envelope(new SmsNotification('...'), [
new DelayStamp(5000),
]));
// ...
}
Внутрішньо, кожне повідомлення огортається в конверт (Envelope
), який містить
повідомлення та марки. Ви можете створити його вручну або дозволити автобусу повідомлень
зробити це. Існує багато різних марок для різних цілей і вони використовуються внутрішньо
для відслідковування інформації про повідомлення - на кшталт того, який автобус обробляє
його і чи має воно повторні спроби після невдачі.
Проміжкове ПЗ
Те, що відбувається після запуску повідомлення в автобус повідомлень, залежить від його набору проміжкового ПЗ та його порядку. За замовчуванням, проміжкове ПЗ, сконфігуроване для кожного атрибуту, виглядає так:
add_bus_name_stamp_middleware
- додає марку для запису того, в якому автобусі було запущено це повідомлення;dispatch_after_current_bus
- див. Транзакційні повідомлення: обробляйте повідомлення після того, як обробку завершено;failed_message_processing_middleware
- обробляє повідомлення, які мають повторні спроби через транспорт помилок , щоб вони правильно функціонували, ніби-то вони були отримані з початкового транспорту;- Ваша власна колекція middleware;
send_message
- якщо маршрутизація сконфігурована для транспорту, відправляє повідомлення цьому трнаспорту та зупиняє ланцюжок проміжкового ПЗ;handle_message
- викликає обробника(ів) повідомлень для заданого повідомлення.
Note
Ці назви проміжкового ПЗ - насправді скорочення. Справжні id сервісів мають префікс
messenger.middleware.
(наприклад,messenger.middleware.handle_message
).
Проміжкове ПЗ виконується після запуску повідомлення, а також ще раз, коли повідомлення отримане через робітника (для повідомлень, які були відправлені транспорту для асинхронної обробки). Пам'ятайте це, якщо ви створите власне проміжкове ПЗ.
Ви можете додати власне проміжкове ПЗ в список, або повністю відключити проміжкове ПЗ за замовчуванням і додати лише ваше власне:
Якщо сервіс проміжного програмного забезпечення є абстрактним, ви можете сконфігурувати аргументи його конструктора і для кожного автобуса буде створено окремий екземпляр.
1 2 3 4 5 6 7 8 9 10 11 12 13
# config/packages/messenger.yaml
framework:
messenger:
buses:
messenger.bus.default:
# відключити проміжкове ПЗ за замовчуванням
default_middleware: false
# та/або додати ваше власне
middleware:
# id сервісів, що реалізують Symfony\Component\Messenger\Middleware\MiddlewareInterface
- 'App\Middleware\MyMiddleware'
- 'App\Middleware\AnotherMiddleware'
Проміжкове ПЗ для Doctrine
Якщо ви у своєму додатку використовуєте Doctrine, існує ряд необов'язкового проміжкового ПЗ, яке ви можете захотіти використати:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
# config/packages/messenger.yaml
framework:
messenger:
buses:
command_bus:
middleware:
# кожний раз при обробці повідомлення, з'єднання Doctrine
# "пінгується" і повторно підключається, якщо воно закрите. Корисно,
# якщо ваші робітники працюють довгий час і з'єднання бази даних
# інколи втрачається
- doctrine_ping_connection
# Після обробки, з'єднання Doctrine закривається, що може
# звільнити з'єднання бази даних в робітнику, замість того,
# щоб утримувати їх відкритими завжди
- doctrine_close_connection
# огортає всіх обробників в одну транзакцію Doctrine
# обробникам не потрібно викликати flush(), а помилка в будь-якому
# обробнику викличе відкат
- doctrine_transaction
# або передати інший менеджер сутностей будь-якому
#- doctrine_transaction: ['custom']
Інше проміжкове ПЗ
Додайте проміжкове ПЗ router_context
, якщо вам потрібно генерувати абсолютні
URL у споживачі (наприклад, відображати шаблон з посиланнями). Це проміжкове ПЗ
зберігає контекст початкового запиту (тобто хостинг, HTTP-порт і т.д.), що необхідно
при створенні абсолютних URL.
Додайте проміжкове ПЗ validation
, якщо вам потрібно валідувати обʼєкт повідомлення,
використовуючи компонент Validator перед його обробкою.
Якщо валідація невдала, буде викликано ValidationFailedException
.
ValidationStamp може бути використано
для конфігурації груп валідації.
1 2 3 4 5 6 7 8
# config/packages/messenger.yaml
framework:
messenger:
buses:
command_bus:
middleware:
- router_context
- validation
Події Messenger
На додаток до проміжкового ПЗ, Messenger також запускає декілька подій. Ви можете створити слухача подій, щоб підключатися до різних частин процесу. Для кожного, клас подій буде назвою події:
Додаткові аргументи обробника
Існує можливість змусити месенджер передавати додаткові дані до обробника повідомлень за допомогою HandlerArgumentsStamp. Додайте цей штамп до конверта у проміжному програмному забезпеченні і заповніть його будь-якими додатковими даними, які ви хочете мати у обробнику:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
// src/Messenger/AdditionalArgumentMiddleware.php
namespace App\Messenger;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\HandlerArgumentsStamp;
final class AdditionalArgumentMiddleware implements MiddlewareInterface
{
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
$envelope = $envelope->with(new HandlerArgumentsStamp([
$this->resolveAdditionalArgument($envelope->getMessage()),
]));
return $stack->next()->handle($envelope, $stack);
}
private function resolveAdditionalArgument(object $message): mixed
{
// ...
}
}
Тоді ваш обробник виглядатиме так:
1 2 3 4 5 6 7 8 9 10 11 12
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;
use App\Message\SmsNotification;
final class SmsNotificationHandler
{
public function __invoke(SmsNotification $message, mixed $additionalArgument)
{
// ...
}
}
Серіалізатор повідомлень для користувацьких форматів даних
Якщо ви отримуєте повідомлення з інших додатків, можливо, вони не у тому форматі,
який вам потрібен. Не всі додатки повертатимуть повідомлення у форматі JSON
з полями body
і headers
. У таких випадках вам потрібно створити
новий серіалізатор повідомлень, що реалізує
SerializerInterface.
Припустимо, ви хочете створити дешифратор повідомлень:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
namespace App\Messenger\Serializer;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
class MessageWithTokenDecoder implements SerializerInterface
{
public function decode(array $encodedEnvelope): Envelope
{
$envelope = \json_decode($encodedEnvelope, true);
try {
// аналізувати дані, які ви отримали з вашими користувацькими полями
$data = $envelope['data'];
$data['token'] = $envelope['token'];
// інші операції, на кшталт отримання інформації зі штампів
} catch (\Throwable $throwable) {
// обгорнути будь-яке виключення, яке може виникнути в конверті, щоб відравити його до транспорту невдач
return new Envelope($throwable);
}
return new Envelope($data);
}
public function encode(Envelope $envelope): array
{
// цей дешифратор не шифрує повідомлення, але ви можете реалізувати це, повернувши
// масив з сериалізованими штампами, якщо вам потрібно відправляти повідомлення у користувацькому форматі
throw new \LogicException('This serializer is only used for decoding messages.');
}
}
Наступним кроком буде вказати Symfony використовувати цей серіалізатор в одному або декількох ваших транспортах:
1 2 3 4 5 6 7
# config/packages/messenger.yaml
framework:
messenger:
transports:
my_transport:
dsn: '%env(MY_TRANSPORT_DSN)%'
serializer: 'App\Messenger\Serializer\MessageWithTokenDecoder'
Декілька автобусів, автобуси команд та подій
Messenger надає вам один сервіс автобусу повідомлень за замовчуванням. Але ви можете сконфігурувати стільки, скільки ви хочете, створивши автобуси "команд", "запитів" або "подій", та контролюючи їх проміжкове ПЗ. Див. Декілька автобусів.
Поширеною архітектурою при створенні додатків є відокремлення команд від запитів. Команди - це дії, які щось роблять, а запити отримують дані. Це називається CQRS (Command Query Responsibility Segregation). Дивіться статтю Мартіна Фаулера про CQRS, щоб дізнатися більше. Цю архітектуру можна використовувати разом з компонентом Messenger, визначивши декілька автобусів.
Автобус команд дещо відрізняється від автобуса запитів. Наприклад, автобуси команд зазвичай не надають жодних результатів, а автобуси запитів рідко бувають асинхронними. Ви можете сконфігурувати ці автобуси та їхні правила за допомогою проміжного програмного забезпечення.
Також може бути гарною ідеєю відокремити дії від реакцій, ввівши автобус подій. Автобус подій може мати нуль або більше підписників.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
framework:
messenger:
# Автобус, яакий буде впроваджено при впровадженні MessageBusInterface
default_bus: command.bus
buses:
command.bus:
middleware:
- validation
- doctrine_transaction
query.bus:
middleware:
- validation
event.bus:
default_middleware:
enabled: true
# встановити "allow_no_handlers" як (за замовчуванням - false), щоб дозволити
# відсутність сконфігурованого обробника для цього автобуса, без виклику виключення
allow_no_handlers: false
# встановити "allow_no_senders" як false (за замовчуванням - true), щоб викликати виключення,
# якщо для цього автобуса не сконфігуровано відправника
allow_no_senders: true
middleware:
- validation
У результаті буде створено три нові сервіси:
command.bus
: автоматично монтується з підказкою MessageBusInterface (оскільки цеdefault_bus
);query.bus
: автоматично монтується зMessageBusInterface $queryBus
;event.bus
: автоматично монтується зMessageBusInterface $eventBus
.
Обмеження обробників за автобусом
За замовчуванням, кожен обробник буде доступний для обробки повідомлень в усіх
ваших автобусах. Щоб запобігти відправленню повідомлення у неправильний автобус без помилки,
ви можете обмежити кожен обробник певним автобусом за допомогою тегу messenger.message_handler
:
1 2 3 4
# config/services.yaml
services:
App\MessageHandler\SomeCommandHandler:
tags: [{ name: messenger.message_handler, bus: command.bus }]
Таким чином, обробник App\MessageHandler\SomeCommandHandler
буде відомий лише
автобусу command.bus
.
Ви також можете автоматично додати цей тег до низки класів за допомогою _конфігурації сервісу instanceof . Використовуючи цей тег, ви можете визначити автобус повідомлень на основі реалізованого інтерфейсу:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
# config/services.yaml
services:
# ...
_instanceof:
# всі сервіси, що реалізують CommandHandlerInterface, будуть
# зареєстровані в автобусі command.bus
App\MessageHandler\CommandHandlerInterface:
tags:
- { name: messenger.message_handler, bus: command.bus }
# в той час як ті, що реалізують QueryHandlerInterface, будуть
# зареєстровані в автобусі query.bus
App\MessageHandler\QueryHandlerInterface:
tags:
- { name: messenger.message_handler, bus: query.bus }
Налагодження автобусів
Команда debug:messenger
показує список доступних повідомлень та обробників для кожного
автобуса. Ви також можете обмежити список певним автобусом, вказавши його назву як аргумент.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
$ php bin/console debug:messenger
Messenger
=========
command.bus
-----------
Наступні повідомлення можуть бути розгорнуті:
---------------------------------------------------------------------------------------
App\Message\DummyCommand
handled by App\MessageHandler\DummyCommandHandler
App\Message\MultipleBusesMessage
handled by App\MessageHandler\MultipleBusesMessageHandler
---------------------------------------------------------------------------------------
query.bus
---------
Наступні повідомлення можуть бути розгорнуті:
---------------------------------------------------------------------------------------
App\Message\DummyQuery
handled by App\MessageHandler\DummyQueryHandler
App\Message\MultipleBusesMessage
handled by App\MessageHandler\MultipleBusesMessageHandler
---------------------------------------------------------------------------------------
Tip
Команда також покаже PHPDoc опис класів повідомлення та обробника.
Розгортання повідомлення
Якщо ви хочете повторно розгорнути повідомлення (використовуючи той самий транспорт і конверт), створіть
новий RedispatchMessage і відправте
його через ваш автобус. Повторне використання того самого прикладу SmsNotification
, показаного раніше:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;
use App\Message\SmsNotification;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\Message\RedispatchMessage;
use Symfony\Component\Messenger\MessageBusInterface;
#[AsMessageHandler]
class SmsNotificationHandler
{
public function __construct(private MessageBusInterface $bus)
{
}
public function __invoke(SmsNotification $message): void
{
// зробити щось з повідомленням
// потім повторно розгорнути його, засновуючись на вашій власній логіці
if ($needsRedispatch) {
$this->bus->dispatch(new RedispatchMessage($message));
}
}
}
Вбудований RedispatchMessageHandler
подбає про те, щоб це повідомлення було повторно відправлено по тому самому автобусу, по якому воно було
відправлено спочатку. Ви також можете використовувати другий аргумент конструктора RedispatchMessage
,
щоб вказати транспорт, який буде використано при повторному розгортанні повідомлення.