Anatomie eines Gegendrucks in Jetstreams

Durch das Lesen zahlreicher Artikel zum Thema reaktive Ströme kann der Leser zu dem Schluss kommen, dass:



  • Gegendruck ist cool
  • Gegendruck ist nur in Bibliotheken verfügbar, die die Spezifikation für reaktive Streams implementieren
  • Diese Spezifikation ist so komplex, dass Sie nicht einmal versuchen sollten, sie selbst zu implementieren


In diesem Artikel werde ich versuchen, das zu zeigen:



  • Gegendruck ist sehr einfach
  • Um einen asynchronen Gegendruck zu implementieren, reicht es aus, eine asynchrone Version des Semaphors zu erstellen
  • Wenn eine asynchrone Semaphorimplementierung vorhanden ist, wird die Schnittstelle org.reactivestreams.Publisher in einigen Dutzend Codezeilen implementiert


Der Gegendruck ist ein Feedback, das die Geschwindigkeit des Datenproduzenten an die Geschwindigkeit des Verbrauchers anpasst. Ohne eine solche Verbindung kann der schnellere Hersteller den Puffer des Verbrauchers überlaufen lassen oder, wenn der Puffer dimensionslos ist, den gesamten RAM erschöpfen.



In der Multithread-Programmierung wurde dieses Problem von Dijkstroy gelöst, der einen neuen Synchronisationsmechanismus vorschlug - das Semaphor. Ein Semaphor kann als Berechtigungszähler betrachtet werden. Es wird davon ausgegangen, dass der Produzent die Erlaubnis des Semaphors anfordert, bevor er eine ressourcenintensive Aktion ausführt. Wenn das Semaphor leer ist, wird der Producer-Thread blockiert.



Asynchrone Programme können Threads nicht blockieren, sodass sie nicht auf ein leeres Semaphor zugreifen können, um Berechtigungen zu erhalten (sie können jedoch alle anderen Semaphoroperationen ausführen). Sie müssen ihre Ausführung auf andere Weise blockieren. Auf diese andere Weise verlassen sie nur den Arbeitsthread, auf dem sie ausgeführt wurden, aber vorher vereinbaren sie, zur Arbeit zurückzukehren, sobald das Semaphor voll ist.



Die eleganteste Möglichkeit, ein asynchrones Programm anzuhalten und fortzusetzen, besteht darin, es als Datenflussakteur mit Ports zu strukturieren :







Ein Datenflussmodell - Akteure mit Ports, die gerichteten Verbindungen zwischen ihren Ports und Anfangstoken. Entnommen aus: Eine strukturierte Beschreibung der Datenflussakteure und ihrer Anwendung



Es gibt Eingangs- und Ausgangsanschlüsse. Die Eingangsports empfangen Token (Nachrichten und Signale) von den Ausgangsports anderer Akteure. Wenn der Eingabeport Token enthält und der Ausgabeport einen Platz zum Platzieren von Token hat, wird er als aktiv betrachtet. Wenn alle Ports des Akteurs aktiv sind, wird er zur Ausführung gesendet. Wenn das Schauspielerprogramm seine Arbeit wieder aufnimmt, kann es Token sicher von den Eingabeports lesen und auf das Wochenende schreiben. Dieser einfache Mechanismus enthält alle Weisheiten der asynchronen Programmierung. Die Zuweisung von Ports als separate Unterobjekte von Akteuren vereinfacht die Codierung asynchroner Programme erheblich und ermöglicht die Erhöhung ihrer Vielfalt durch die Kombination von Ports unterschiedlicher Typen.



Der klassische Hewitt-Akteur enthält zwei Ports - einer ist sichtbar, mit einem Puffer für eingehende Nachrichten, der andere ist eine versteckte Binärdatei, die blockiert, wenn der Akteur zur Ausführung gesendet wird, und somit verhindert, dass der Akteur bis zum Ende des ersten Starts neu gestartet wird. Das gewünschte asynchrone Semaphor ist eine Kreuzung zwischen diesen beiden Ports. Wie ein Nachrichtenpuffer kann er viele Token speichern, und wie ein versteckter Port sind diese Token schwarz, dh nicht unterscheidbar, wie in Petri-Netzen, und ein Token-Zähler reicht aus, um sie zu speichern.



Auf der ersten Ebene der Hierarchie, haben wir eine Klasse AbstractActormit drei verschachtelten Klassen - Basis Portund Derivate AsyncSemaPortund InPortsowie mit einem Mechanismus einen Schauspielers für die Ausführung in Abwesenheit von blockierten Ports für den Start. Kurz gesagt, es sieht so aus:



