Wenn parallele Streams rutschen

Wir leben seit langer Zeit in der Welt der Multi-Core-Prozessoren und Multitasking-Anwendungen und wissen, dass der naheliegendste Weg zur Leistungssteigerung darin besteht, die Aufgaben zu parallelisieren, die über mehrere Threads oder Prozesse hinweg ausgeführt werden. Genauer gesagt, soweit die Prozessorressourcen dies zulassen. Ein unerfahrener oder sogar erfahrener Entwickler kann jedoch in einer scheinbar offensichtlichen Situation mit einer Reihe von Fallstricken konfrontiert sein. In diesem Artikel nahm der Autor den einfachsten Code, maß seine Leistung in einem Thread, parallelisierte ihn und erwartete zu Recht bessere Ergebnisse, aber etwas ging schief ...



Ein Beispiel starten



Im Folgenden werden wir eine einfache Anwendung in Java schreiben (der Autor hat Java 14 verwendet, aber Java 8 ist auch in Ordnung), die Leistung anhand von Zählern in der Anwendung messen und versuchen, das Ergebnis durch Ausführen des Codes in mehreren Threads zu verbessern. Alles, was zur Reproduktion des Beispiels benötigt wird, ist eine Java-Entwicklungsumgebung oder nur JDK und ein VisualVM-Dienstprogramm, mit dessen Hilfe wir die aufgetretenen Probleme diagnostizieren können. Das Beispiel verwendet absichtlich keine verschiedenen Benchmarks zur Messung der Leistung und anderer fortschrittlicher Tools - in diesem Fall sind sie überflüssig. Der Testfall wurde unter Windows auf einem Intel Core i7-Prozessor mit 4 physischen und 8 logischen Kernen ausgeführt.



Erstellen wir also eine einfache Anwendung, die in einer Schleife eine Rechenaufgabe ausführt, die den Prozessor belastet, nämlich die Berechnung der Fakultät. Darüber hinaus berechnet jede Aufgabe in der Schleife auch die Fakultät einer Zahl im Bereich von 1 bis 25. Der schwebende Bereich wird verwendet, um das Beispiel der Realität näher zu bringen. Unten ist der Code für die work () -Funktion:



void work(int power) {
        for (int i = 0; i < power; i++) {
            long result = factorial(RandomUtils.nextInt(1, 25));
        }
        if (counter.incrementAndGet() % LOG_STEP == 0) {
            System.out.printf("%d    %d %n", counter.longValue(), (long) ((System.currentTimeMillis() - startTime) / 1000));
        }
}


Die Funktion erhält als Eingabe die Anzahl der Zyklen zur Berechnung der Fakultät, angegeben durch eine Konstante:



private static final int POWER_BASE = 1000000;


Nach Abschluss einer bestimmten Anzahl von Aufgaben, die in der Variablen angegeben sind



private static final int LOG_STEP = 10;


Die Anzahl der erledigten Aufgaben und die Gesamtzeit ihrer Ausführung werden protokolliert. Die



Funktion work () verwendet außerdem:



//    
private long startTime;
//    
private AtomicLong counter = new AtomicLong();

//    
private long factorial(int power) {
        if (power == 1) return power;
        else return power * factorial(power - 1);
}


Es ist zu beachten, dass eine einmalige Ausführung der work () - Funktion in einem Thread etwa 20 ms dauert, sodass ein synchronisierter Aufruf des Zählers für gemeinsam genutzte Variablen am Ende, der ein Engpass sein kann, keine Probleme verursacht, da er für jeden Thread nicht mehr als 20 Mal auftritt ms, was die Ausführungszeit von counter.incrementAndGet () erheblich überschreitet. Mit anderen Worten, Konflikte zwischen Threads, die mit dem Zugriff auf einen synchronisierten Zähler verbunden sind, sollten die Ergebnisse des Experiments nicht wesentlich beeinflussen und können vernachlässigt werden.



Lassen Sie uns den folgenden Code in einem Thread ausführen und das Ergebnis sehen:



startTime = System.currentTimeMillis();
for (int i = 0; i < Integer.MAX_VALUE; i++) {
        work(POWER_BASE);
}


In der Konsole wird die folgende Ausgabe angezeigt:



10 Aufgaben in 0 Sekunden erledigt

