Дата обновления перевода 2021-09-28

Как создать ваш собственный транспорт сообщений

Когда вы написали ваш отправитель и получатель транспорта, вы можете зарегистрировать свою фабрику транспорта, чтобы использовать ее через DSN в приложении Symfony.

Создайте вашу фабрику транспорта

Вам нужно предоставить FrameworkBundle возможность создавать ваш транспорт из DSN. Вам понадобится фабрика транспорта:

use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;

class YourTransportFactory implements TransportFactoryInterface
{
    public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
    {
        return new YourTransport(/* ... */);
    }

    public function supports(string $dsn, array $options): bool
    {
        return 0 === strpos($dsn, 'my-transport://');
    }
}

Объект транспорта должен реализовать TransportInterface (который сочетает SenderInterface и ReceiverInterface). Вот упрощенный пример транспорта БД:

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
use Symfony\Component\Uid\Uuid;

class YourTransport implements TransportInterface
{
    private $db;
    private $serializer;

    /**
     * @param FakeDatabase $db is used for demo purposes. It is not a real class.
     */
    public function __construct(FakeDatabase $db, SerializerInterface $serializer = null)
    {
        $this->db = $db;
        $this->serializer = $serializer ?? new PhpSerializer();
    }

    public function get(): iterable
    {
        // Получить сообщение из "my_queue"
        $row = $this->db->createQuery(
                'SELECT *
                FROM my_queue
                WHERE (delivered_at IS NULL OR delivered_at < :redeliver_timeout)
                AND handled = FALSE'
            )
            ->setParameter('redeliver_timeout', new DateTimeImmutable('-5minutes'))
            ->getOneOrNullResult();

        if (null === $row) {
            return [];
        }

        $envelope = $this->serializer->decode([
            'body' => $row['envelope'],
        ]);

        return [$envelope->with(new TransportMessageIdStamp($row['id']))];
    }

    public function ack(Envelope $envelope): void
    {
        $stamp = $envelope->last(TransportMessageIdStamp::class);
        if (!$stamp instanceof TransportMessageIdStamp) {
            throw new \LogicException('No TransportMessageIdStamp found on the Envelope.');
        }

        // Пометить сообщене, как "handled"
        $this->db->createQuery('UPDATE my_queue SET handled = TRUE WHERE id = :id')
            ->setParameter('id', $stamp->getId())
            ->execute();
    }

    public function reject(Envelope $envelope): void
    {
        $stamp = $envelope->last(TransportMessageIdStamp::class);
        if (!$stamp instanceof TransportMessageIdStamp) {
            throw new \LogicException('No TransportMessageIdStamp found on the Envelope.');
        }

        // Удалить сообщение из таблицы "my_queue"
        $this->db->createQuery('DELETE FROM my_queue WHERE id = :id')
            ->setParameter('id', $stamp->getId())
            ->execute();
    }

    public function send(Envelope $envelope): Envelope
    {
        $encodedMessage = $this->serializer->encode($envelope);
        $uuid = (string) Uuid::v4();
        // Добавить сообщение к таблице "my_queue"
        $this->db->createQuery(
                'INSERT INTO my_queue (id, envelope, delivered_at, handled)
                VALUES (:id, :envelope, NULL, FALSE)'
            )
            ->setParameters([
                'id' => $uuid,
                'envelope' => $encodedMessage['body'],
            ])
            ->execute();

        return $envelope->with(new TransportMessageIdStamp($uuid));
    }
}

Реализация выше - не выполняемый код, но он иллюстрирует, как может быть реализован TransportInterface. Чтобы увидеть настоящие реализации, см. InMemoryTransport и DoctrineReceiver.

Зарегистрируйте вашу фабрику

  • YAML
    1
    2
    3
    4
    # config/services.yaml
    services:
        Your\Transport\YourTransportFactory:
            tags: [messenger.transport_factory]
    
  • XML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    <!-- config/services.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd">
    
        <services>
            <service id="Your\Transport\YourTransportFactory">
               <tag name="messenger.transport_factory"/>
            </service>
        </services>
    </container>
    
  • PHP
    1
    2
    3
    4
    5
    // config/services.php
    use Your\Transport\YourTransportFactory;
    
    $container->register(YourTransportFactory::class)
        ->setTags(['messenger.transport_factory']);
    

Используйте ваш транспорт

В конфигурации framework.messenger.transports.*, создайте ваш названный транспорт, используя собственную DSN:

  • YAML
    1
    2
    3
    4
    5
    # config/packages/messenger.yaml
    framework:
        messenger:
            transports:
                yours: 'my-transport://...'
    
  • XML
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    <!-- config/packages/messenger.xml -->
    <?xml version="1.0" encoding="UTF-8" ?>
    <container xmlns="http://symfony.com/schema/dic/services"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:framework="http://symfony.com/schema/dic/symfony"
        xsi:schemaLocation="http://symfony.com/schema/dic/services
            https://symfony.com/schema/dic/services/services-1.0.xsd
            http://symfony.com/schema/dic/symfony
            https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
    
        <framework:config>
            <framework:messenger>
                <framework:transport name="yours" dsn="my-transport://..."/>
            </framework:messenger>
        </framework:config>
    </container>
    
  • PHP
    1
    2
    3
    4
    5
    6
    7
    8
    9
    // config/packages/messenger.php
    use Symfony\Config\FrameworkConfig;
    
    return static function (FrameworkConfig $framework) {
        $framework->messenger()
            ->transport('yours')
                ->dsn('my-transport://...')
        ;
    };
    

Кроме того, что вы сможете маршрутизировать ваши сообщения отправителю yours, это даст вам доступ к следующим сервисам:

  1. messenger.sender.yours: отправитель;
  2. messenger.receiver.yours: получатель.

Эта документация является переводом официальной документации Symfony и предоставляется по свободной лицензии CC BY-SA 3.0.