public abstract class AbstractActor {
    /**    */
    private int blocked = 0;

    protected synchronized void restart() {
            controlPort.unBlock();
    }

    private synchronized void incBlockCount() {
        blocked++;
    }

    private synchronized void decBlockCount() {
        blocked--;
        if (blocked == 0) {
            controlPort.block();
            excecutor.execute(this::run);
        }
    }

    protected abstract void turn() throws Throwable;

    /**   */
    private void run() {
        try {
            turn();
            restart();
        } catch (Throwable throwable) {
            whenError(throwable);
        }
    }
}


Es enthält eine minimale Anzahl von Portklassen:



Port- Basisklasse aller Ports



    protected  class Port {
        private boolean isBlocked = true;

        public Port() {
            incBlockCount();
        }

        protected synchronized void block() {
            if (isBlocked) {
                return;
            }
            isBlocked = true;
            incBlockCount();
        }

        protected synchronized void unBlock() {
            if (!isBlocked) {
                return;
            }
            isBlocked = false;
            decBlockCount();
        }
    }


Asynchrones Semaphor:



    public class AsyncSemaPort extends Port {
        private long permissions = 0;

        public synchronized void release(long n) {
            permissions += n;
            if (permissions > 0) {
                unBlock();
            }
        }

        public synchronized void aquire(long delta) {
            permissions -= delta;
            if (permissions <= 0) { 
                //    
                //        ,
                //       
                block();
            }
        }
    }


InPort - Mindestpuffer für eine eingehende Nachricht:



    public class InPort<T> extends Port implements OutMessagePort<T> {
        private T item;

        @Override
        public void onNext(T item) {
            this.item = item;
            unBlock();
        }

        public synchronized T poll() {
            T res = item;
            item = null;
            return res;
        }
    }


Die Vollversion der Klasse AbstractActorkann hier eingesehen werden.



Auf der nächsten Ebene der Hierarchie haben wir drei abstrakte Akteure mit bestimmten Ports, aber mit undefinierten Verarbeitungsroutinen:



  • Eine Klasse AbstractProducerist ein Akteur mit einem Port vom Typ des asynchronen Semaphors (und einem internen Kontrollport, der standardmäßig in allen Akteuren vorhanden ist).
  • Die Klasse AbstractTransformerist ein regulärer Hewitt-Akteur mit einem Verweis auf den Eingabeport des nächsten Akteurs in der Kette, an den die konvertierten Token gesendet werden.
  • Die Klasse AbstractConsumerist auch ein gewöhnlicher Akteur, sendet jedoch die transformierten Token nirgendwo hin, während sie einen Link zum Produzenten-Semaphor hat, und öffnet dieses Semaphor, nachdem das Eingabe-Token absorbiert wurde. Dies hält die Anzahl der Token im Prozess konstant und es tritt kein Pufferüberlauf auf.


Auf der letzten Ebene, die sich bereits im Testverzeichnis befindet, werden bestimmte in Tests verwendete Akteure definiert :



  • Die Klasse ProducerActorerzeugt einen endlichen Strom von ganzen Zahlen.
  • Die Klasse TransformerActornimmt die nächste Nummer aus dem Stream und sendet sie über die Kette.
  • Klasse ConsumerActor- akzeptiert und druckt die resultierenden Zahlen


Jetzt können wir eine Kette von asynchronen Parallelverarbeitungshandlern wie folgt aufbauen: Hersteller - beliebig viele Transformatoren - Verbraucher







Daher haben wir einen Gegendruck implementiert, und selbst in einer allgemeineren Form als in der Spezifikation für reaktive Ströme kann die Rückkopplung eine beliebige Anzahl von Verarbeitungskaskaden umfassen und nicht nur benachbarte, wie in der Spezifikation.



Um die Spezifikation zu implementieren, müssen Sie einen Ausgabeport definieren, der von der Anzahl der Berechtigungen abhängt, die mit der request () -Methode an ihn übergeben wurden. Dies ist der Fall Publisher, und die vorhandene mit einem InPortAufruf dieser Methode ergänzen Subscriber. Das heißt, wir gehen davon aus, dass Schnittstellen PublisherundSubscriberbeschreiben das Verhalten von Häfen, nicht von Akteuren. Gemessen an der Tatsache, dass es in der Liste der Schnittstellen auch eine Schnittstelle gibt Processor, die in keiner Weise eine Portschnittstelle sein kann, betrachten die Autoren der Spezifikation ihre Schnittstellen als Akteurschnittstellen. Nun, wir können Akteure machen, die all diese Schnittstellen implementieren, indem wir die Ausführung von Schnittstellenfunktionen an die entsprechenden Ports delegieren.



