Дата оновлення перекладу 2022-05-23
Месенджер: робота з синхронізованими повідомленнями та повідомленнями у черзі¶
Месенджер надає автобус повідомлень з можливістю відправлення повідомлень, а потім роботи з ними одразу ж у вашому додатку, або їх відправлення через транспорт (наприклад, чергу) для обробки пізніше. Щоб дізнатися про це більше, прочитайте документацію компонента Месенджер.
Установка¶
У додатках, що використовують Symfony Flex, виконайте цю команду, щоб встановити месенджер:
1 | $ composer require symfony/messenger
|
Створення повідомлення та обробника¶
Месенджер будується на двох різних класах, які ви створите: (1) класі повідомлення, який містить дані, та (2) класі обробника(ів), який буде викликано після запуску повідомлення. Клас обробника читатиме клас повідомлення та виконувати якусь дію.
До класу повідомлення немає особливих вимог, окрім того, що його можна сериалізувати:
// src/Message/SmsNotification.php
namespace App\Message;
class SmsNotification
{
private $content;
public function __construct(string $content)
{
$this->content = $content;
}
public function getContent(): string
{
return $this->content;
}
}
Обробник повідомлень - це PHP-викликане, рекомендований спосіб його створення - створити клас,
який реалізує MessageHandlerInterface
та має
метод __invoke()
, який має підказки класу повідомлення (або інтерфейс повідомлення):
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;
use App\Message\SmsNotification;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
class SmsNotificationHandler implements MessageHandlerInterface
{
public function __invoke(SmsNotification $message)
{
// ... зробіть щось - на кшталт відправки SMS!
}
}
Завдяки автоконфігурації та підказці SmsNotification
,
Symfony знає, що цей обробник має бути викликаний, коли запускається повідомлення
SmsNotification
. У більшості випадків, це все, що вам треба буде зробити. Але ви
можете також сконфігурувати обробники повідомлень вручну.
Щоб побачити всі сконфігуровані обробники, виконайте:
1 | $ php bin/console debug:messenger
|
Запуск повідомлення¶
Ви готові! Для запуску повідомлення (та виклику обробника), впровадьте сервіс
messenger.default_bus
(через MessageBusInterface
), наприклад, у контролер:
// src/Controller/DefaultController.php
namespace App\Controller;
use App\Message\SmsNotification;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\Messenger\MessageBusInterface;
class DefaultController extends AbstractController
{
public function index(MessageBusInterface $bus)
{
// призведе до виклику SmsNotificationHandler
$bus->dispatch(new SmsNotification('Look! I created a message!'));
// або використайте це скорочення
$this->dispatchMessage(new SmsNotification('Look! I created a message!'));
// ...
}
}
Транспорт: асинхронні повідомлення/повідомлення в черзі¶
За замовчуванням, повідомлення обробляються одразу ж після запуску. Якщо ви хочете обробити повідомлення асинхронно, ви можете сконфігурувати транспорт. Транспорт поже відправляти повідомлення (наприклад, в систему черги), а потім отримувати через робітника. Месенджер підтримує декілька транспортів.
Note
Якщо ви хочете використати транспорт, який не підтримується, подивіться на транспорт Enqueue, який підтримує речі на кшталт Kafka і Google Pub/Sub.
Транспорт реєструється з використанням “DSN”. Завдяки рецепту Flex для Месенджера,
ваш файл .env
вже має декілька прикладів.
1 2 3 | # MESSENGER_TRANSPORT_DSN=amqp://guest:[email protected]:5672/%2f/messages
# MESSENGER_TRANSPORT_DSN=doctrine://default
# MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages
|
Розкоментуйте той транспорт, який ви хочете (або встановіть його в .env.local
).
Див. Transport Configuration, щоб дізнатися більше деталей.
Далі, в config/packages/messenger.yaml
, давайте визначимо транспорт під назвою
async
, який використовує цю конфігурацію:
- YAML
1 2 3 4 5 6 7 8 9 10
# config/packages/messenger.yaml framework: messenger: transports: async: "%env(MESSENGER_TRANSPORT_DSN)%" # або розширено для конфігурації більшої кількості опцій #async: # dsn: "%env(MESSENGER_TRANSPORT_DSN)%" # options: []
- XML
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
<!-- config/packages/messenger.xml --> <?xml version="1.0" encoding="UTF-8" ?> <container xmlns="http://symfony.com/schema/dic/services" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:framework="http://symfony.com/schema/dic/symfony" xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd"> <framework:config> <framework:messenger> <framework:transport name="async">%env(MESSENGER_TRANSPORT_DSN)%</framework:transport> <!-- або розширено для конфігурації більшої кількості опцій --> <framework:transport name="async" dsn="%env(MESSENGER_TRANSPORT_DSN)%" > <option key="...">...</option> </framework:transport> </framework:messenger> </framework:config> </container>
- PHP
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// config/packages/messenger.php use Symfony\Config\FrameworkConfig; return static function (FrameworkConfig $framework) { $framework->messenger() ->transport('async') ->dsn('%env(MESSENGER_TRANSPORT_DSN)%') ; $framework->messenger() ->transport('async') ->dsn('%env(MESSENGER_TRANSPORT_DSN)%') ->options([]) ; };
Маршрутизація повідомлень до транспорту¶
Тепер, коли у вас є сконфігурований транспорт, замість негайної обробки повідомлень, ви можете сконфігурувати їх так, щоб вони були відправлені транспорту:
- YAML
1 2 3 4 5 6 7 8 9
# config/packages/messenger.yaml framework: messenger: transports: async: "%env(MESSENGER_TRANSPORT_DSN)%" routing: # async - це те ім'я, яке ви дали вашому транспорту вище 'App\Message\SmsNotification': async
- XML
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
<!-- config/packages/messenger.xml --> <?xml version="1.0" encoding="UTF-8" ?> <container xmlns="http://symfony.com/schema/dic/services" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:framework="http://symfony.com/schema/dic/symfony" xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd"> <framework:config> <framework:messenger> <framework:routing message-class="App\Message\SmsNotification"> <!-- async - це те ім'я, яке ви дали вашому транспорту вище --> <framework:sender service="async"/> </framework:routing> </framework:messenger> </framework:config> </container>
- PHP
1 2 3 4 5 6 7 8 9
// config/packages/messenger.php use Symfony\Config\FrameworkConfig; return static function (FrameworkConfig $framework) { $framework->messenger() // async - це те ім'я, яке ви дали вашому транспорту вище ->routing('App\Message\SmsNotification')->senders(['async']) ; };
Завдяки цьому, App\Message\SmsNotification
буде відправлено транспорту async
,
і його обробник(и) не будуть викликані одразу ж. Всі повідомлення, не співпавші з
routing
будуть оброблені негайно.
Ви також можете маршрутизувати класи за їхнім батьківським класом або інтерфейсом. Або відправляти повідомлення декільком транспортам:
- YAML
1 2 3 4 5 6 7 8 9
# config/packages/messenger.yaml framework: messenger: routing: # маршрутизуйте всі повідомлення, що розширюють цей приклад базового класу або інтерфейс 'App\Message\AbstractAsyncMessage': async 'App\Message\AsyncMessageInterface': async 'My\Message\ToBeSentToTwoSenders': [async, audit]
- XML
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
<!-- config/packages/messenger.xml --> <?xml version="1.0" encoding="UTF-8" ?> <container xmlns="http://symfony.com/schema/dic/services" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:framework="http://symfony.com/schema/dic/symfony" xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd"> <framework:config> <framework:messenger> <!-- маршрутизуйте всі повідомлення, що розширюють цей приклад базового класу або інтерфейс --> <framework:routing message-class="App\Message\AbstractAsyncMessage"> <framework:sender service="async"/> </framework:routing> <framework:routing message-class="App\Message\AsyncMessageInterface"> <framework:sender service="async"/> </framework:routing> <framework:routing message-class="My\Message\ToBeSentToTwoSenders"> <framework:sender service="async"/> <framework:sender service="audit"/> </framework:routing> </framework:messenger> </framework:config> </container>
- PHP
1 2 3 4 5 6 7 8 9 10
// config/packages/messenger.php use Symfony\Config\FrameworkConfig; return static function (FrameworkConfig $framework) { $messenger = $framework->messenger(); // маршрутизуйте всі повідомлення, що розширюють цей приклад базового класу або інтерфейс $messenger->routing('App\Message\AbstractAsyncMessage')->senders(['async']); $messenger->routing('App\Message\AsyncMessageInterface')->senders(['async']); $messenger->routing('My\Message\ToBeSentToTwoSenders')->senders(['async', 'audit']); };
Note
Якщо ви сконфігуруєте маршрутизація і для дочірнього, і для батьківського класу,
використовуються обидва правила. Наприклад, якщо у вас є об’єкт SmsNotification
,
що розширюється з Notification
, будуть використані маршрутизації і для
Notification
, і для SmsNotification
.
Сутності Doctrine у повідомленнях¶
Якщо вам потрібно передати сутність Doctrine у повідомленні, краще за все передати основний
ключ сутності (або будь-яку релевантну інформацію, необхідну обробнику, на кшталт email
,
та ін.) заміть об’єкту:
// src/Message/NewUserWelcomeEmail.php
namespace App\Message;
class NewUserWelcomeEmail
{
private $userId;
public function __construct(int $userId)
{
$this->userId = $userId;
}
public function getUserId(): int
{
return $this->userId;
}
}
Потім, у вашому обробнику, ви можете зробити запит свіжого об’єкту:
// src/MessageHandler/NewUserWelcomeEmailHandler.php
namespace App\MessageHandler;
use App\Message\NewUserWelcomeEmail;
use App\Repository\UserRepository;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
class NewUserWelcomeEmailHandler implements MessageHandlerInterface
{
private $userRepository;
public function __construct(UserRepository $userRepository)
{
$this->userRepository = $userRepository;
}
public function __invoke(NewUserWelcomeEmail $welcomeEmail)
{
$user = $this->userRepository->find($welcomeEmail->getUserId());
// ... відправити електронний лист!
}
}
Це гарантує, що сутність містить свіжі дані.
Синхронна обробка повідомлень¶
Якщо повідомлення не співпадає з жодним правилом маршрутизації, воно
не буде відправлене жодному транспорту, і буде оброблене негайно. В деяких випадках (наприклад,
при `пов'язуванні обробників з різними транспортами`_), легше та гнучкіше обробити їх чітко:
створивши транспорт sync
та “відправивши” повідомлення у нього для негайної обробки:
- YAML
1 2 3 4 5 6 7 8 9 10
# config/packages/messenger.yaml framework: messenger: transports: # ... інші транспорти sync: 'sync://' routing: App\Message\SmsNotification: sync
- XML
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
<!-- config/packages/messenger.xml --> <?xml version="1.0" encoding="UTF-8" ?> <container xmlns="http://symfony.com/schema/dic/services" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:framework="http://symfony.com/schema/dic/symfony" xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd"> <framework:config> <framework:messenger> <!-- ... інші транспорти --> <framework:transport name="sync" dsn="sync://"/> <framework:routing message-class="App\Message\SmsNotification"> <framework:sender service="sync"/> </framework:routing> </framework:messenger> </framework:config> </container>
- PHP
1 2 3 4 5 6 7 8 9 10 11
// config/packages/messenger.php use Symfony\Config\FrameworkConfig; return static function (FrameworkConfig $framework) { $messenger = $framework->messenger(); // ... інші транспорти $messenger->transport('sync')->dsn('sync://'); $messenger->routing('App\Message\SmsNotification')->senders(['sync']); };
Створення вашого власного транспорту¶
Ви також можете створити власний транспорт, якщо вам потрібно відправляти або отримувати повідомлення через щось, що не підтримується. Див. How to Create Your own Messenger Transport.
Споживання повідомлень (запуск робітника)¶
Коли ваші повідомлення маршрутизовані, в більшості випадків, вам треба буде
“споживати” їх. Ви можете зробити це за допомогою команди messenger:consume
:
1 2 3 4 | $ php bin/console messenger:consume async
# використайте -vv, щоб побачити деталі того що відбувається
$ php bin/console messenger:consume async -vv
|
Перший аргумент - це ім’я отримувача (або id сервісу, якщо ви маршрутизували до користувацького сервісу). За замовчуванням, команда буде виконуватися нескінченно: шукати нові повідомлення у вашому транспорті та обробляти їх. Ця команда називається вашим “робітником”.
Tip
Щоб правильно зупинити робітника, викликайте екземпляр
StopWorkerException
.
New in version 5.4: Клас StopWorkerException
було представлено в Symfony 5.4.
Запуск у виробництво¶
У виробництві є декілька важливих речей, про які варто подумати:
- Використовуйте Супервізора, щоб ваш(і) робітник(и) працювали
- Ви хочете, щоб один або більше “робітників” працювали весь час. Щоб зробити це, використайте систему контролю процесу на кшталт Супервізора.
- Не дозволяйте робітникам працювати нескінченно
- Деякі сервіси (на кшталт
EntityManager
Doctrine) будуть споживати все більше пам’яті з часом. Тому замість того, щоб дозволяти вашому робітнику працювати завжди, використовуйте прапорець на кшталтmessenger:consume --limit=10
, щоб вказати робітнику, що він має обробити лише 10 повідомлень до припинення роботи (потім Супервізор створить новий процес). Також є інші опції на кшталт--memory-limit=128M
и--time-limit=3600
. - Перезавантажуйте робітників при запуску
- Кожний раз при запуску вам треба буде перезавантажити всі процеси ваших робітників,
щоб вони бачили новозапущений код. Щоб зробити це, виконайте
messenger:stop-workers
при запуску. Це сигналізує кожному робітнику, що він має завершити обробку поточного повідомлення та граціозно завершити роботу. Потім, Супервізор створить нові процеси робітників. Команда використовує кеш app внутрішньо - тому переконайтеся в тому, що він сконфігурований для використання адаптеру за вашим смаком. - Використовуйте одиин кеш між запусками
- Якщо ваша стратегія запуску полягає у створенні нових цільових каталогів, вам
необхідно встановити значення опції конфігурації cache.prefix.seed,
щоб використовувати один і теж простір імен кешу між запусками. Інакше, пул
cache.app
буде використовувати значення параметруkernel.project_dir
в якості бази для простору імен, що призведе до різних просторів імен кожний раз при запуску.
Пріоритизований транспорт¶
Іноді певні типи повідомлень повинні мати вищий пріоритет та бути оброблені до інших. Щоб зробити це можливим, ви можете створити декілька транспортів та маршрутизувати різні повідомлення до них. Наприклад:
- YAML
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
# config/packages/messenger.yaml framework: messenger: transports: async_priority_high: dsn: '%env(MESSENGER_TRANSPORT_DSN)%' options: # queue_name відповідає транспорту doctrine queue_name: high # для AMQP відправте окремий обмін, а потім поставте у чергу #exchange: # name: high #queues: # messages_high: ~ # або спробуйте redis "group" async_priority_low: dsn: '%env(MESSENGER_TRANSPORT_DSN)%' options: queue_name: low routing: 'App\Message\SmsNotification': async_priority_low 'App\Message\NewUserWelcomeEmail': async_priority_high
- XML
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
<!-- config/packages/messenger.xml --> <?xml version="1.0" encoding="UTF-8" ?> <container xmlns="http://symfony.com/schema/dic/services" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:framework="http://symfony.com/schema/dic/symfony" xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd"> <framework:config> <framework:messenger> <framework:transport name="async_priority_high" dsn="%env(MESSENGER_TRANSPORT_DSN)%"> <framework:options> <framework:queue> <framework:name>Queue</framework:name> </framework:queue> </framework:options> </framework:transport> <framework:transport name="async_priority_low" dsn="%env(MESSENGER_TRANSPORT_DSN)%"> <option key="queue_name">low</option> </framework:transport> <framework:routing message-class="App\Message\SmsNotification"> <framework:sender service="async_priority_low"/> </framework:routing> <framework:routing message-class="App\Message\NewUserWelcomeEmail"> <framework:sender service="async_priority_high"/> </framework:routing> </framework:messenger> </framework:config> </container>
- PHP
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// config/packages/messenger.php use Symfony\Config\FrameworkConfig; return static function (FrameworkConfig $framework) { $messenger = $framework->messenger(); $messenger->transport('async_priority_high') ->dsn('%env(MESSENGER_TRANSPORT_DSN)%') ->options(['queue_name' => 'high']); $messenger->transport('async_priority_low') ->dsn('%env(MESSENGER_TRANSPORT_DSN)%') ->options(['queue_name' => 'low']); $messenger->routing('App\Message\SmsNotification')->senders(['async_priority_low']); $messenger->routing('App\Message\NewUserWelcomeEmail')->senders(['async_priority_high']); };
Потім ви зможете запускати окремних робітників для кожного транспорту, або інструктувати одного робітника, щоб він обробляв повідомлення в порядку пріоритетності:
1 | $ php bin/console messenger:consume async_priority_high async_priority_low
|
Робітник завжди спочатку шукатиме повідомлення, які очікують async_priority_high
.
Якщо таких немає, потім він споживатиме повідомлення з async_priority_low
.
Обмежте споживання для конкретних черг¶
Деякий транспорт (особливо AMQP) має концепцію обміну та черг. Транспорт Symfony завжди пов’язаний з обміном. За замовчуванням, робітник споживає з усіх черг, з’єднаних з обміном вказаного транспорту. Однак є приклади використання, коли вам потрібно, щоб робітник споживав лише з конкретної черги.
Ви можете обмежити робітника, щоб він обробляв лише повідомлення з конкретних черг:
1 | $ php bin/console messenger:consume my_transport --queues=fasttrack
|
Щоб дозволити використання опції queues
, отримувач має реалізовувати
QueueReceiverInterface
.
New in version 5.3: Обмеження робітника для конкретних черг було представлено в Symfony 5.3.
Конфігурація супервізора¶
Супервізор - це чудовий інструмент для гарантії того, що процес(и) ваших робітників
завжди виконується (навіть якщо він закривається у зв’язку з помилкою, досягненням
ліміту повідомлень або завдяки messenger:stop-workers
). Ви можете встановити його
на Ubuntu, наприклад, через:
1 | $ sudo apt-get install supervisor
|
Файли конфігурації Супервізора зазвичай живуть в каталозі /etc/supervisor/conf.d
.
Наприклад, ви можете створити там новий файл messenger-worker.conf
, щоб переконатися,
що 2 екземпляри messenger:consume
працюють завжди:
1 2 3 4 5 6 7 8 9 | ;/etc/supervisor/conf.d/messenger-worker.conf
[program:messenger-consume]
command=php /path/to/your/app/bin/console messenger:consume async --time-limit=3600
user=ubuntu
numprocs=2
startsecs=0
autostart=true
autorestart=true
process_name=%(program_name)s_%(process_num)02d
|
Змініть аргумент async
, щоб він використовував назву вашого транспорту (або
транспортів) та user
на Unix-користувача на вашому сервері.
Якщо ви використовуєте транспорт Redis, відмітьте, що кожному робітнику необхідне
унікальне ім’я споживача, щоб уникнути обробки одного повідомлення декількома
робітниками. Один зі способів досягнути цього - встановити змінну середовища в
файлі конфігурації Супервізора, на яку ви потім зможете послатися в messenger.yaml
(див. розділ Redis вище):
1 | environment=MESSENGER_CONSUMER_NAME=%(program_name)s_%(process_num)02d
|
Далі, вкажіть Супервізору прочитати вашу конфігурацію та запустити ваших робітників:
1 2 3 4 5 | $ sudo supervisorctl reread
$ sudo supervisorctl update
$ sudo supervisorctl start messenger-consume:*
|
Див. документацію Супервізора, щоб дізнатися більше деталей.
Граціозне вимкнення¶
Якщо ви встановили у своєму проекті PHP-розширення PCNTL, робітники будуть
обробляти POSIX сигнал SIGTERM
для завершення обробки поточного повідомлення
перед виходом.
У деяких випадках, сигнал SIGTERM
відправляється самим Супервізором (наприклад,
зупинка контейнера Docker в якому Супервізор - точка входу). В таких випадках, вам
треба додати ключ stopwaitsecs
до конфігурації програми (зі значенням бажаного
періоду відстрочки в секундах) для того, щоб виконати граціозне вимкнення:
1 2 | [program:x]
stopwaitsecs=20
|
Робітник без стану¶
PHP створений бути без стану, різни запити не мають спільних джерел. В HTTP-контексті PHP очищує все перед відправленням відповіді, тому ви можете вирішити не турбуватися про сервіси, які можуть допускати витік пам’яті.
З іншої сторони, робітники зазвичай працюють у довгострокових CLI-процесах, які не завершуються після обробки повідомлення. Тому вам треба бути обережними зі станами сервісів, щоб вони не давали витік інформації та/або пам’яті з одного повідомлення у інше.
Однак, деякі сервіси Symfony, на кшталт обробника fingers crossed
Monolog, допускають витік за своїм задумом. В таких випадках, використайте опцію транспорту
reset_on_message
, щоб автоматично скинути сервіс-контейнер між двома повідомленнями:
- YAML
1 2 3 4 5 6 7
# config/packages/messenger.yaml framework: messenger: reset_on_message: true transports: async: dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
- XML
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
<!-- config/packages/messenger.xml --> <?xml version="1.0" encoding="UTF-8" ?> <container xmlns="http://symfony.com/schema/dic/services" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:framework="http://symfony.com/schema/dic/symfony" xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd"> <framework:config> <framework:messenger> <framework:transport name="async" dsn="%env(MESSENGER_TRANSPORT_DSN)%" reset-on-message="true"> </framework:transport> </framework:messenger> </framework:config> </container>
- PHP
1 2 3 4 5 6 7 8 9 10 11
// config/packages/messenger.php use Symfony\Config\FrameworkConfig; return static function (FrameworkConfig $framework) { $messenger = $framework->messenger(); $messenger->transport('async') ->dsn('%env(MESSENGER_TRANSPORT_DSN)%') ->resetOnMessage(true) ; };
New in version 5.4: Опція reset_on_message
була представлена в Symfony 5.4.
Повторні спроби та помилки¶
Якщо під час споживання повідомлення з транспорту буде викликане виключення, воно буде автоматично повторно відправлене транспорту, щоб спробувати знову. За замовчуванням, повідомлення має 3 спроби перед скиданням або відправкою транспорту помилок. Кожна спроба буде також відкладена в часі, на випадок, якщо вона була викликана тимчасовою проблемою. Все це можна сконфігурувати для кожного транспорту:
- YAML
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
# config/packages/messenger.yaml framework: messenger: transports: async_priority_high: dsn: '%env(MESSENGER_TRANSPORT_DSN)%' # конфігурація за замовчуванням retry_strategy: max_retries: 3 # затримка в мілісекундах delay: 1000 # робить так, щоб затримка була довшою перед кожною наступною спробою # наприклад, затримка в 1 секунду, 2 секунди, 4 секунди multiplier: 2 max_delay: 0 # перевизначити це все сервісом, який реалізує # Symfony\Component\Messenger\Retry\RetryStrategyInterface # service: null
- XML
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
<!-- config/packages/messenger.xml --> <?xml version="1.0" encoding="UTF-8" ?> <container xmlns="http://symfony.com/schema/dic/services" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:framework="http://symfony.com/schema/dic/symfony" xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd"> <framework:config> <framework:messenger> <framework:transport name="async_priority_high" dsn="%env(MESSENGER_TRANSPORT_DSN)%?queue_name=high_priority"> <framework:retry-strategy max-retries="3" delay="1000" multiplier="2" max-delay="0"/> </framework:transport> </framework:messenger> </framework:config> </container>
- PHP
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
// config/packages/messenger.php use Symfony\Config\FrameworkConfig; return static function (FrameworkConfig $framework) { $messenger = $framework->messenger(); $messenger->transport('async_priority_high') ->dsn('%env(MESSENGER_TRANSPORT_DSN)%') // конфігурація за замовчуванням ->retryStrategy() ->maxRetries(3) // затримка в мілісекундах ->delay(1000) // робить так, щоб затримка була довшою перед кожною наступною спробою // наприклад, затримка в 1 секунду, 2 секунди, 4 секунди ->multiplier(2) ->maxDelay(0) // перевизначити це все сервісом, який реалізує // Symfony\Component\Messenger\Retry\RetryStrategyInterface ->service(null) ; };
Tip
Symfony запускає WorkerMessageRetriedEvent
,
коли повідомлення має повторні спроби, тому ви можете виконати власну логіку.
New in version 5.2: Клас WorkerMessageRetriedEvent
було представлено в Symfony 5.2.
Уникання повторних спроб¶
Іноді обробка повідомлення може бути невдалою, і ви будете знати, що
це перманентно і повторних спроб не потрібно. Якщо ви викличете
UnrecoverableMessageHandlingException
,
повідомлення не матиме повторних спроб.
Форсування повторних спроб¶
New in version 5.1: RecoverableMessageHandlingException
було представлено в Symfony 5.1.
Іноді обробка повідомлення може бути невдалою, і ви будете знати, що це
тимчасово, і необхідна повторна спроба. Якщо ви викличете
RecoverableMessageHandlingException
,
повідомлення завжди матиме повторну спробу.
Збереження та повторні спроби невдалих повідомлень¶
Якщо повідомлення зазнає невдачі та має декілька повторних спроб (max_retries
), воно
потім буде відбраковане. Щоб уникнути цього, ви можете сконфігурувати failure_transport
:
- YAML
1 2 3 4 5 6 7 8 9 10
# config/packages/messenger.yaml framework: messenger: # після повторних спроб, повідомлення будуть відправлені транспорту "failed" failure_transport: failed transports: # ... інший транспорт failed: 'doctrine://default?queue_name=failed'
- XML
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
<!-- config/packages/messenger.xml --> <?xml version="1.0" encoding="UTF-8" ?> <container xmlns="http://symfony.com/schema/dic/services" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:framework="http://symfony.com/schema/dic/symfony" xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd"> <framework:config> <!-- після повторних спроб, повідомлення будуть відправлені транспорту "failed" --> <framework:messenger failure-transport="failed"> <!-- ... інший транспорт --> <framework:transport name="failed" dsn="doctrine://default?queue_name=failed"/> </framework:messenger> </framework:config> </container>
- PHP
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// config/packages/messenger.php use Symfony\Config\FrameworkConfig; return static function (FrameworkConfig $framework) { $messenger = $framework->messenger(); // після повторних спроб, повідомлення будуть відправлені транспорту "failed" $messenger->failureTransport('failed'); // ... інший транспорт $messenger->transport('failed') ->dsn('doctrine://default?queue_name=failed'); };
У цьому прикладі, якщо обробка повідомлення зазнає невдачі 3 рази (значення за
замовчуванням max_retries
), воно потім буде відправлене транспорту failed
.
Хоча ви можете використати messenger:consume failed
для споживання його, як
звичайного транспорту, вам скоріше захочеться вручну переглянути повідомлення в
транспорті помилок та вирішити щодо повторних спроб:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | # побачити всі повідомлення в транспорті помилок
$ php bin/console messenger:failed:show
# побачити деталі конкретної помилки
$ php bin/console messenger:failed:show 20 -vv
# побачити та повторно спробувати кожне повідомлення окремо
$ php bin/console messenger:failed:retry -vv
# повторна спроба конкретних повідомлень
$ php bin/console messenger:failed:retry 20 30 --force
# видалити повідомлення без повторної спроби
$ php bin/console messenger:failed:remove 20
# видалити повідомлення без повторних спроб та показати кожне повідомлення перед видаленням
$ php bin/console messenger:failed:remove 20 30 --show-messages
|
New in version 5.1: Опція --show-messages
була представлена в Symfony 5.1.
Якщо повідомлення знов зазнає невдачі, воно буде відправлене назад у транспорт помилок у відповідності зі звичайними правилами повторних спроб. Як тільки буде досягнено максимум повторних спроб, повідомлення буде скинуте перманентно.
Декілька помилкових транспортів¶
New in version 5.3: Можливість використовувати декілька транспортів помилок була представлена в Symfony 5.3.
Іноді недостатньо мати один глобальний сконфігурований failed transport
, тому що деякі
повідомлення важливіші за інші. В таких випадках ви можете перевизначити транспорт помилок
лише для певних транспортів:
- YAML
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
# config/packages/messenger.yaml framework: messenger: # після повторних спроб повідомлення будуть відправлені транспорту "failed" # за замовчуванням, якщо всередині транспорту не сконфігурований "failed_transport" failure_transport: failed_default transports: async_priority_high: dsn: '%env(MESSENGER_TRANSPORT_DSN)%' failure_transport: failed_high_priority # так як транспорт помилок не сконфігурований, буде використовуватися встановлений # глобальний набір "failure_transport" async_priority_low: dsn: 'doctrine://default?queue_name=async_priority_low' failed_default: 'doctrine://default?queue_name=failed_default' failed_high_priority: 'doctrine://default?queue_name=failed_high_priority'
- XML
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
<!-- config/packages/messenger.xml --> <?xml version="1.0" encoding="UTF-8" ?> <container xmlns="http://symfony.com/schema/dic/services" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:framework="http://symfony.com/schema/dic/symfony" xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd"> <framework:config> <!-- після повторних спроб повідомлення будуть відправлені транспорту "failed" за замовчуванням, якщо всередині транспорту не сконфігурований "failed_transport" --> <framework:messenger failure-transport="failed_default"> <framework:transport name="async_priority_high" dsn="%env(MESSENGER_TRANSPORT_DSN)%" failure-transport="failed_high_priority"/> <!-- так як транспорт помилок не сконфігурований, буде використовуватися встановлений глобальний набір "failure_transport" --> <framework:transport name="async_priority_low" dsn="doctrine://default?queue_name=async_priority_low"/> <framework:transport name="failed_default" dsn="doctrine://default?queue_name=failed_default"/> <framework:transport name="failed_high_priority" dsn="doctrine://default?queue_name=failed_high_priority"/> </framework:messenger> </framework:config> </container>
- PHP
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
// config/packages/messenger.php use Symfony\Config\FrameworkConfig; return static function (FrameworkConfig $framework) { $messenger = $framework->messenger(); // після повторних спроб повідомлення будуть відправлені транспорту "failed" // за замовчуванням, якщо всередині транспорту не сконфігурований "failed_transport" $messenger->failureTransport('failed_default'); $messenger->transport('async_priority_high') ->dsn('%env(MESSENGER_TRANSPORT_DSN)%') ->failureTransport('failed_high_priority'); // так як транспорт помилок не сконфігурований, буде використовуватися встановлений // глобальний набір "failure_transport" $messenger->transport('async_priority_low') ->dsn('doctrine://default?queue_name=async_priority_low'); $messenger->transport('failed_default') ->dsn('doctrine://default?queue_name=failed_default'); $messenger->transport('failed_high_priority') ->dsn('doctrine://default?queue_name=failed_high_priority'); };
Якщо немає визначеного failure_transport
глобально або на рівні транспорту, повідомлення
буде скинуте після певної кількості спроб.
Невдалі команди мають необов’язкову опцію --transport
, щоб вказати
failure_transport
, сконфігурований на рівні транспорту.
1 2 3 4 5 6 7 8 | # побачити всі повідомлення в транспорті "failure_transport"
$ php bin/console messenger:failed:show --transport=failure_transport
# повторна спроба конкретних повідомлень з "failure_transport"
$ php bin/console messenger:failed:retry 20 30 --transport=failure_transport --force
# видалити повідомлення без повторної спроби з "failure_transport"
$ php bin/console messenger:failed:remove 20 --transport=failure_transport
|
Конфігурація транспорту¶
Месенджер підтримує декілька різних типів транспорту, кожний зі своїми опціями. Опції можуть бути передані транспорту через рядок DSN або конфігурацію.
1 2 | # .env
MESSENGER_TRANSPORT_DSN=amqp://localhost/%2f/messages?auto_setup=false
|
- YAML
1 2 3 4 5 6 7 8
# config/packages/messenger.yaml framework: messenger: transports: my_transport: dsn: "%env(MESSENGER_TRANSPORT_DSN)%" options: auto_setup: false
- XML
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
<!-- config/packages/messenger.xml --> <?xml version="1.0" encoding="UTF-8" ?> <container xmlns="http://symfony.com/schema/dic/services" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:framework="http://symfony.com/schema/dic/symfony" xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd"> <framework:config> <framework:messenger> <framework:transport name="my_transport" dsn="%env(MESSENGER_TRANSPORT_DSN)%"> <framework:options auto-setup="false"/> </framework:transport> </framework:messenger> </framework:config> </container>
- PHP
1 2 3 4 5 6 7 8 9 10
// config/packages/messenger.php use Symfony\Config\FrameworkConfig; return static function (FrameworkConfig $framework) { $messenger = $framework->messenger(); $messenger->transport('my_transport') ->dsn('%env(MESSENGER_TRANSPORT_DSN)%') ->options(['auto_setup' => false]); };
Опції, визначені під options
, панують над визначеними в DSN.
Транспорт AMQP¶
Транспорт AMQP використовує PHP-розширення AMQP для відправлення повідомлень в чергу на кшталт RabbitMQ.
New in version 5.1: Починаючи з Symfony 5.1, транспорт AMQP переїхав у окремий пакет. Встановіть його, виконавши:
1 | $ composer require symfony/amqp-messenger
|
DSN транспорту AMQP може виглядати так:
1 2 3 4 5 | # .env
MESSENGER_TRANSPORT_DSN=amqp://guest:[email protected]:5672/%2f/messages
# або використайте протокол AMQPS
MESSENGER_TRANSPORT_DSN=amqps://guest:[email protected]/%2f/messages
|
New in version 5.2: Підтримка протоколу AMQPS була представлена в Symfony 5.2.
Якщо ви хочете виокристати AMQP, закодований за допомогою TLS/SSL, ви маєте також
надати CA-сертифікат. Визначіть шлях сертифікату в налаштуванні PHP.ini amqp.cacert
(наприклад, amqp.cacert = /etc/ssl/certs
) або в параметрі DSN cacert
(наприклад, amqps://localhost?cacert=/etc/ssl/certs/
).
Порт, за замовчуванням використовуваний AMQP, закодованим за допомогою TLS/SSL, - 5671,
але ви можете перевизначити його в параметрі DSN port
(наприклад,
amqps://localhost?cacert=/etc/ssl/certs/&port=12345
).
Note
За замовчуванням, транспорт автоматично створюватиме будь-які необхідні обміни, черги та сполучні ключі. Це можна відключити, але деяків функції можуть працювати некоректно (на кшталт відкладених черг).
Note
З Symfony 5.3 або новіше, ви можете обмежити споживачів транспорту AMQP, щоб вони обробляли лише повідомлення з деяких черг якогось обміну. Див. Limit Consuming to Specific Queues.
Транспорт має інші опції, включно із способами конфігурації обміну, сполучними ключами,
чергами і т.д. Дивіться документацію Connection
.
Транспорт має ряд опцій:
New in version 5.2: Опція confirm_timeout
була представлена в Symfony 5.2.
Deprecated since version 5.3: Опція prefetch_count
застаріла в Symfony 5.3 так як вона не має
ефекту в транспорті AMQP месенджера.
Ви можете також сконфігурувати налаштування спеціально для AMQP у вашому повідомленні,
додавши AmqpStamp
до вашого Конверту:
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpStamp;
// ...
$attributes = [];
$bus->dispatch(new SmsNotification(), [
new AmqpStamp('custom-routing-key', AMQP_NOPARAM, $attributes),
]);
Caution
Споживачі не відображаються в панелі адміна, так як цей транспорт не покладається на
\AmqpQueue::consume()
, який блокує. Наявність блокуючого отримувача робить опції
--time-limit/--memory-limit
команди messenger:consume
. а також команди
messenger:stop-workers
марними, так як вони покладаються на той факт, що отримувач
повертається негайно, незалежно від того, знаходить він повідомлення, чи ні. Робітник
споживання відповідає за ітерацію до отримання повідомлення для обробки та/або до того,
як буде досягнена одна з умов зупинки. Таким чином, логіка зупинки робітника може бути
досягнена, якщо він застряг на блокуючому виклику.
Транспорт Doctrine¶
Транспорт Doctrine може бути використаний для зберігання повідомлень в таблиці бази даних.
New in version 5.1: Починаючи з Symfony 5.1, транспорт Doctrine було переміщено в окремий пакет. Встановіть його, виконавши:
1 | $ composer require symfony/doctrine-messenger
|
DSN транспорту Doctrine може виглядати так:
1 2 | # .env
MESSENGER_TRANSPORT_DSN=doctrine://default
|
Формат doctrine://<connection_name>
, у випадку, якщо у вас є декілька з’єднань і ви
хочете використати якесь відмінне від “default”. Транспорт буде автоматично створювати
таблицю під назвою messenger_messages
.
New in version 5.1: Можливість автоматично генерувати міграцію для таблиці
messenger_messages
була вредставлена в Symfony 5.1 та DoctrineBundle 2.1.
Або, для створення таблиці самостійно, встановіть опцію auto_setup
як false
,
та згенеруйте міграцію.
Caution
Властивість повідомлень datetime, яка зберігається в базі даних, використовує часову зону поточної системи. Це може викликати проблеми, якщо декілька машин з різними конфігураціями часових зон використовують одне сховище.
Транспорт має такі опції:
Опція | Опис | За замовчуванням |
---|---|---|
table_name | Назва таблиці | messenger_messages |
queue_name | Назва черги (стовпчик в таблиці, щоб вкиористовувати одну таблицю для декількох транспортів) | default |
redeliver_timeout | Тайм-аут перед повторною спробою повідомлення в черзі, але не в стані “обробки” (якщо робітник за якоїсь причини зупинився, станеться це, і зрештою ви маєте знову спробувати повідомлення) - в секундах. | 3600 |
auto_setup | Чи має таблиця бути створена автоматично під час відправки/отримання. | true |
New in version 5.1: Можливість використовувати PostgreSQL LISTEN/NOTIFY була представлена в Symfony 5.1.
При використанні PostgreSQL, у вас є доступ до наступних опцій для отримання переваг функції LISTEN/NOTIFY. Це дозволяє більш продуктивний підхід, ніж поведінка голосування транспорту Doctrine за замовчуванням, так як PostgreSQL буде напряму повідомляти робітників, коли нове повідомлення з’являтиметься в таблиці.
Опція | Опис | За замовчуванням |
---|---|---|
use_notify | Чи використовувати LISTEN/NOTIFY. | true |
check_delayed_interval | Інтервал перевірки відкладених повідомлень в мілісекундах. Встановіть як 0, щоб відключити перевірку. | 1000 |
get_notify_timeout | Тривалість часу очікування відповіді при
виклику PDO::pgsqlGetNotify` , в
мілісекундах. |
0 |
Транспорт Beanstalkd¶
New in version 5.2: Транспорт Beanstalkd було представлено в Symfony 5.2.
Транспорт Beanstalkd відправляє повідомлення прямо в робочу чергу Beanstalkd. Встановіть його, виконавши:
1 | $ composer require symfony/beanstalkd-messenger
|
DSN транспорту Beanstalkd може виглядати так:
1 2 3 4 5 | # .env
MESSENGER_TRANSPORT_DSN=beanstalkd://localhost:11300?tube_name=foo&timeout=4&ttr=120
# Якщо порту немає, він за замовчуванням буде 11300
MESSENGER_TRANSPORT_DSN=beanstalkd://localhost
|
Транспорт має ряд опцій:
Опція | Опис | За замовчуванням |
---|---|---|
tube_name | Назва черги | default |
timeout | Тайм-аут резерву повідомлення - в секундах. | 0 (змусить сервер одразу ж або повернути відповідь, або викликати TransportException) |
ttr | Час виконання повідомлення перед тим як помістити його назад у чергу готовності - в секундах. | 90 |
Транспорт Redis¶
Транспорт Redis використовує потоки для створення черги повідомлень. Цей транспорт вимагає PHP-розширення Redis (>=4.3) та працюючого серверу Redis (^5.0).
New in version 5.1: Починаючи з Symfony 5.1, транспорт Redis було переміщено в окремий пакет. Встановіть його, виконавши:
1 | $ composer require symfony/redis-messenger
|
DSN транспорту Redis може виглядати так:
1 2 3 4 5 6 7 8 | # .env
MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages
# Повний приклад DSN
MESSENGER_TRANSPORT_DSN=redis://[email protected]:6379/messages/symfony/consumer?auto_setup=true&serializer=1&stream_max_entries=0&dbindex=0&delete_after_ack=true
# Приклад кластеру Redis
MESSENGER_TRANSPORT_DSN=redis://host-01:6379,redis://host-02:6379,redis://host-03:6379,redis://host-04:6379
# Приклад Unix-сокету
MESSENGER_TRANSPORT_DSN=redis:///var/run/redis.sock
|
New in version 5.1: DSN Unix-сокету була представлена в Symfony 5.1.
Деякі опції можуть бути сконфігуровані через DSN або ключ options
під транспортом в messenger.yaml
:
Caution
Ніколи не повинно бути більше однієї команди messenger:consume
, виконуваної з однаковою
комбінацією stream
, group
та consumer
, інашке повідомлення можуть бути оброблені
більше, ніж один раз. Якщо ви запускаєте декілька робітників черги, consumer
може бути
встановлено як змінну оточення (на кшталт %env(MESSENGER_CONSUMER_NAME)%
), встановлено
Супервізором (приклад нижче) або будь-яким іншим сервісом, використовуваним для управління
процесами робітників.
В середовищі контейнера, HOSTNAME
може бути використано як ім’я споживача, так як там
лише один робітник на контейнер/хостинг. Якщо ви використовуєте Kubernetes для управління
контейнерами, розгляньте використання StatefulSet
для стабілізації імен.
Tip
Встановіть delete_after_ack
як true
(якщо у вас одна група) або визначте
stream_max_entries
(якщо ви можете припустити, яка максимальна кількість записів
припустима у вашому випадку), щоб уникнути витоків пам’яті. В іншому випадку, всі
повідомлення назавжди залишаться в Redis.
New in version 5.1: Опції delete_after_ack
, redeliver_timeout
и claim_interval
були представлені в Symfony 5.1.
New in version 5.2: Опції delete_after_reject
і lazy
були представлені в Symfony 5.2.
New in version 5.4: Опції sentinel_persistent_id
, sentinel_retry_interval
, sentinel_read_timeout
,
sentinel_timeout
, і sentinel_master
були представлені в Symfony 5.4.
Deprecated since version 5.4: Відсутність установки чіткого значення для опції delete_after_ack
не заохочується
починаючи з версії Symfony 5.4. В Symfony 6.0, значення цієї опції за замовчуванням
змінюється з false
на true
.
Транспорт у пам’яті¶
Транспорт in-memory
насправді не доставляє повідомлення. Замість цього, він тримає
їх у пам’яті під час запиту, що може бути корисним для тестування. Наприклад, якщо у вас
є транспорт async_priority_normal
, ви можете перевизначати його в середовищі test
,
щоб використати цей транспорт:
- YAML
1 2 3 4 5
# config/packages/test/messenger.yaml framework: messenger: transports: async_priority_normal: 'in-memory://'
- XML
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
<!-- config/packages/test/messenger.xml --> <?xml version="1.0" encoding="UTF-8" ?> <container xmlns="http://symfony.com/schema/dic/services" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:framework="http://symfony.com/schema/dic/symfony" xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd"> <framework:config> <framework:messenger> <framework:transport name="async_priority_normal" dsn="in-memory://"/> </framework:messenger> </framework:config> </container>
- PHP
1 2 3 4 5 6 7 8 9
// config/packages/test/messenger.php use Symfony\Config\FrameworkConfig; return static function (FrameworkConfig $framework) { $messenger = $framework->messenger(); $messenger->transport('async_priority_normal') ->dsn('in-memory://'); };
Тоді під час тестування повідомлення не будуть відправлені реальному транспорту. Навіть краще, у тесті, ви можете перевірити, щоб лише одне повідомлення було відправлене під час запиту:
// tests/Controller/DefaultControllerTest.php
namespace App\Tests\Controller;
use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;
use Symfony\Component\Messenger\Transport\InMemoryTransport;
class DefaultControllerTest extends WebTestCase
{
public function testSomething()
{
$client = static::createClient();
// ...
$this->assertSame(200, $client->getResponse()->getStatusCode());
/* @var InMemoryTransport $transport */
$transport = self::$container->get('messenger.transport.async_priority_normal');
$this->assertCount(1, $transport->getSent());
}
}
Транспорт має ряд опцій:
serialize
(бульову, за замовчуванням:false
)- Чи сериалізувати повідомлення. Це корисно для тестування додатковго шару, особливо коли ви використовуєте власний сериалізатор повідомлень.
New in version 5.3: Опція serialize
була представлена в Symfony 5.3.
Note
Всі транспорти in-memory
будуть автоматично скинуті після кожного тесту
в класах тестів, що розширюють
KernelTestCase
або WebTestCase
.
Amazon SQS¶
New in version 5.1: Транспорт Amazon SQS було представлено в Symfony 5.1.
Транспорт Amazon SQS чудово підходить для додатку на AWS. Встановіть його, виконавши:
1 | $ composer require symfony/amazon-sqs-messenger
|
DSN SQS транспорту виглядає так:
1 2 3 | # .env
MESSENGER_TRANSPORT_DSN=https://sqs.eu-west-3.amazonaws.com/123456789012/messages?access_key=AKIAIOSFODNN7EXAMPLE&secret_key=j17M97ffSVoKI0briFoo9a
MESSENGER_TRANSPORT_DSN=sqs://localhost:9494/messages?sslmode=disable
|
Note
Транспорт автоматично створить необхідні черги. Це можна відключити, встановивши
опцію auto_setup
як false
.
Tip
До відправлення або отримання повідомлення, Symfony необхідно конвертувати назву
черги в URL черги AWS, викликавши API GetQueueUrl
в AWS. Цього додаткового
API-виклику можна уникнути, надавши DSN, яка є URL черги.
New in version 5.2: Функція надання URL черги в DSN була представлена в Symfony 5.2.
Транспорт має ряд опцій:
Опція | Опис | За замовчуванням |
---|---|---|
access_key |
Ключ доступу AWS | |
account |
Ідентифікатор AWS-аккаунту | Володар облікових даних |
auto_setup |
Чи потрібно автоматично створювати чергу під час відправки/отримання. | true |
buffer_size |
Кількість повідомлень для попереднього вилучення | 9 |
debug |
Якщо true , веде лог всіх HTTP
запитів та відповідей (це впливає на
продуктивність) |
false |
endpoint |
Абсолютний URL до SQS-сервісу | https://sqs.eu-west-1.amazonaws.com |
poll_timeout |
Час очікування нового повідомлення в секундах | 0.1 |
queue_name |
Назва черги | messages |
region |
Назва AWS-регіону | eu-west-1 |
secret_key |
Секретныи ключ AWS | |
visibility_timeout |
Кількість секунд, протягом яких повідомлення не буде видимим (Visibility Timeout) | Конфігурація черги |
wait_time |
Тривалість Long polling в секундах | 20 |
New in version 5.3: Опція debug
була представлена в Symfony 5.3.
Note
Параметр wait_time
визначає максимальний час очікування для Amazon SQS до того, як
повідомлення буде доступне в черзі, перед відправленням відповіді. Це допомагає знизити
вартість використання Amazon SQS, усуваючи деяку кількість пустих відповідей.
Параметр poll_timeout
визначає час очікування отримувача до повернення null.
Він уникає блокування інших отримувачів від виклику.
Note
Якщо назва черги має суфікс .fifo
, AWS створить чергу FIFO. Використайте марку
AmazonSqsFifoStamp
,
щоб визначити Message group ID
і Message deduplication ID
.
Черги FIFO не підтримують установки затримки окремих повідомлень, значення delay: 0
потребується в налаштуваннях стратегії повторних спроб.
Сериалізація повідомлень¶
Коли повідомлення відправляються (та отримуються) в транпсорт, вони сериалізуються з
використанням нативних функцій PHP serialize()
і unserialize()
. Ви можете змінити
це глобально (або для кожного транспорту) на сервіс, який реалізує
SerializerInterface
:
- YAML
1 2 3 4 5 6 7 8 9 10 11 12 13
# config/packages/messenger.yaml framework: messenger: serializer: default_serializer: messenger.transport.symfony_serializer symfony_serializer: format: json context: { } transports: async_priority_normal: dsn: # ... serializer: messenger.transport.symfony_serializer
- XML
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
<!-- config/packages/messenger.xml --> <?xml version="1.0" encoding="UTF-8" ?> <container xmlns="http://symfony.com/schema/dic/services" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:framework="http://symfony.com/schema/dic/symfony" xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd"> <framework:config> <framework:messenger> <framework:serializer default-serializer="messenger.transport.symfony_serializer"> <framework:symfony-serializer format="json"> <framework:context/> </framework:symfony-serializer> </framework:serializer> <framework:transport name="async_priority_normal" dsn="..." serializer="messenger.transport.symfony_serializer"/> </framework:messenger> </framework:config> </container>
- PHP
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
// config/packages/messenger.php use Symfony\Config\FrameworkConfig; return static function (FrameworkConfig $framework) { $messenger = $framework->messenger(); $messenger->serializer() ->defaultSerializer('messenger.transport.symfony_serializer') ->symfonySerializer() ->format('json') ->context('foo', 'bar'); $messenger->transport('async_priority_normal') ->dsn(...) ->serializer('messenger.transport.symfony_serializer'); };
messenger.transport.symfony_serializer
- це вбудований сервіс, який використовує
компонент Сериалізатор і може бути сконфігурований декількома способами.
Якщо ви оберете використовувати сериалізатор Symfony, ви зможете контролювати контекст для
кожного випадку окремо через SerializerStamp
(див. Конверти та марки).
Tip
При відправленні/отримаванні повідомлен в/з іншого транспорту, вам може знадобитися більше контролю над процесом сериалізації. Використання користувацького сериалізатору надає такий контроль. Див. `Туторіал по сериалізації повідомлень SymfonyCasts`_, щоб дізнатися більше.
Налаштування обробників¶
Конфігурація обробників вручну¶
Symfony зазвичай буде знаходити та реєструвати вашого обробника автоматично.
Але ви також можете сконфігурувати його вручну - і передати йому додаткову конфігурацію - тегувавши
сервіс обробника messenger.message_handler
- YAML
1 2 3 4 5 6 7 8 9 10 11
# config/services.yaml services: App\MessageHandler\SmsNotificationHandler: tags: [messenger.message_handler] # або сконфігурувати з опціями tags: - name: messenger.message_handler # небхідно лише якщо неможливо вгадати за підказкою handles: App\Message\SmsNotification
- XML
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
<!-- config/services.xml --> <?xml version="1.0" encoding="UTF-8" ?> <container xmlns="http://symfony.com/schema/dic/services" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd"> <services> <service id="App\MessageHandler\SmsNotificationHandler"> <!-- handles небхідно лише якщо неможливо вгадати за підказкою --> <tag name="messenger.message_handler" handles="App\Message\SmsNotification"/> </service> </services> </container>
- PHP
1 2 3 4 5 6 7 8 9
// config/services.php use App\Message\SmsNotification; use App\MessageHandler\SmsNotificationHandler; $container->register(SmsNotificationHandler::class) ->addTag('messenger.message_handler', [ // небхідно лише якщо неможливо вгадати за підказкою 'handles' => SmsNotification::class, ]);
Можливі опції конфігурації з тегами:
bus
from_transport
handles
method
priority
Підписник та опції обробника¶
Клас обробника може обробляти багато повідомлень або конфігурувати сам себе, реалізуючи
MessageSubscriberInterface
:
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;
use App\Message\OtherSmsNotification;
use App\Message\SmsNotification;
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;
class SmsNotificationHandler implements MessageSubscriberInterface
{
public function __invoke(SmsNotification $message)
{
// ...
}
public function handleOtherSmsNotification(OtherSmsNotification $message)
{
// ...
}
public static function getHandledMessages(): iterable
{
// обробити це повідомлення в __invoke
yield SmsNotification::class;
// також обробити це повідомлення в handleOtherSmsNotification
yield OtherSmsNotification::class => [
'method' => 'handleOtherSmsNotification',
//'priority' => 0,
//'bus' => 'messenger.bus.default',
];
}
}
Пов’язування обробників з різними транспортами¶
Кожне повідомлення може мати декілька обробників, і коли повідомлення споживається, викликаються всі його оброники. Але ви можете також сконфігурувати обробника так, щоб він викликався лише тоді, коли повідомлення отримане з конкретного транспорту. Це дозволяє вам мати одне повідомлення, де кожний обробник викликається різними “робітниками”, що споживають різний транспорт.
Уявіть, що у вас є повідомлення UploadedImage
з двома обробниками:
ThumbnailUploadedImageHandler
: ви хочете, щоб цей оброблялося транспортом під назвоюimage_transport
NotifyAboutNewUploadedImageHandler
: ви хочете, щоб цей оброблялося транспортом під назвоюasync_priority_normal
Щоб зробити це, додайте опцію from_transport
до кожного обробника. Наприклад:
// src/MessageHandler/ThumbnailUploadedImageHandler.php
namespace App\MessageHandler;
use App\Message\UploadedImage;
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;
class ThumbnailUploadedImageHandler implements MessageSubscriberInterface
{
public function __invoke(UploadedImage $uploadedImage)
{
// створити мініатюри
}
public static function getHandledMessages(): iterable
{
yield UploadedImage::class => [
'from_transport' => 'image_transport',
];
}
}
І, схожим чином:
// src/MessageHandler/NotifyAboutNewUploadedImageHandler.php
// ...
class NotifyAboutNewUploadedImageHandler implements MessageSubscriberInterface
{
// ...
public static function getHandledMessages(): iterable
{
yield UploadedImage::class => [
'from_transport' => 'async_priority_normal',
];
}
}
Потім, переконайтеся, що “маршрутизуєте” ваше повідомлення до обох транспортів:
- YAML
1 2 3 4 5 6 7 8 9 10
# config/packages/messenger.yaml framework: messenger: transports: async_priority_normal: # ... image_transport: # ... routing: # ... 'App\Message\UploadedImage': [image_transport, async_priority_normal]
- XML
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
<!-- config/packages/messenger.xml --> <?xml version="1.0" encoding="UTF-8" ?> <container xmlns="http://symfony.com/schema/dic/services" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:framework="http://symfony.com/schema/dic/symfony" xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd"> <framework:config> <framework:messenger> <framework:transport name="async_priority_normal" dsn="..."/> <framework:transport name="image_transport" dsn="..."/> <framework:routing message-class="App\Message\UploadedImage"> <framework:sender service="image_transport"/> <framework:sender service="async_priority_normal"/> </framework:routing> </framework:messenger> </framework:config> </container>
- PHP
1 2 3 4 5 6 7 8 9 10 11 12
// config/packages/messenger.php use Symfony\Config\FrameworkConfig; return static function (FrameworkConfig $framework) { $messenger = $framework->messenger(); $messenger->transport('async_priority_normal')->dsn(...); $messenger->transport('image_transport')->dsn(...); $messenger->routing('App\Message\UploadedImage') ->senders(['image_transport', 'async_priority_normal']); };
Ось і все! Тепер ви можете споживати кожний транспорт:
1 2 3 4 | # викличе ThumbnailUploadedImageHandler лише при обробці повідомлення
$ php bin/console messenger:consume image_transport -vv
$ php bin/console messenger:consume async_priority_normal -vv
|
Caution
Якщо обробник не має конфігурації from_transport
, він буде виконаний
в кожному транспорті, з якого буде отримано це повідомлення.
Розширення месенджера¶
Конверти та марки¶
Повідомлення може бути будь-яким PHP-об’єктом. Інколи вам може знадобитися сконфігурувати щось додаткове в повідомленні - на кшталт того, як воно має бути оброблене всередині AMQP або додавання затримки перед обробкою повідомлення. Ви можете зробити це, додавши марку (“stamp”) до вашого повідомлення:
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
public function index(MessageBusInterface $bus)
{
$bus->dispatch(new SmsNotification('...'), [
// почекати 5 секунд перед обробкою
new DelayStamp(5000),
]);
// ябо чітко створіть Конверт
$bus->dispatch(new Envelope(new SmsNotification('...'), [
new DelayStamp(5000),
]));
// ...
}
Внутрішньо, кожне повідомлення огортається в конверт (Envelope
), який містить
повідомлення та марки. Ви можете створити його вручну або дозволити автобусу повідомлень
зробити це. Існує багато різних марок для різних цілей і вони використовуються внутрішньо
для відслідковування інформації про повідомлення - на кшталт того, який автобус обробляє
його і чи має воно повторні спроби після невдачі.
Проміжкове ПЗ¶
Те, що відбувається після запуску повідомлення в автобус повідомлень, залежить від його набору проміжкового ПЗ та його порядку. За замовчуванням, проміжкове ПЗ, сконфігуроване для кожного атрибуту, виглядає так:
add_bus_name_stamp_middleware
- додає марку для запису того, в якому автобусі було запущено це повідомлення;dispatch_after_current_bus
- див. Transactional Messages: Handle New Messages After Handling is Done;failed_message_processing_middleware
- обробляє повідомлення, які мають повторні спроби через транспорт помилок, щоб вони правильно функціонували, ніби-то вони були отримані з початкового транспорту;- Ваша власна колекція middleware_;
send_message
- якщо маршрутизація сконфігурована для транспорту, відправляє повідомлення цьому трнаспорту та зупиняє ланцюжок проміжкового ПЗ;handle_message
- викликає обробника(ів) повідомлень для заданого повідомлення.
Note
Ці назви проміжкового ПЗ - насправді скорочення. Справжні id сервісів мають префікс
messenger.middleware.
(наприклад,``messenger.middleware.handle_message``).
Проміжкове ПЗ виконується після запуску повідомлення, а також ще раз, коли повідомлення отримане через робітника (для повідомлень, які були відправлені транспорту для асинхронної обробки). Пам’ятайте це, якщо ви створите власне проміжкове ПЗ.
Ви можете додати власне проміжкове ПЗ в список, або повністю відключити проміжкове ПЗ за замовчуванням і додати лише ваше власне:
- YAML
1 2 3 4 5 6 7 8 9 10 11 12 13
# config/packages/messenger.yaml framework: messenger: buses: messenger.bus.default: # відключити проміжкове ПЗ за замовчуванням default_middleware: false # та/або додати ваше власне middleware: # id сервісів, що реалізують Symfony\Component\Messenger\Middleware\MiddlewareInterface - 'App\Middleware\MyMiddleware' - 'App\Middleware\AnotherMiddleware'
- XML
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
<!-- config/packages/messenger.xml --> <?xml version="1.0" encoding="UTF-8" ?> <container xmlns="http://symfony.com/schema/dic/services" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:framework="http://symfony.com/schema/dic/symfony" xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd"> <framework:config> <framework:messenger> <!-- default-middleware: відключити проміжкове ПЗ за замовчуванням --> <framework:bus name="messenger.bus.default" default-middleware="false"/> <!-- та/або додати ваше власне --> <framework:middleware id="App\Middleware\MyMiddleware"/> <framework:middleware id="App\Middleware\AnotherMiddleware"/> </framework:messenger> </framework:config> </container>
- PHP
1 2 3 4 5 6 7 8 9 10 11
// config/packages/messenger.php use Symfony\Config\FrameworkConfig; return static function (FrameworkConfig $framework) { $messenger = $framework->messenger(); $bus = $messenger->bus('messenger.bus.default') ->defaultMiddleware(false); $bus->middleware()->id('App\Middleware\MyMiddleware'); $bus->middleware()->id('App\Middleware\AnotherMiddleware'); };
Note
Якщо сервіс проміжкового ПЗ абстрактний, буде створено другий екземпляр сервісу для кожного автобусу.
Проміжкове ПЗ для Doctrine¶
New in version 1.11: Наступне проміжкове ПЗ для Doctrine було представлено в DoctrineBundle 1.11.
Якщо ви у своєму додатку використовуєте Doctrine, існує ряд необов’язкового проміжкового ПЗ, яке ви можете захотіти використати:
- YAML
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
# config/packages/messenger.yaml framework: messenger: buses: command_bus: middleware: # кожний раз при обробці повідомлення, з'єднання Doctrine # "пінгується" і повторно підключається, якщо воно закрите. Корисно, # якщо ваші робітники працюють довгий час і з'єднання бази даних # інколи втрачається - doctrine_ping_connection # Після обробки, з'єднання Doctrine закривається, що може # звільнити з'єднання бази даних в робітнику, замість того, # щоб утримувати їх відкритими завжди - doctrine_close_connection # огортає всіх обробників в одну транзакцію Doctrine # обробникам не потрібно викликати flush(), а помилка в будь-якому # обробнику викличе відкат - doctrine_transaction # або передати інший менеджер сутностей будь-якому #- doctrine_transaction: ['custom']
- XML
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
<!-- config/packages/messenger.xml --> <?xml version="1.0" encoding="UTF-8" ?> <container xmlns="http://symfony.com/schema/dic/services" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:framework="http://symfony.com/schema/dic/symfony" xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd"> <framework:config> <framework:messenger> <framework:bus name="command_bus"> <framework:middleware id="doctrine_transaction"/> <framework:middleware id="doctrine_ping_connection"/> <framework:middleware id="doctrine_close_connection"/> <!-- або передати інший менеджер сутностей будь-якому --> <!-- <framework:middleware id="doctrine_transaction"> <framework:argument>custom</framework:argument> </framework:middleware> --> </framework:bus> </framework:messenger> </framework:config> </container>
- PHP
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// config/packages/messenger.php use Symfony\Config\FrameworkConfig; return static function (FrameworkConfig $framework) { $messenger = $framework->messenger(); $bus = $messenger->bus('command_bus'); $bus->middleware()->id('doctrine_transaction'); $bus->middleware()->id('doctrine_ping_connection'); $bus->middleware()->id('doctrine_close_connection'); // Використання іншого менеджеру сутностей $bus->middleware()->id('doctrine_transaction') ->arguments(['custom']); };
Інше проміжкове ПЗ¶
New in version 5.3: Проміжкове ПЗ router_context
було представлено в Symfony 5.3.
Додайте проміжкове ПЗ router_context
, якщо вам потрібно генерувати абсолютні
URL у споживачі (наприклад, відображати шаблон з посиланнями). Це проміжкове ПЗ
зберігає контекст початкового запиту (тобто хостинг, HTTP-порт і т.д.), що необхідно
при створенні абсолютних URL.
- YAML
1 2 3 4 5 6 7
# config/packages/messenger.yaml framework: messenger: buses: command_bus: middleware: - router_context
- XML
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
<!-- config/packages/messenger.xml --> <?xml version="1.0" encoding="UTF-8" ?> <container xmlns="http://symfony.com/schema/dic/services" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:framework="http://symfony.com/schema/dic/symfony" xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd"> <framework:config> <framework:messenger> <framework:bus name="command_bus"> <framework:middleware id="router_context"/> </framework:bus> </framework:messenger> </framework:config> </container>
- PHP
1 2 3 4 5 6 7 8 9
// config/packages/messenger.php use Symfony\Config\FrameworkConfig; return static function (FrameworkConfig $framework) { $messenger = $framework->messenger(); $bus = $messenger->bus('command_bus'); $bus->middleware()->id('router_context'); };
Події Месенджера¶
На додаток до проміжкового ПЗ, Месенджер також запускає декілька подій. Ви можете створити слухача подій, щоб підключатися до різних частин процесу. Для кожного, клас подій буде назвою події:
Декілька автобусів, автобусів команд та подій¶
Месенджер надає вам один сервіс автобусу повідомлень за замовчуванням. Але ви можете сконфігурувати стільки, скільки ви хочете, створивши автобуси “команд”, “запитів” або “подій”, та контролюючи їх проміжкове ПЗ. Див. Multiple Buses.
Дізнайтеся більше¶
- How to Create Your own Messenger Transport
- Как создать ваш собственный транспорт сообщений
- Transactional Messages: Handle New Messages After Handling is Done
- Транзакционные сообщение: обрабатывайте сообщения после того, как обработка закончена
- Getting Results from your Handler
- Получение результатов из вашего обработчика
- Multiple Buses
- Несколько автобусов
Эта документация является переводом официальной документации Symfony и предоставляется по свободной лицензии CC BY-SA 3.0.