PHP Fibers: практический пример

Моя цель - предложение широкого ассортимента товаров и услуг на постоянно высоком качестве обслуживания по самым выгодным ценам.

В PHP 8.1 Fibers появился как новая функция языка. Я часто вижу, что многие люди называют fibers (волокна) threads (потоками), что неверно и, я думаю, дает некоторым представление о том, что PHP теперь каким-то образом стал многопоточным. Возможно люди имеют неправильное представление о том, что такое fibers (и, возможно, что такое асинхронное программирование), и я надеюсь, что смогу помочь исправить ситуацию, предоставив практический пример того, как их можно использовать.

Что такое Fibers?

Хотя fibers не являются threads, они могут помочь сделать ваш код более эффективным при выполнении нескольких задач быстрее, что на первый взгляд может показаться многопоточностью (думаю, отсюда и путаница). Fibers позволяют вашему коду тратить меньше времени на ожидание внешних ресурсов, позволяя вам параллельно инициировать несколько запросов, а затем ждать их завершения (в любом порядке).

Чтобы воспользоваться преимуществами fibers, ваша ситуация должна соответствовать нескольким условиям:

  1. Вы имеете дело с чем-то внешним по отношению к PHP.

  2. Ваш ресурс должен быть внешним по отношению к PHP, чтобы его можно было обрабатывать параллельно с вашим кодом PHP. PHP по-прежнему является однопоточным, поэтому пока выполняется конкретный fiber, больше ничего в вашем скрипте выполняться не будет. Это означает, что фактическую работу необходимо перенести в отдельный процесс. Обычными сценариями, отвечающими этому требованию, являются сетевые запросы или подпроцессы.

Вам необходимо иметь возможность запрашивать внешний ресурс таким образом, чтобы не блокировать выполнение вашего сценария.

Например, сложный запрос к базе данных не соответствует этому требованию, поскольку нет возможности продолжить работу сценария во время выполнения запроса и собрать результаты позже (используя стандартные расширения php).

Пример

В этом примере мы рассмотрим сценарий, который перебирает каталог видеофайлов и создает 30-секундный клип для каждого файла с помощью FFMpeg. Сначала мы посмотрим, как типичный синхронный код справится с поставленной задачей, а затем постепенно перенесем этот код в асинхронный формат с помощью fibers.

Синхронный путь

Перебираем каталог и для каждого файла и выполняем exec процесса ffmpeg для преобразования.

<?php

$start = microtime(true);
foreach (new DirectoryIterator('.') as $item){
    if ($item->getExtension() === 'mkv'){
        $source = $item->getPathname();
        $destination = getTempDestination();
        $cmd = sprintf('%s -threads 1 -i %s -t 30 -crf 26 -c:v h264 -c:a ac3 %s', $ffmpeg, $source, $destination);
        exec($cmd, $output, $ret);
        if ($ret !== 0){
            throw new \RuntimeException('Failed to create clip.');
        }

        echo 'Successfully created clip from ' . $source . ' => ' . $destination . PHP_EOL;
    }
}
$end = microtime(true);
echo 'Directory processed in ' . round($end - $start, 1) . ' seconds' . PHP_EOL;

Запустив это в папке с 19 эпизодами сериала «Scrubs», я могу обработать все видео за 243,1 секунды. Это занимает некоторое время, потому что в конечном итоге я использую только около 50% общей мощности моего процессора (благодаря -threads 1, добавленному для демонстрационных целей).

Я мог бы ускорить этот процесс, если бы мог запускать два или три экземпляра параллельно. Для того, чтобы код сделал это, потребуется сначала перейти на неблокирующий способ запуска ffmpeg, а затем реализовать fiber, используя этот новый неблокирующий exec в качестве исполняемой функции fiber.

Создание неблокирующего exec

Чтобы правильно использовать fibers, нам нужно начать с неблокирующего exec. Это код, который запускает наш процесс ffmpeg, но продолжает работать, пока процесс ffmpeg выполняет свою работу, а не ждет результата. Мы можем добиться этого, заменив простой вызов функции exec на proc_open. Эта функция позволяют процессу запуститься, пока наш код продолжает работать. Затем в цикле, опрашивая статус другого процесса, чтобы узнать, завершен он или нет, и вернется только тогда, когда ffmpeg выполнит свою задачу.