...

100 Aufgaben in 2 Sekunden erledigt

...

500 Aufgaben in 10 Sekunden erledigt



In einem Thread haben wir also eine Leistung von 50 Aufgaben pro Sekunde oder 20 ms pro Aufgabe.



Code parallelisieren



Wenn wir die Leistung X in einem Thread erhalten, können wir auf 4 Prozessoren ohne zusätzliche Last erwarten, dass die Leistung etwa 4 * X beträgt, dh sich um das Vierfache erhöht. Es scheint ziemlich logisch. Na lass es uns versuchen!



Einführung eines einfachen Pools mit einer festen Anzahl von Threads:



private ExecutorService executorService = Executors.newFixedThreadPool(POOL_SIZE);


Konstante:



private static final int POOL_SIZE = 1;


Wir werden den Bereich von 1 bis 16 ändern und das Ergebnis korrigieren.



Neugestaltung des Startcodes:



startTime = System.currentTimeMillis();
for (int i = 0; i < Integer.MAX_VALUE; i++) {
        executorService.execute(() -> work(POWER_BASE));
}


Standardmäßig ist die Größe der Aufgabenwarteschlange im Thread-Pool Integer.MAX_VALUE. Wir fügen dem Thread-Pool nicht mehr als Integer.MAX_VALUE-Aufgaben hinzu, damit die Aufgabenwarteschlange nicht überläuft.



Gehen!



Setzen wir zuerst die POOL_SIZE-Konstante auf 8 Threads:



private static final int POOL_SIZE = 8;


Führen Sie die Anwendung aus und sehen Sie sich die Konsole an:



10 Aufgaben in 3 Sekunden

erledigt 20 Aufgaben in 6 Sekunden

erledigt 30 Aufgaben in 8 Sekunden

erledigt 40 Aufgaben in 10 Sekunden

erledigt 50 Aufgaben in 14 Sekunden

erledigt 60 Aufgaben in 16 Sekunden

erledigt 70 Aufgaben in 19 Sekunden erledigt

80 Aufgaben erledigt in 20 Sekunden

90 Aufgaben erledigt in 23 Sekunden

100 Aufgaben erledigt in 24 Sekunden

110 Aufgaben erledigt in 26 Sekunden

120 Aufgaben erledigt in 28 Sekunden

130 Aufgaben erledigt in 29 Sekunden

140 Aufgaben erledigt in 31 Sekunden

150 Aufgaben erledigt in 33 Sekunden

160 Aufgaben erledigt in 36 Sekunden

170 Aufgaben in 46 Sekunden erledigt



Was sehen wir? Anstelle der erwarteten Leistungssteigerung fiel sie von 20 ms pro Aufgabe um mehr als das Zehnfache auf 270 ms. Aber das ist nicht alles! Die Meldung über 170 abgeschlossene Aufgaben ist die letzte im Protokoll. Dann schien die Anwendung vollständig gestoppt zu sein.



Bevor wir uns mit den Gründen für dieses seltsame Verhalten des Programms befassen, wollen wir die Dynamik verstehen und das Protokoll nacheinander für 4 und 16 Threads entfernen, indem wir die POOL_SIZE-Konstante auf die entsprechenden Werte setzen.



Protokoll für 4 Threads:



10 Aufgaben in 2 Sekunden

erledigt 20 Aufgaben in 4 Sekunden

erledigt 30 Aufgaben in 6 Sekunden

erledigt 40 Aufgaben in 8 Sekunden

erledigt 50 Aufgaben in 10 Sekunden

erledigt 60 Aufgaben in 13 Sekunden

erledigt 70 Aufgaben in 15 Sekunden erledigt

80 Aufgaben in 18 Sekunden

erledigt 90 Aufgaben in 21 Sekunden

erledigt 100 Aufgaben in 33 Sekunden



erledigt Die ersten 90 Aufgaben wurden ungefähr zur gleichen Zeit wie für 8 Threads erledigt, dann waren weitere 12 Sekunden erforderlich, um weitere 10 Aufgaben zu erledigen, und die Anwendung hing.



Protokoll für 16 Threads:



10 Aufgaben in 2 Sekunden

erledigt 20 Aufgaben in 3 Sekunden

erledigt 30 Aufgaben in 6 Sekunden

