Дата обновления перевода 2022-01-19
Как создать ваш собственный транспорт сообщений¶
Когда вы написали ваш отправитель и получатель транспорта, вы можете зарегистрировать свою фабрику транспорта, чтобы использовать ее через DSN в приложении Symfony.
Создайте вашу фабрику транспорта¶
Вам нужно предоставить FrameworkBundle возможность создавать ваш транспорт из DSN. Вам понадобится фабрика транспорта:
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
class YourTransportFactory implements TransportFactoryInterface
{
public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
{
return new YourTransport(/* ... */);
}
public function supports(string $dsn, array $options): bool
{
return 0 === strpos($dsn, 'my-transport://');
}
}
Объект транспорта должен реализовать
TransportInterface
(который сочетает SenderInterface
и ReceiverInterface
).
Вот упрощенный пример транспорта БД:
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
use Symfony\Component\Uid\Uuid;
class YourTransport implements TransportInterface
{
private $db;
private $serializer;
/**
* @param FakeDatabase $db is used for demo purposes. It is not a real class.
*/
public function __construct(FakeDatabase $db, SerializerInterface $serializer = null)
{
$this->db = $db;
$this->serializer = $serializer ?? new PhpSerializer();
}
public function get(): iterable
{
// Получить сообщение из "my_queue"
$row = $this->db->createQuery(
'SELECT *
FROM my_queue
WHERE (delivered_at IS NULL OR delivered_at < :redeliver_timeout)
AND handled = FALSE'
)
->setParameter('redeliver_timeout', new DateTimeImmutable('-5minutes'))
->getOneOrNullResult();
if (null === $row) {
return [];
}
$envelope = $this->serializer->decode([
'body' => $row['envelope'],
]);
return [$envelope->with(new TransportMessageIdStamp($row['id']))];
}
public function ack(Envelope $envelope): void
{
$stamp = $envelope->last(TransportMessageIdStamp::class);
if (!$stamp instanceof TransportMessageIdStamp) {
throw new \LogicException('No TransportMessageIdStamp found on the Envelope.');
}
// Пометить сообщене, как "handled"
$this->db->createQuery('UPDATE my_queue SET handled = TRUE WHERE id = :id')
->setParameter('id', $stamp->getId())
->execute();
}
public function reject(Envelope $envelope): void
{
$stamp = $envelope->last(TransportMessageIdStamp::class);
if (!$stamp instanceof TransportMessageIdStamp) {
throw new \LogicException('No TransportMessageIdStamp found on the Envelope.');
}
// Удалить сообщение из таблицы "my_queue"
$this->db->createQuery('DELETE FROM my_queue WHERE id = :id')
->setParameter('id', $stamp->getId())
->execute();
}
public function send(Envelope $envelope): Envelope
{
$encodedMessage = $this->serializer->encode($envelope);
$uuid = (string) Uuid::v4();
// Добавить сообщение к таблице "my_queue"
$this->db->createQuery(
'INSERT INTO my_queue (id, envelope, delivered_at, handled)
VALUES (:id, :envelope, NULL, FALSE)'
)
->setParameters([
'id' => $uuid,
'envelope' => $encodedMessage['body'],
])
->execute();
return $envelope->with(new TransportMessageIdStamp($uuid));
}
}
Реализация выше - не выполняемый код, но он иллюстрирует, как может быть реализован
TransportInterface
. Чтобы увидеть
настоящие реализации, см. InMemoryTransport
и DoctrineReceiver
.
Зарегистрируйте вашу фабрику¶
- YAML
1 2 3 4
# config/services.yaml services: Your\Transport\YourTransportFactory: tags: [messenger.transport_factory]
- XML
1 2 3 4 5 6 7 8 9 10 11 12 13
<!-- config/services.xml --> <?xml version="1.0" encoding="UTF-8" ?> <container xmlns="http://symfony.com/schema/dic/services" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd"> <services> <service id="Your\Transport\YourTransportFactory"> <tag name="messenger.transport_factory"/> </service> </services> </container>
- PHP
1 2 3 4 5
// config/services.php use Your\Transport\YourTransportFactory; $container->register(YourTransportFactory::class) ->setTags(['messenger.transport_factory']);
Используйте ваш транспорт¶
В конфигурации framework.messenger.transports.*
, создайте ваш названный
транспорт, используя собственную DSN:
- YAML
1 2 3 4 5
# config/packages/messenger.yaml framework: messenger: transports: yours: 'my-transport://...'
- XML
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
<!-- config/packages/messenger.xml --> <?xml version="1.0" encoding="UTF-8" ?> <container xmlns="http://symfony.com/schema/dic/services" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:framework="http://symfony.com/schema/dic/symfony" xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd"> <framework:config> <framework:messenger> <framework:transport name="yours" dsn="my-transport://..."/> </framework:messenger> </framework:config> </container>
- PHP
1 2 3 4 5 6 7 8 9
// config/packages/messenger.php use Symfony\Config\FrameworkConfig; return static function (FrameworkConfig $framework) { $framework->messenger() ->transport('yours') ->dsn('my-transport://...') ; };
Кроме того, что вы сможете маршрутизировать ваши сообщения отправителю yours
,
это даст вам доступ к следующим сервисам:
messenger.sender.yours
: отправитель;messenger.receiver.yours
: получатель.
Эта документация является переводом официальной документации Symfony и предоставляется по свободной лицензии CC BY-SA 3.0.