<?php

$start = microtime(true);
foreach (new DirectoryIterator('.') as $item){
    if ($item->getExtension() === 'mkv'){
        $source = $item->getPathname();
        $destination = getTempDestination();
        createVideoClip($ffmpeg, $source, $destination);

        echo 'Successfully created clip from ' . $source . ' => ' . $destination . PHP_EOL;
    }
}
$end = microtime(true);
echo 'Directory processed in ' . round($end - $start, 1) . ' seconds' . PHP_EOL;

function createVideoClip(string $ffmpeg, string $source, string $destination) : array{
    $cmd = sprintf('%s -threads 1 -i %s -t 30 -crf 26 -c:v h264 -c:a ac3 %s', $ffmpeg, $source, $destination);

    $stdout = fopen('php://temporary', 'w+');
    $stderr = fopen('php://temporary', 'w+');
    $streams = [
        0 => ['pipe', 'r']
        , 1 => $stdout
        , 2 => $stderr
    ];

    $proc = proc_open($cmd, $streams, $pipes);
    if (!$proc){
        throw new RuntimeException('Unable to launch download process');
    }

    do {
        usleep(1000); //Wait 1ms before checking
        $status = proc_get_status($proc);
    } while ($status['running']);

    proc_close($proc);
    fclose($stdout);
    fclose($stderr);
    $success = $status['exitcode'] === 0;
    if ($success){
        return [$source, $destination];
    } else {
        throw new \RuntimeException('Unable to perform conversion');
    }
}

С точки зрения кода итерации каталога эта функция блокируется так же, как и exec. Разница в том, что теперь мы сами выполняем блокировку, используя цикл опроса, который мы сможем изменить позже при реализации fibers.

Представляем Fiber

Теперь, когда у нас есть неблокирующая функция создания клипов, мы можем изменить итерацию каталога, чтобы создать новый fiber для каждого файла, который мы хотим преобразовать, используя эту функцию в качестве вызываемой.

<?php

$fiberList = [];
$start = microtime(true);

foreach (new DirectoryIterator('.') as $item){
    if ($item->getExtension() === 'mkv'){
        $fiber = new Fiber(createVideoClip(...));
        $fiber->start($ffmpeg, $item->getPathname(), getTempDestination());
        [$source, $destination] = $fiber->getReturn();
        echo 'Successfully created clip from ' . $source . ' => ' . $destination . PHP_EOL;
    }
}

Итак, теперь мы используем fibers, верно? Код должен быть быстрее, верно? Нет еще нет. Конечно, мы «используем» Fiber для запуска нашей функции создания видеоклипов, но этот код по-прежнему на 100% синхронен и будет обрабатывать только одно видео за раз, так же медленно, как и старый код exec.

Делаем это асинхронно

Чтобы воспользоваться преимуществами fibers, нам нужно сделать код асинхронным и добавить Fiber::suspend в нашу функцию createVideoClip. Это секретный соус, который позволяет PHP продолжать работу по созданию клипов для других файлов, пока ffmpeg обрабатывает текущий файл. В нашей функции подходящим местом для размещения этого вызова приостановки является цикл опроса, заменяющий существующий вызов usleep.

do {
    Fiber::suspend();
    $status = proc_get_status($proc);
} while ($status['running']);

Теперь, когда fiber будет приостановлен (suspend), нам также необходимо возобновить его в какой-то момент, чтобы фактически получить результат создания видеоклипа, иначе оно просто никогда не завершится. Для этого нам нужно собрать все fibers, которые мы создаем при переборе каталога

<?php

$fiberList=[];
foreach (new DirectoryIterator('.') as $item){
    if ($item->getExtension() === 'mkv'){
        $fiber = new Fiber(createVideoClip(...));
        $fiber->start($ffmpeg, $item->getPathname(), getTempDestination());
        $fiberList[] = $fiber;
    }
}

