Интеграция с менеджером очередей

0

Разработчик: InterMotion

Установлено: Менее 50

Обновлено: 06.05.2025

Адаптирован под мобильные устройства

Совместим с редакциями Битрикс: «Первый сайт» / «Старт» / «Стандарт» / «Малый бизнес» / «Бизнес» /

Обеспечивает интеграцию Битрикс с брокером сообщений RabbitMQ (AMQP) либо, при его отсутствии, реализует менеджер на базе используемой продуктом СуБД.

Разработан на основе пакета yii3 queue

Модуль предназначен для разработчиков, никаких компонентов/шаблонов для публичной части не предполагается.

Детали использования см. на вкладке "Установка"

Решение устанавливается на любые редакции БУС:

  1. Установите модуль штатным образом. Запись сообщений
  2. По умолчанию очередь обслуживается используемой продуктом СуБД (в процессе установки модуль создает таблицу в базе данных, в которую пишутся сообщения и обрабатываются в дальнейшем)

    Для использования на базе брокера сообщений RabbitMQ необходимо внести изменение в конфигурацию приложения (файл .settings.php), добавив новое соединение в секцию `connections`.
        "rabbitmq" =>
            [
                "className" => "\\InterMotion\\Queue\\Expansion\\Bitrix\\Main\\Data\\RabbitMQConnection",
                "host"      => "{rabbitmq_host}",
                "port"      => {rabbitmq_port},
                "username"  => "{rabbitmq_username}",
                "password"  => "{rabbitmq_password}",
            ],

  3. Обработка сообщений

    Предпочтительным является обработка сообщений, используя процесс-менеджер `supervisor`, альтернативным вариантом является использование менеджера фоновых задач - `crontab`.