erledigt 40 Aufgaben in 8 Sekunden erledigt

...

290 Aufgaben erledigt in 51 Sekunden

300 Aufgaben erledigt in 52 Sekunden

310 Aufgaben erledigt in 63 Sekunden



Nach Abschluss Bei 310 Aufgaben wurde die Anwendung eingefroren, und wie in den vorherigen Fällen dauerten die letzten 10 Aufgaben mehr als 10 Sekunden.



Fassen wir zusammen:



Die Parallelisierung der Ausführung von Aufgaben führt zu einer 10- oder mehrfachen Leistungsminderung.



In allen Fällen hängt die Anwendung und je weniger Threads, desto schneller hängt sie (wir werden auf diese Tatsache zurückkommen).



Suche nach Problemen



Offensichtlich stimmt etwas mit unserem Code nicht. Aber wie findest du den Grund? Dazu verwenden wir das Dienstprogramm visualvm. Wir werden es vor der Ausführung unserer Anwendung starten und nach dem Start der Anwendung auf den erforderlichen Java-Prozess in der visualvm-Oberfläche umschalten. Die Anwendung kann direkt aus der Entwicklungsumgebung gestartet werden. Natürlich ist dies im Allgemeinen falsch, aber in unserem Beispiel hat dies keinen Einfluss auf das Ergebnis.



Zunächst schauen wir auf die Registerkarte Monitor und stellen fest, dass etwas mit dem Speicher nicht stimmt.







In weniger als einer Minute gingen 4 GB Speicher einfach aus! Daher wurde die Anwendung gestoppt. Aber wohin ging die Erinnerung?



Starten Sie die Anwendung neu und klicken Sie auf der Registerkarte Monitor auf die Schaltfläche Heap Dump. Nach dem Entfernen und Öffnen des Speicherauszugs sehen wir:







Im Abschnitt Klassen nach Instanzgröße wird mehr als 1 GB von der LinkedBlockingQueue $ Node-Klasse belegt. Es ist nichts weiter als ein Anfang der Thread-Pool-Task-Warteschlange. Die zweitgrößte Klasse ist die Aufgabe selbst, die dem Thread-Pool hinzugefügt wird. Um dies zu unterstützen, sehen wir im Abschnitt Klassen nach Anzahl der Instanzen die Entsprechung zwischen der Anzahl der Instanzen der ersten und zweiten Klasse (die Übereinstimmung ist nicht ganz genau, anscheinend aufgrund der Tatsache, dass zuerst eine Aufgabe erstellt wird und dann nur ein neuer Anfang der Warteschlange und aufgrund des Zeitunterschieds multipliziert mit der Anzahl der Threads ergibt sich eine leichte Diskrepanz in der Anzahl der Instanzen.



Jetzt zählen wir. Wir erstellen ungefähr 2 Milliarden Aufgaben in einer Schleife (Integer.MAX_VALUE), dh ungefähr 2 GB Aufgaben. Aufgaben werden langsamer ausgeführt als sie erstellt werden, sodass die Warteschlangengröße weiter wächst. Selbst wenn jede Aufgabe nur 8 Byte Speicher benötigt, beträgt die maximale Warteschlangengröße:



8 * 2 GB = 16 GB



Bei einer Gesamtgröße des Heapspeichers von 4 GB ist es nicht überraschend, dass nicht genügend Speicher vorhanden ist. Wenn wir die Ausführung der Anwendung, deren Protokoll gestoppt wurde, nicht unterbrechen würden, würden wir nach einer Weile den berühmten OutOfMemoryError sehen und sogar ohne visualvm, wenn wir uns nur den Code ansehen, könnten wir erraten, wohin der Speicher geht.



Denken Sie daran, dass die Anwendung umso schneller gestoppt wird, je weniger Threads die Aufgaben ausführen. Wir können jetzt versuchen, dies zu erklären. Je weniger Threads vorhanden sind, desto schneller wird die Anwendung ausgeführt (warum - wir müssen es noch herausfinden) und desto schneller füllt sich die Task-Warteschlange und der Speicher wird voll.



Die Behebung des Speicherüberlaufproblems ist sehr einfach. Erstellen wir eine Konstante anstelle von Integer.MaxValue:



private static final int MAX_TASKS = 1024 * 1024;


Und ändern wir den Code wie folgt:



startTime = System.currentTimeMillis();
for (int i = 0; i < MAX_TASKS; i++) {
        executorService.execute(() -> work(POWER_BASE));
}


Jetzt müssen Sie die Anwendung starten und sicherstellen, dass alles in Ordnung mit dem Speicher ist:







Wir setzen die Analyse fort



Wir starten unsere Anwendung erneut, erhöhen schrittweise die Anzahl der Threads und korrigieren das Ergebnis.



1 Thread - 500 Aufgaben in 10 Sekunden

2 Threads - 500 Aufgaben in 21 Sekunden

4 Threads - 500 Aufgaben in 37 Sekunden

8 Threads - 500 Aufgaben in 49 Sekunden

16 Threads - 500 Aufgaben in 57 Sekunden



Wie wir sehen können, erhöht sich die Ausführungszeit von 500 Aufgaben Die Anzahl der Threads nimmt nicht ab, sondern nimmt zu, während die Ausführungsgeschwindigkeit für jeden Teil von 10 Aufgaben gleichmäßig ist und die Threads nicht mehr einfrieren.



Lassen Sie uns das Dienstprogramm visualvm erneut verwenden und einen Thread-Dump erstellen, während die Anwendung ausgeführt wird. Für das genaueste Bild ist es besser, einen Dump zu erstellen, wenn Sie an 16 Threads arbeiten. Es gibt verschiedene Dienstprogramme zum Analysieren von Thread-Dumps. In unserem Fall können Sie jedoch einfach durch alle Threads mit den Namen "Pool-1-Thread-1", "Pool-1-Thread-2" usw. in der visualvm-Oberfläche scrollen und Folgendes sehen:







Zum Zeitpunkt des Dumpings generieren die meisten Threads die nächste Zufallszahl, um die Fakultät zu berechnen. Es stellt sich heraus, dass dies die zeitaufwändigste Funktion ist. Warum dann? Um dies herauszufinden, gehen wir in den Quellcode von Random.next () und sehen uns Folgendes an:



private final AtomicLong seed;

protected int next(int bits) {
        long oldseed, nextseed;
        AtomicLong seed = this.seed;
        do {
            oldseed = seed.get();
            nextseed = (oldseed * multiplier + addend) & mask;
        } while (!seed.compareAndSet(oldseed, nextseed));

        return (int)(nextseed >>> (48 - bits));
}


Alle Threads teilen sich eine einzelne Instanz der Seed-Variablen, auf die der Zugriff mithilfe der AtomicLong-Klasse synchronisiert wird. Dies bedeutet, dass bei der Generierung jeder Zufallszahl Threads in die Warteschlange gestellt werden, um auf diese Variable zuzugreifen, anstatt parallel ausgeführt zu werden. Daher wächst die Produktivität nicht. Aber warum fällt sie? Die Antwort ist einfach. Bei der Parallelisierung der Ausführung werden zusätzliche Ressourcen für die Unterstützung der Parallelverarbeitung aufgewendet, insbesondere für das Umschalten des Prozessorkontexts zwischen Threads. Es stellt sich heraus, dass zusätzliche Kosten aufgetreten sind und die Threads immer noch nicht parallel arbeiten, da sie um den Zugriff auf den Wert der Startvariablen konkurrieren und beim Aufruf von seed.compareAndSet () in die Warteschlange gestellt werden. Vielleicht Konkurrenz zwischen Threads um eine begrenzte RessourceDie häufigste Ursache für Leistungseinbußen bei der Parallelisierung von Berechnungen.



Ändern wir den Code der work () - Funktion wie folgt:



void work(int power) {
        for (int i = 0; i < power; i++) {
            long result = factorial(20);
        }
        if (counter.incrementAndGet() % LOG_STEP == 0) {
            System.out.printf("%d    %d %n", counter.longValue(), (long) ((System.currentTimeMillis() - startTime) / 1000));
        }
}


und überprüfen Sie erneut die Leistung für eine andere Anzahl von Threads:



1 Thread - 1000 Aufgaben in 17 Sekunden

2 Threads - 1000 Aufgaben in 10 Sekunden

4 Threads - 1000 Aufgaben in 5 Sekunden

8 Threads - 1000 Aufgaben in 4 Sekunden

16 Threads - 1000 Aufgaben in 4 Sekunden



Jetzt Das Ergebnis liegt nahe an unseren Erwartungen. Die Leistung von 4 Threads wurde um das 4-fache erhöht. Ferner wurde die Leistungssteigerung praktisch gestoppt, da die Parallelisierung durch Prozessorressourcen begrenzt ist. Werfen wir einen Blick auf die Diagramme der Prozessorlast, die bei der Arbeit an 4 und 8 Threads über visualvm erfasst wurden.







Wie Sie den Grafiken entnehmen können, sind bei 4 Threads mehr als 50% der Prozessorressourcen frei, und bei 8 Threads wird der Prozessor zu fast 100% verwendet. Dies bedeutet, dass in diesem Beispiel 8 Threads die Grenze sind und die weitere Leistung nur abnimmt. In unserem Beispiel wurde das Leistungswachstum bereits bei 4 Threads gestoppt. Wenn die Threads jedoch anstelle der Berechnung der Fakultät synchrone E / A ausführen, kann die Parallelisierungsgrenze, bei der ein Leistungsgewinn erzielt wird, höchstwahrscheinlich erheblich erhöht werden. Leser können dies selbst überprüfen und das Ergebnis in die Kommentare zum Artikel schreiben.



Wenn wir über die Praxis sprechen, können zwei wichtige Punkte beachtet werden:



Die Parallelisierung ist normalerweise effektiv, wenn die Anzahl der Threads bis zum Zweifachen der Anzahl der Prozessorkerne beträgt (natürlich ohne andere Prozessorlast). Die



CPU-Auslastung sollte in der Praxis 80% nicht überschreiten, um Fehlertoleranz sicherzustellen



Reduzieren von Konflikten zwischen Threads



Nachdem wir uns über die Leistung unterhalten hatten, vergaßen wir eine wesentliche Sache. Durch Ändern des Aufrufs von RandomUtils.nextInt () im Code in eine Konstante haben wir die Geschäftslogik unserer Anwendung geändert. Kehren wir zum alten Algorithmus zurück und vermeiden Sie Leistungsprobleme. Wir haben herausgefunden, dass der Aufruf von RandomUtils.nextInt () dazu führt, dass jeder der Threads dieselbe Startvariable verwendet, um eine Zufallszahl zu generieren. In der Zwischenzeit ist dies völlig optional. Verwenden Sie in unserem Beispiel anstelle von



RandomUtils.nextInt(1, 25)


der ThreadLocalRandom-Klasse:



ThreadLocalRandom.current().nextInt(1, 25)


wird das Problem mit dem Wettbewerb lösen. Jetzt verwendet jeder Thread seine eigene Instanz der internen Variablen, die zum Generieren der nächsten Zufallszahl benötigt wird.



Die Verwendung einer separaten Variablen für jeden Thread anstelle des synchronisierten Zugriffs auf eine einzelne Instanz einer Klasse, die von Threads gemeinsam genutzt wird, ist eine gängige Technik zur Verbesserung der Leistung durch Reduzierung von Konflikten zwischen Threads. Die Klasse java.lang.ThreadLocal kann verwendet werden, um die Werte von Variablen im Kontext eines Threads zu speichern, obwohl es erweiterte Tools gibt, z. B. Mapped Diagnostic Context.



Abschließend möchte ich darauf hinweisen, dass die Reduzierung des Wettbewerbs zwischen Threads nicht nur eine technische, sondern auch eine logische Aufgabe ist. In unserem Beispiel kann jeder Thread problemlos seine eigene variable Instanz verwenden. Was ist jedoch, wenn wir eine Instanz für alle benötigen, z. B. einen gemeinsam genutzten Zähler? In diesem Fall müssten Sie den Algorithmus selbst umgestalten. Speichern Sie beispielsweise einen Zähler im Kontext jedes Streams und berechnen Sie regelmäßig oder auf Anfrage den Gesamtzählerwert basierend auf den Werten der Zähler für jeden Stream.



Fazit



Es gibt also drei Punkte, die sich auf die Leistung der Parallelverarbeitung auswirken:



  • CPU-Ressourcen
  • Wettbewerb zwischen Threads
  • Andere Faktoren, die indirekt das Gesamtergebnis beeinflussen



All Articles