затем пройдитесь по fibers, возобновляя каждое из них по очереди, пока все они не закончатся и мы не сможем получить результат.

<?php

while ($fiberList){
    foreach ($fiberList as $idx => $fiber){
        if ($fiber->isTerminated()){
            [$source, $destination] = $fiber->getReturn();
            echo 'Successfully created clip from ' . $source . ' => ' . $destination . PHP_EOL;
            unset($fiberList[$idx]);
        } else {
            $fiber->resume();
        }
    }
}

Теперь наш код наконец-то стал асинхронным, и все задания по созданию видеоклипов будут выполняться параллельно. Если вы проследите за своими системными процессами после запуска сценария, вы обнаружите, что для каждого видеофайла немедленно запускается отдельный процесс ffmpeg, и все они начинают работать одновременно. Для моей папки Scrubs это 19 процессов ffmpeg, запущенных одновременно, а общее время выполнения сократилось до 173 секунд.

Слишком много параллелизма!

Иметь 19 одновременных процессов конвертации видео ffmpeg — это слишком много для моего 8-ядерного процессора. Конечно, это быстрее, но теперь мы теряем время, пытаясь бежать слишком много. ЦП будет тратить много времени на переключение контекста между процессами, а не на создание видеоклипов. Вероятно, все было бы еще лучше, если бы мы могли выполнять только несколько преобразований одновременно, а не все.

Добавить ограничение параллелизма возможно, но перед этим нам нужно немного обобщить код, чтобы сделать его более понятным и пригодным для повторного использования. Имеющийся у нас цикл, обрабатывающий fibers до их завершения, необходимо обобщить в небольшую функцию, которую мы можем вызвать и которая будет ждать завершения fibers.

<?php

/**
 * @param Fiber[] $fiberList
 * @param int|null $completionCount
 *
 * @return Fiber[]
 */
function waitForFibers(array &$fiberList, ?int $completionCount = null) : array{
    $completedFibers = [];
    $completionCount ??= count($fiberList);
    while (count($fiberList) && count($completedFibers) < $completionCount){
        usleep(1000);
        foreach ($fiberList as $idx => $fiber){
            if ($fiber->isSuspended()){
                $fiber->resume();
            } else if ($fiber->isTerminated()){
                $completedFibers[] = $fiber;
                unset($fiberList[$idx]);
            }
        }
    }

    return $completedFibers;
}

Вот простая функция, которая принимает список fibers по ссылке и, при необходимости, подсчитывает, сколько fibers следует ожидать. Любое из завершившихся fibers будет удалено из списка и возвращено из функции для обработки вызывающим кодом. По умолчанию функция будет ждать завершения всех fibers в списке, однако добавление дополнительного счетчика позволит ей вернуться после завершения как минимум такого количества fibers (может быть больше, чем запрошено).

Создав эту функцию ожидания, мы можем обновить наш код, добавив ограничение параллелизма. Таким образом, мы можем тратить меньше времени на переключение контекста и больше времени на создание клипов, что, надеемся, еще больше ускорит процесс.

<?php

$concurrency = 3;
$fiberList = [];
foreach (new DirectoryIterator('.') as $item){
    if ($item->getExtension() === 'mkv'){
        $fiber = new Fiber(createVideoClip(...));
        $fiber->start($ffmpeg, $item->getPathname(), getTempDestination());
        $fiberList[] = $fiber;
        if (count($fiberList) >= $concurrency){
            foreach (waitForFibers($fiberList, 1) as $fiber){
                [$source, $destination] = $fiber->getReturn();
                echo 'Successfully created clip from ' . $source . ' => ' . $destination . PHP_EOL;
            }
        }
    }
}

foreach (waitForFibers($fiberList) as $fiber){
    [$source, $destination] = $fiber->getReturn();
    echo 'Successfully created clip from ' . $source . ' => ' . $destination . PHP_EOL;
}

Отлично, теперь с ограничением параллелизма, равным 3, мой процессор не так утомлен переключением контекста и может выполнить все 19 эпизодов Scrubs всего за 143,7 секунды. Намного лучше, чем первоначальные 243,1 секунды, которые потребовались для синхронного выполнения.

Полный пример

