Darkwood Blog Blog
  • Artikel
  • Auto
  • Releases
de
  • en
  • fr
Anmeldung
  • Blog
  • Artikel
  • Auto
  • Releases

đŸ˜¶â€đŸŒ«ïž Diese Person existiert nicht – Synchrone PHP-zu-asynchrone Orchestrierung mit Flow

vom 13. Juni 2026

Anmelden um auf diesen Beitrag zu reagieren

🚀 1

Sie mĂŒssen zehn Bilder von einer entfernten API herunterladen. Zwischen den einzelnen Anfragen warten Sie jeweils eine Sekunde, um ein Ratenlimit einzuhalten. Jedes Bild durchlĂ€uft denselben Prozess: Abrufen, Hashen, Speichern, Berichten.

Der Code sieht aus wie eine bekannte for-Schleife. Er funktioniert. Doch wĂ€hrend ein Bild heruntergeladen wird, warten die anderen neun. Solange ein Prozess in einer sleep()-Funktion pausiert, steht der gesamte Prozess still. Die Arbeitseinheiten sind unabhĂ€ngig – nur die Orchestrierung behandelt sie so, als wĂ€ren sie es nicht.

Dies ist das konkrete Problem, das wir im Projekt flow-thispersondoesnotexist isoliert haben: ein minimaler Symfony-Befehl, der mehrere Bilder aus ThisPersonDoesNotExist abruft und auf der Festplatte speichert.

Ziel war es nicht, Bilder schneller herunterzuladen.

Ziel war es, zu verstehen, wie man eine sequentielle AusfĂŒhrung mit darkwood/flow in eine asynchrone Orchestrierung umwandeln kann – ohne die GeschĂ€ftslogik zu Ă€ndern, sondern durch eine Änderung des AusfĂŒhrungsmodells.

Der Übergang lĂ€sst sich in zwei Commits zusammenfassen:

  • fae47e8 — Befehl app:fetch-thispersondoesnotexist, synchrone Schleife
  • c61c929 — derselbe Befehl, orchestriert von Flow und FiberDriver

Zwischen den beiden Versionen Àndert sich nur eine Datei: src/Command/FetchThisPersonDoesNotExistCommand.php.

Der Ausgangspunkt: eine synchrone Schleife

Die erste Version (fae47e8) verwendet eine klassische for-Schleife. Abrufen, Hashen, Speichern und Berichten werden inline ausgefĂŒhrt; ein sleep(1) blockiert den Prozess zwischen den einzelnen Downloads:

$savedFiles = [];

for ($index = 1; $index <= $count; ++$index) {
    if ($count > 1) {
        $this->io->section(sprintf('Download %d/%d', $index, $count));
    }

    try {
        $image = $this->fetchImage();
        $checksum = $this->computeChecksum($image);
        $filename = $this->generateFilename($checksum, $file);
        $outputPath = $this->saveFile($filename, $image);
        $this->report($outputPath, $checksum, \strlen($image));
    } catch (\RuntimeException $exception) {
        $this->io->error($exception->getMessage());

        return Command::FAILURE;
    }

    $savedFiles[] = $outputPath;

    if ($index < $count) {
        $this->io->note(sprintf('Waiting %d second(s) before next download...', self::DOWNLOAD_DELAY_SECONDS));
        sleep(self::DOWNLOAD_DELAY_SECONDS);
    }
}

Der Prozess verlĂ€uft linear: Herunterladen, Verarbeiten, Warten, Weiter zum nĂ€chsten Bild. Jeder Schritt blockiert den Prozess vollstĂ€ndig – einschließlich der sleep()-Funktion, die das Programm selbst dann einfriert, wenn keine sinnvolle Arbeit ausgefĂŒhrt wird.

Dieser Ansatz ist leicht verstÀndlich, verschwendet aber E/A-Wartezeit: Solange ein Bild nicht fertig ist, kann kein anderes weiterbearbeitet werden.

Hinweis: Die synchrone Version behielt auch die alternative Methode fetchImageViaFileGetContents() bei. Diese wird in der Flow-Version entfernt – explizite Streams (fopen) werden zukĂŒnftig bevorzugt.

Beobachten Sie den verborgenen Arbeitsablauf

Hinter der Schleife verbirgt sich eine Pipeline, die fĂŒr jedes Bild wiederholt wird:

Fetch Image → Save Image → Report Result
Image #1 : Fetch → Save → Report
Image #2 : Fetch → Save → Report
Image #3 : Fetch → Save → Report