Lassen Sie uns der Einfachheit halber Publisherkeinen eigenen Puffer haben und schreiben Sie direkt in den Puffer Subscriber. Dazu benötigen Sie jemanden, der sich Subscriberanmeldet und erfüllt request(). Das heißt, wir haben zwei Bedingungen und dementsprechend zwei Ports - InPort<Subscriber>und AsyncSemaPort. Keiner von ihnen eignet sich als Basis für die ImplementierungPublisher'a, da es unnötige Methoden enthält, werden wir diese Ports zu internen Variablen machen:



public class ReactiveOutPort<T> implements Publisher<T>, Subscription, OutMessagePort<T> {
    protected AbstractActor.InPort<Subscriber<? super T>> subscriber;
    protected AbstractActor.AsyncSemaPort sema;

    public ReactiveOutPort(AbstractActor actor) {
        subscriber = actor.new InPort<>();
        sema = actor.new AsyncSemaPort();
    }
}


Dieses Mal haben wir die Klasse nicht ReactiveOutPortals verschachtelt definiert, daher war ein Konstruktorparameter erforderlich, ein Verweis auf den einschließenden Akteur, um die als verschachtelte Klassen definierten Ports zu instanziieren.



Die Methode subscribe(Subscriber subscriber)läuft darauf hinaus, den Teilnehmer zu speichern und aufzurufen subscriber.onSubscribe():



    public synchronized void subscribe(Subscriber<? super T> subscriber) {
        if (subscriber == null) {
            throw new NullPointerException();
        }
        if (this.subscriber.isFull()) {
            subscriber.onError(new IllegalStateException());
            return;
        }
        this.subscriber.onNext(subscriber);
        subscriber.onSubscribe(this);
    }


was normalerweise zu einem Aufruf führt Publisher.request(), der darauf hinausläuft, das Semaphor mit einem Aufruf zu erhöhen AsyncSemaPort.release():



    public synchronized void request(long n) {
        if (subscriber.isEmpty()) {
            return; // this spec requirement
        }
        if (n <= 0) {
            subscriber.current().onError(new IllegalArgumentException());
            return;
        }
        sema.release(n);
    }


Und jetzt müssen wir nicht vergessen, das Semaphor AsyncSemaPort.aquire()zum Zeitpunkt der Ressourcennutzung durch einen Aufruf zu senken :



    public synchronized void onNext(T item) {
        Subscriber<? super T> subscriber = this.subscriber.current();
        if (subscriber == null) {
            throw  new IllegalStateException();
        }
        sema.aquire();
        subscriber.onNext(item);
    }


Das AsyncSemaphore- Projekt wurde speziell für diesen Artikel entwickelt. Es ist absichtlich so kompakt wie möglich gestaltet, um den Leser nicht zu ermüden. Infolgedessen enthält es erhebliche Einschränkungen:



  • Publisher' Subscriber'
  • Subscriber' 1


Außerdem ist AsyncSemaPortes kein vollständiges Analogon eines synchronen Semaphors - nur ein Client kann die Operation aquire()y ausführen AsyncSemaPort(dh den einschließenden Akteur). Dies ist jedoch kein Nachteil - AsyncSemaPortes erfüllt seine Rolle gut. Im Prinzip können Sie es anders machen - nehmen Sie es java.util.concurrent.Semaphoreund ergänzen Sie es mit einer asynchronen Abonnementschnittstelle (siehe AsyncSemaphore.java aus dem DF4J- Projekt ). Ein solches Semaphor kann Akteure und Fäden der Ausführung in beliebiger Reihenfolge binden.



Im Allgemeinen hat jede Art von synchroner (blockierender) Interaktion ihr eigenes asynchrones (nicht blockierendes) Gegenstück. Im selben DF4J-Projekt gibt es also eine ImplementierungBlockingQueue, ergänzt durch eine asynchrone Schnittstelle. Dies eröffnet die Möglichkeit einer schrittweisen Umwandlung eines Multithread-Programms in ein asynchrones Programm, wobei Threads teilweise durch Akteure ersetzt werden.



All Articles