<?php

$ffmpeg = getenv('FFMPEG_BIN') ?: 'ffmpeg';
$concurrency = $argv[1] ?? 3;
$fiberList = [];
$start = microtime(true);

foreach (new DirectoryIterator('.') as $item){
    if ($item->getExtension() === 'mkv'){
        $fiber = new Fiber(createVideoClip(...));
        $fiber->start($ffmpeg, $item->getPathname(), getTempDestination());
        $fiberList[] = $fiber;
        if (count($fiberList) >= $concurrency){
            foreach (waitForFibers($fiberList, 1) as $fiber){
                [$source, $destination] = $fiber->getReturn();
                echo 'Successfully created clip from ' . $source . ' => ' . $destination . PHP_EOL;
            }
        }
    }
}

foreach (waitForFibers($fiberList) as $fiber){
    [$source, $destination] = $fiber->getReturn();
    echo 'Successfully created clip from ' . $source . ' => ' . $destination . PHP_EOL;
}

$end = microtime(true);
echo 'Directory processed in ' . round($end - $start, 1) . ' seconds' . PHP_EOL;

/**
 * @param Fiber[] $fiberList
 * @param int|null $completionCount
 *
 * @return Fiber[]
 */
function waitForFibers(array &$fiberList, ?int $completionCount = null) : array{
    $completedFibers = [];
    $completionCount ??= count($fiberList);
    while (count($fiberList) && count($completedFibers) < $completionCount){
        usleep(1000);
        foreach ($fiberList as $idx => $fiber){
            if ($fiber->isSuspended()){
                $fiber->resume();
            } else if ($fiber->isTerminated()){
                $completedFibers[] = $fiber;
                unset($fiberList[$idx]);
            }
        }
    }

    return $completedFibers;
}

function getTempDestination() : string{
    $destination = tempnam(sys_get_temp_dir(), 'video');
    unlink($destination);
    $dir = dirname($destination);
    $file = basename($destination, '.tmp');

    return $dir . DIRECTORY_SEPARATOR . $file . '.mp4';
}

function createVideoClip(string $ffmpeg, string $source, string $destination) : array{
    $cmd = sprintf('%s -threads 1 -i %s -t 30 -crf 26 -c:v h264 -c:a ac3 %s', $ffmpeg, $source, $destination);

    $stdout = fopen('php://temporary', 'w+');
    $stderr = fopen('php://temporary', 'w+');
    $streams = [
        0 => ['pipe', 'r']
        , 1 => $stdout
        , 2 => $stderr
    ];

    $proc = proc_open($cmd, $streams, $pipes);
    if (!$proc){
        throw new RuntimeException('Unable to launch download process');
    }

    do {
        Fiber::suspend();
        $status = proc_get_status($proc);
    } while ($status['running']);

    proc_close($proc);
    fclose($stdout);
    fclose($stderr);
    $success = $status['exitcode'] === 0;
    if ($success){
        return [$source, $destination];
    } else {
        throw new \RuntimeException('Unable to perform conversion');
    }
}

Источник: https://habr.com/ru/articles/756642/


Интересные статьи

Интересные статьи

Началось всё с того, что я открыл для себя Kaggle. В частности, я принимаю участие в публичном соревновании Spaceship Titanic. Это более "молодая" версия классического Титаника. Код, продемонстированн...
Когда работаешь в проекте со сторонними апи предоставляющими какой-либо сервис, то необходимо делать к ним запросы с бэкенда и как по мне, делать это с бекэнда бывает не так удобно как с фронтенда. Те...
В этом базовом руководстве вы узнаете самые основы команды awk, а также увидите некоторые способы её использования при работе с текстом, включая вывод содержимого файла, а также его конкретных столб...
Мало кто из литераторов, да что там литераторов – даже нашего брата-историка, смог избежать описания драматической картины: король Пруссии Фридрих-Вильгельм III, кутаясь в плащ, наблюдает, как на на п...
Многие сервисы в современном мире, по большей части, «ничего не делают». Их задачи сводятся к запросам к другим базам/сервисам/кешам и агрегации всех этих данных по различным правилам и разн...