Jedes Bild stellt eine in sich abgeschlossene Arbeitseinheit dar. Die Frage lautet daher: Warum warten, bis Bild 1 vollstÀndig fertiggestellt ist, bevor mit Bild 2 begonnen wird?

Vorher/Nachher: ​​Die architektonische VerĂ€nderung

Vor der EinfĂŒhrung von Flow erfolgte die AusfĂŒhrung streng sequenziell:

Image 1 → Fetch → Save → Sleep
Image 2 → Fetch → Save → Sleep
Image 3 → Fetch → Save → Sleep

Anschließend werden die Arbeitseinheiten unabhĂ€ngig voneinander geplant und durchlaufen einen gemeinsamen Prozess:

Image 1 ─┐
Image 2 ─┌─> Flow Pipeline (Fetch → Save + Report) ─> rĂ©sultats
Image 3 ─┘

Die for-Schleife steuert nicht mehr die schrittweise AusfĂŒhrung. Sie reiht Ips (Arbeitseinheiten) in eine Warteschlange ein. Flow orchestriert deren Durchlauf durch die Pipeline; await() synchronisiert am Ende.

EinfĂŒhrung in Flow

Die AbhĂ€ngigkeit darkwood/flow wird vor dem Befehl (e71899a) zum Projekt hinzugefĂŒgt. Version c61c929 verwendet sie schließlich zur Orchestrierung der Arbeit.

eingefĂŒhrte Importe:

use Flow\Driver\FiberDriver;
use Flow\ExceptionInterface;
use Flow\FlowFactory;
use Flow\Ip;

Jedes Bild wird zu einer eigenstÀndigen Arbeitseinheit.

In Flow wird diese Einheit durch einen Ip (Befehlszeiger) dargestellt:

$flow(new Ip($index));

Die Pipeline wird in Stufen mit FlowFactory und einem Generator beschrieben:

$driver = new FiberDriver();
$savedFiles = [];

$flow = (new FlowFactory())->create(function () use ($file, &$savedFiles, $driver) {
    yield [
        fn (int $index) => $this->fetchImage($index, $driver),
        fn (ExceptionInterface $exception) => throw new \RuntimeException($exception->getMessage()),
    ];
    yield function (string $image) use ($file, &$savedFiles): string {
        $outputPath = $this->saveAndReport($image, $file);
        $savedFiles[] = $outputPath;

        return $outputPath;
    };
}, ['driver' => $driver]);

for ($index = 1; $index <= $count; ++$index) {
    $flow(new Ip($index));
}

try {
    $flow->await();
} catch (\RuntimeException $exception) {
    $this->io->error($exception->getMessage());

    return Command::FAILURE;
}

Konzeptionell entspricht dies dem Schreiben:

Ip($index)
 ↓
Fetch Image        ← Ă©tape async (fiber + delay)
 ↓
Save + Report      ← Ă©tape suivante, reçoit le string image

Jedes Bild durchlÀuft exakt dieselbe Pipeline.

Die Verantwortlichkeiten werden mithilfe spezieller Methoden ermittelt:

  • fetchImage(int $index, FiberDriver $driver): string — Herunterladen mit kooperativer Suspendierung
  • saveAndReport(string $image, ?string $fileOverride): string — Hash, Speichern, Berichterstellung

Die Fehlerbehandlung beendet die Schleife: Ein errorJob im Fetch-Schritt löst eine RuntimeException aus, die nur einmal um await() herum abgefangen wird.

Ersetze die blockierende Wartezeit.

Die bedeutendste Änderung betrifft das Wartezeitmanagement.

In der synchronen Version ist die Verzögerung global und blockierend:

private const int DOWNLOAD_DELAY_SECONDS = 1;

// dans la boucle, entre chaque image :
sleep(self::DOWNLOAD_DELAY_SECONDS);

In der Flow-Version gilt die Verzögerung pro Faser und Kooperative:

private const int DELAY_MIN_SECONDS = 1;
private const int DELAY_MAX_SECONDS = 3;

// dans fetchImage(), avant le téléchargement :
$delay = random_int(self::DELAY_MIN_SECONDS, self::DELAY_MAX_SECONDS);
$this->io->note(sprintf('#%d: suspending fiber for %ds before download...', $index, $delay));
$driver->delay($delay);

$driver->delay() unterbricht nur die aktuell laufende Fiber. Die anderen Fibers werden fortgesetzt:

Image #1 attend (delay 2s)
Image #2 télécharge
Image #3 sauvegarde
Image #4 démarre son delay