Установка супервизора
$ sudo apt update && sudo apt install supervisor -y
$ sudo systemctl enable --now supervisor
$ sudo nano /etc/supervisor/conf.d/bitrix-worker.conf
Базовая конфигурация супервизора:
[program:bitrix-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /var/www/site.tld/bitrix/modules/intermotion.queue/tools/console.php queue:listen
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
numprocs=5
redirect_stderr=true
stdout_logfile=/var/log/bitrix-worker.log

Параметр `numproc` отвечает за количество активных единомоментно процессов (воркеров) параллельно обрабатывающих очередь.

применяем конфигурацию:
$ sudo supervisorctl reread
$ sudo supervisorctl update
# запускаем воркер
$ sudo supervisorctl start bitrix-worker
для проверки статуса воркера:
$ sudo supervisorctl status
Установка на кронтаб
* * * * * /usr/bin/flock -w 1 php /var/www/site.tld/bitrix/modules/intermotion.queue/tools/console.php queue:run
`flock` необходим для того, чтобы в один момент времени был запущен ТОЛЬКО один процесс.

Использование штатных событий

Модуль реализует отложенное выполнения ряда штатных событий для элементов, разделов инфоблоков, товаров, их остатков и цен.

use Bitrix\Main;

// элементы инфоблоков
iblock:OnAfterIBlockElement<Add,Update,Delete>(array $fields)
    -> intermotion.queue:onAfterIblockElement<Add,Update,Delete>(new Main\Event $e {
        parameters: [elementId: {id}, iblockId: {iblockId}]
    })
    -> intermotion.queue:onAfterIblockElementAction(Main\Event $e {
        parameters: [elementId: {id}, iblockId: {iblockId}, action: {add,update,delete}]
    })

// разделы инфоблоков
iblock:OnAfterIBlockSection<Add,Update,Delete>(array $fields)
    -> intermotion.queue:onAfterIblockSection<Add,Update,Delete>(Main\Event $e {
        parameters: [elementId: {id}, iblockId: {iblockId}]
    })
    -> intermotion.queue:onAfterIblockSectionAction(new Main\Event $e {
        parameters: [elementId: {id}, iblockId: {iblockId}, action: {add,update,delete}]
    })

// товары
catalog:Bitrix\Catalog\Model\Product::OnAfter<Add,Update,Delete>(Main\Event $e)
    -> intermotion.queue:onAfterCatalogProduct<Add,Update,Delete>(Main\Event $e {
        parameters: [productId: {id}]
    })
    -> intermotion.queue:onAfterCatalogProductAction(new Main\Event $e {
        parameters: [productId: {id}, action: {add,update,delete}]
    })

// цены
catalog:Bitrix\Catalog\Model\Price::OnAfter<Add,Update,Delete>(Main\Event $e)
    -> intermotion.queue:onAfterCatalogProductPrice<Add,Update,Delete>(Main\Event $e {
        parameters: [productPriceId: {id}]
    })
    -> intermotion.queue:onAfterCatalogProductPriceAction(new Main\Event $e {
        parameters: [productPriceId: {id}, action: {add,update,delete}]
    })

// остатки
catalog:OnStoreProduct<Add,Update,Delete>(int $id)
    -> intermotion.queue:onAfterCatalogStorePrice<Add,Update,Delete>(Main\Event $e {
        parameters: [productStoreId: {id}]
    })
    -> intermotion.queue:onAfterCatalogProductStoreAction(new Main\Event $e {
        parameters: [productStoreId: {id}, action: {add,update,delete}]
    })

Например, при создании элемента инфоблока необходимо выполнить сложные, ресурсозатратные операции. Если подписываться на стандартные события модуля "информационные блоки", то время, которое необходимо на просчет этих операций, будет "затормаживать" систему администрирования и прочие api-вызовы, подразумевающие сохранение элементов.

Это идеальный случай для использования "отложенных" выполнений событий.

use Bitrix\Main;
use Bitrix\Iblock;
use InterMotion\Queue;

// вместо
Main\EventManager::getInstance()->addEventHandler(
    "iblock",
    "OnAfterIBlockElementAdd",
    function(array $fields): void
    {
        try
        {
            if (!isset($fields["RESULT"]) || !$fields["RESULT"])
            {
                throw new Main\NotSupportedException("Element not really saved");
            }

            $elementId = (int) $fields["ID"];
            $iblockId = (int) $fields["IBLOCK_ID"];

            // сложные вычисления.
        }
        catch (\Throwable $exception)
        {
        }
    }
);

// используем
Main\EventManager::getInstance()->addEventHandler(
    Queue\Config::getModuleName(),
    "OnAfterIBlockElementAdd",
    function(Main\Event $event): Main\EventResult
    {
        try
        {
            // обязательно необходимо проверить что элемент, соответсвующий идентификатору существует
            $elementId = $event->getParameter("elementId");
            $iblockId = $event->getParameter("iblockId");

            // сложные вычисления.
        }
        catch (\Throwable $exception)
        {
        }
        
        return (new Main\EventResult(Main\EventResult::SUCCESS);
    }
);

ВАЖНО: все обработчики событий имеют равный приоритет выполнения и попадают в один общий канал (`channel`). Поэтому, следует не допускать перекоса по времени выполнения обработчиков. Например, есть 10 типов обработчиков, 9 из которых выполняются по 5-10 секунд, а 10-ый требует для выполнения одну минуту. Таким образом, в перспективе, 9 обработчиков будут ждать когда выполнится 10-ый. При условии, что событий очень много, может возникнуть ситуация, когда все время выполняются только 10-ые обработчики, а до первых 9 очередь просто не дойдет.

Решить данную проблему можно либо,

  • увеличением количества процессов (см `supervisor`, параметр `numprocs`;
  • созданием своих собственных событий, которые будут отправлять сообщения в отдельный канал и в последующем выделять отдельные воркеры на обработку только этого канала
Создание собственных заданий

Если предоставляемых модулем событий недостаточно для реализации необходимого функционала, есть возможность создавать свои собственные.

Для этого необходимо:

  • создать задачу (унаследоваться от абстрактного класса `InterMotion\Queue\Job\Base`)
  • зарегистрировать задание на событии `registerJob`

Рабочий пример задания - `lib/job/example.php`

namespace InterMotion\Queue\Job;

use Psr;
use YiiSoft\Queue as YiiQueue;


// описание класса задания
class Example extends Base
{
    /**
     * @return string
     */
    public function getChannel(): string
    {
        // в какой канал публикуем сообщение
        return "default";
    }

    /**
     * @return string
     */
    public function getJobId(): string
    {
        // внутренний идентификатор задания, для того, чтобы при чтении сообщений из канала можно было идентифицировать обработчик
        // УНИКАЛЕН для каждого типа задания !!
        return "intermotion-queue-job-example";
    }

    /**
     * @param YiiQueue\Message\MessageInterface $message
     * @return void
     * @throws \Exception
     */
    public function execute(YiiQueue\Message\MessageInterface $message): void
    {
        $logger = $this->getLogger();

        try
        {
            $logger->info("job started", [
                "data" => $message->getData(),
                "metadata" => $message->getMetadata(),
                "handlerName" => $message->getHandlerName()
            ]);

            // do nothing
            // выполнение самой задачи

            $logger->info("job complete");

        }
        catch (\Exception $exception)
        {
            $logger->error(
                $exception->getMessage(),
                [
                    "trace" => $exception->getTrace()
                ]
            );

            throw $exception;
        }
    }
}

// регистрируем задание
Main\EventManager::getInstance()->addEventHandler(
    Queue\Config::getModuleName(),
    "registerJob",
    function(Main\Event $event): Main\EventResult {
        return (new Main\EventResult(Main\EventResult::SUCCESS, [
            "JOBS" => [
                // класс задания, можно зарегистрировать сразу несколько
                [ "CLASS" => Queue\Job\Example::class ],
            ]
        ]));
    }
);

Консольные команды

Решение поддерживает консольные команды `Symfony Console`:

1. Обработка всех существующих сообщений в определенных каналах. Завершается, когда заканчиваются сообщения.

Используется для разработки / отладки / запуске на менеджере запуска заданий `crontab`

php bitrix/modules/intermotion.queue/tools/console.php queue:run [channel1 [channel2 [...]]] --maximum 100

Аргументы и опции:

  • channel - перечень каналов, сообщения из которых необходимо извлечь. По умолчанию - из всех каналов
  • maximum - максимальное количество для обработки В КАЖДОМ канале. По умолчанию 0 - все сообщений
2. Процесс прослушивания сообщений в определенных каналах и их выполнение когда они добавляются. Необходимо останавливать процесс "вручную"

Используется для процесс-менеджера `supervisor`

php bitrix/modules/intermotion.queue/tools/console.php queue:listen [channel1 [channel2 [...]]]

Аргументы:

  • channel - перечень каналов, сообщения в которых необходимо слушать. По умолчанию - из всех каналов
Логгирование

По умолчанию создается лог-файл `queue.log` в корне модуля.