Bei --count=5 nÀhert sich die Gesamtzeit eher max(delays + fetch) als deren Summe an.

Fasern als Grundlage fĂŒr die AusfĂŒhrung

Im Hintergrund basiert FiberDriver auf PHP 8.1 Fibers: Ein Fiber kann angehalten und spĂ€ter fortgesetzt werden, wodurch mehrere unabhĂ€ngige Prozesse mit einem Code ausgefĂŒhrt werden können, der dem klassischen PHP sehr nahe kommt.

$driver = new FiberDriver();

Jedes Bild hat seine eigene Fiber. Wenn eine Fiber wartet ($driver->delay()), setzen die anderen ihren Fortschritt fort. Das Logging spiegelt dieses Verhalten wider:

#2: suspending fiber for 2s before download...
#2: downloading (other fibers may run while this one waits)...
#2: fetch complete in 1.34s

Flow ersetzt Fibers nicht, sondern nutzt sie ĂŒber einen Treiber. Die Wahl des Treibers bestimmt, wie die Pipeline ausgefĂŒhrt wird; die Pipeline-Beschreibung bleibt unverĂ€ndert.

Der Synchronisationspunkt

Die for-Abfrage identifiziert die Arbeitseinheiten:

for ($index = 1; $index <= $count; ++$index) {
    $flow(new Ip($index));
}

await() wird zum einzigen Synchronisationspunkt:

$flow->await();
Planifier Ip(1)
Planifier Ip(2)
Planifier Ip(3)


await() — barriùre finale

Die Verantwortung fĂŒr die Orchestrierung wurde an Flow ĂŒbertragen. Die for-Schleife ĂŒbernimmt diese Aufgabe nicht mehr: Sie reiht Einheiten in die Warteschlange ein. Flow fĂŒhrt diese parallel aus; await() wartet, bis die gesamte Pipeline abgeschlossen ist.

Warum PHP Streams beibehalten?

Das Projekt verwendet absichtlich:

$stream = fopen(self::FETCH_URL, 'r', false, $context);
$content = stream_get_contents($stream);

statt file_get_contents().

Ziel ist es nicht, einen HTTP-Client neu zu erfinden.

Ziel ist es, sich eng an die PHP-Grundfunktionen zu halten – der Codekommentar gibt dies ausdrĂŒcklich an:

Bevorzugt fĂŒr Flow-Experimente: Das Ressourcenhandle ist das Primitive, das spĂ€ter mit stream_select(), dem nicht-blockierenden Modus oder Fibers verbunden werden kann.

Diese Grundformen können sich dann weiterentwickeln zu:

stream_set_blocking(false);
stream_select(...);

oder in fortgeschrittenere Treiber integriert werden (Amp, React, Swoole – alle werden von darkwood/flow unterstĂŒtzt).

Streams bieten eine natĂŒrliche Grundlage fĂŒr Experimente mit asynchronen Modellen in PHP.

WettbewerbsbeschrÀnkung (nÀchster Schritt)

In diesem Proof of Concept werden alle Bilder parallel gesendet (bis zu --count aktiven Fasern).

FĂŒr den Produktiveinsatz bietet Flow Strategien wie MaxIpStrategy:

use Flow\IpStrategy\MaxIpStrategy;

yield [$job1, $errorJob1, new MaxIpStrategy(5)];

Das offizielle Beispielpaket (examples/flow.php) verwendet MaxIpStrategy(2), um die Anzahl der gleichzeitig ausgefĂŒhrten Jobs in jedem Schritt zu begrenzen.

Es ist noch nicht mit flow-thispersondoesnotexist verbunden, aber es ist der natĂŒrliche Hebel zur Ratenbegrenzung, ohne auf sequentielles sleep() zurĂŒckzugreifen.

Ein Modell, das weit ĂŒber Bilder hinaus anwendbar ist

Das Beispiel ThisPersonDoesNotExist ist bewusst einfach gehalten. Dasselbe Muster findet sich in Darkwood-Pipelines wieder:

Scraping YouTube     → Lister → TĂ©lĂ©charger / transcrire → Sauvegarder
Traitement vidĂ©o     → GĂ©nĂ©rer assets → Encoder → Persister
Agent IA             → Lire → Transformer → Publier

In allen FĂ€llen handelt es sich um unabhĂ€ngige Arbeitseinheiten, die eine Abfolge von Schritten durchlaufen. Der Arbeitsfluss zielt nicht darauf ab, eine Aufgabe auszufĂŒhren, sondern er orchestriert ArbeitsablĂ€ufe.

MigrationsĂŒbersicht

Erscheinungsbild fae47e8 (synchron) c61c929 (Flow)
Schleife for mit fetch/save inline for enqueue + $flow->await()
Verzögerung sleep(1) global $driver->delay() pro Faser
Fehler try/catch in der Schleife errorJob + catch bei await()
Abrufen fetchImage() fetchImage($index, $driver)
Speichern inline saveAndReport()
Wettbewerb keiner Parallelfasern ĂŒber FiberDriver

Was Flow nicht ist

Bevor wir zum Schluss kommen, noch eine hilfreiche Klarstellung, um hÀufige MissverstÀndnisse zu vermeiden.

Flow ist keine Ereignisschleife. Es verarbeitet nicht direkt einen select / poll-Zyklus auf E/A-Deskriptoren.

Flow ist keine Implementierung von Fibers. Fibers sind ein PHP 8.1-Primitiv. Flow orchestriert sie ĂŒber Treiber, ersetzt sie aber nicht.

Flow ist keine Laufzeitumgebung. Es verĂ€ndert das PHP-AusfĂŒhrungsmodell nicht. Es lĂ€uft in einem Standard-PHP-Prozess.

Flow ist kein Ersatz fĂŒr Amp, ReactPHP oder Swoole. Diese Bibliotheken stellen Laufzeitumgebungen und Ereignisschleifen bereit. Flow kann auf diese – AmpDriver, ReactDriver, SwooleDriver – zurĂŒckgreifen, ohne mit ihnen in Konkurrenz zu treten.

Flow bietet ein Modell der Orchestrierung:

  • Arbeitseinheiten beschreiben (Ip)
  • Pipelines aus Schritten zusammenstellen (yield in FlowFactory)
  • Delegieren Sie die AusfĂŒhrung an einen ausgewĂ€hlten Treiber (FiberDriver, AmpDriver, ReactDriver, SwooleDriver, 
)

Das Orchestrierungsmodell ist vom AusfĂŒhrungsmodell getrennt. Eine Pipeline kann einmal beschrieben werden, und der Treiber kann dann je nach Kontext geĂ€ndert werden – CLI mit Fibers, Swoole-Worker, React-Service – ohne die GeschĂ€ftslogik neu schreiben zu mĂŒssen.

Abschluss

Der Übergang von synchron zu asynchron ist nicht in erster Linie eine Frage der LeistungsfĂ€higkeit.

Es handelt sich um einen Wandel im mentalen Modell:

Avant : exĂ©cuter l'opĂ©ration A, puis B, puis C — et attendre entre chaque Ă©tape.

AprÚs : décrire un pipeline, enqueue des unités de travail,
        laisser l'orchestrateur planifier l'exécution.

Die for-Schleife wird zum IP-Produzenten. Die Schritte fetch und save werden zu verketteten Prozessen. await() ersetzt die Abfolge von sleep()-Aufrufen. Die GeschĂ€ftslogik (Herunterladen, Hashen, Speichern) bleibt unverĂ€ndert; geĂ€ndert hat sich lediglich, wer entscheidet, wann welcher Schritt ausgefĂŒhrt wird.

Genau dieses Modell benötigt Darkwood fĂŒr seine zukĂŒnftigen Pipelines: YouTube-Scraping und Transkriptextraktion, Medienverarbeitung in MediaBundle, mehrstufige KI-Agenten und die Orchestrierung langlaufender Workflows. UnabhĂ€ngige Einheiten, kombinierbare Phasen und ein expliziter Synchronisationspunkt.

Das Projekt ThisPersonDoesNotExist ist die minimale Demonstration: eine einzelne Datei, eine zweistufige Pipeline, ein Treiber – ausreichend, um den Übergang zu veranschaulichen, ohne seinen Umfang zu verschleiern.

Um weiterzukommen

  • POC-Repository: flow-thispersondoesnotexist — Commits fae47e8 → c61c929
  • Paket: darkwood/flow — Beispiel examples/flow.php FrĂ©dĂ©ric Boucherys Artikel zeichnet die Geschichte der asynchronen Programmierung in PHP nach, von den in PHP 4.3 eingefĂŒhrten Streams bis zu den modernen Fibers von PHP 8.1: https://f2r.github.io/fr/asynchrone.html

Anmelden um auf diesen Beitrag zu reagieren

🚀 1

Site

  • Sitemap
  • Kontakt
  • Impressum

Network

  • Hello
  • Blog
  • Apps
  • Photos

Social

Darkwood 2026, alle Rechte vorbehalten