Atomics.wait()jedoch blockiert, kann sie nicht im Hauptthread aufgerufen werden (wenn Sie dies versuchen, wird ein Fehler ausgegeben TypeError).
Die V8-Engine unterstützt seit Version 8.7 eine nicht blockierende Option
Atomics.wait()namens Atomics.waitAsync () . Diese neue Methode kann für den Hauptthread verwendet werden.
Heute zeigen wir Ihnen, wie Sie mit diesen Low-Level-APIs einen Mutex erstellen, der sowohl synchron (in Arbeitsthreads) als auch asynchron (in Arbeitsthreads oder im Hauptthread) ausgeführt werden kann.
Atomics.wait () und Atomics.waitAsync ()
Methoden
Atomics.wait()und Atomics.waitAsync()nehmen Sie die folgenden Parameter:
buffer: ein Array vom TypInt32ArrayoderBigInt64Array, das auf basiertSharedArrayBuffer.index: Der tatsächliche Index des Elements im Array.expectedValue: der Wert, von dem wir erwarten, dass er an der mitbufferund beschriebenen Stelle im Speicher dargestellt wirdindex.timeout: Zeitüberschreitung in Millisekunden (optional, Standard istInfinity).
Atomics.wait()gibt eine Zeichenfolge zurück. Wenn der erwartete Wert nicht im angegebenen Speicherort gefunden wird, wird er Atomics.wait()sofort beendet und gibt eine Zeichenfolge zurück not-equal. Andernfalls wird der Thread blockiert. Eines der folgenden Ereignisse muss eintreten, damit die Sperre aufgehoben wird. Der erste ist ein Aufruf eines anderen Threads einer Methode Atomics.notify()mit einer Angabe der Stelle im Speicher, an der die Methode interessiert ist Atomics.wait(). Der zweite ist der Ablauf des Timeouts. Im ersten Fall wird Atomics.wait()eine Zeichenfolge zurückgegeben ok, im zweiten Fall ein Zeichenfolgenwert timed-out.
Die Methode verwendet
Atomics.notify()die folgenden Parameter:
typedArray: ein Array vom TypInt32ArrayoderBigInt64Array, das auf basiertSharedArrayBuffer.index: Der tatsächliche Index des Elements im Array.count: Anzahl der Agenten, die auf eine Benachrichtigung warten (optionaler Parameter, standardmäßig eingestelltInfinity).
Die Methode
Atomics.notify()benachrichtigt die angegebene Anzahl von Agenten, die auf eine Benachrichtigung warten, an die beschriebene Adresse typedArrayund indexumgeht sie in FIFO-Reihenfolge. Wenn mehrere Anrufe getätigt wurden Atomics.wait()oder Atomics.waitAsync()denselben Speicherort überwachen, befinden sich alle in derselben Warteschlange.
Im Gegensatz zu einer Methode
Atomics.wait()gibt eine Methode Atomics.waitAsync()sofort einen Wert zurück, an dem sie aufgerufen wird. Dies kann einer der folgenden Werte sein:
{ async: false, value: 'not-equal' }- wenn der angegebene Speicherort nicht den erwarteten Wert enthält.{ async: false, value: 'timed-out' }- Nur wenn das Timeout auf 0 gesetzt ist.{ async: true, value: promise }- in anderen Fällen.
Ein Versprechen kann nach einiger Zeit erfolgreich durch einen Zeichenfolgenwert aufgelöst werden
ok(wenn eine Methode aufgerufen wurde Atomics.notify(), an die Informationen über den übergebenen Speicherplatz übergeben wurden Atomics.waitAsync()). Es kann mit einem Wert aufgelöst werden timed-out. Dieses Versprechen wird niemals abgelehnt.
Das folgende Beispiel zeigt die Grundlagen der Verwendung
Atomics.waitAsync():
const sab = new SharedArrayBuffer(16);
const i32a = new Int32Array(sab);
const result = Atomics.waitAsync(i32a, 0, 0, 1000);
// | | ^ - ()
// | ^
// ^
if (result.value === 'not-equal') {
// SharedArrayBuffer .
} else {
result.value instanceof Promise; // true
result.value.then(
(value) => {
if (value == 'ok') { /* */ }
else { /* - */ }
});
}
// :
Atomics.notify(i32a, 0);
Lassen Sie uns nun darüber sprechen, wie Sie einen Mutex erstellen, der sowohl im synchronen als auch im asynchronen Modus verwendet werden kann. Es ist zu beachten, dass die Implementierung der synchronen Version des Mutex bereits erörtert wurde. Zum Beispiel - in diesem Material.
In diesem Beispiel wird der Parameter
timeoutbeim Aufrufen von Atomics.wait()und nicht verwendet Atomics.waitAsync(). Dieser Parameter kann verwendet werden, um Timeout-bezogene Bedingungen zu implementieren.
Unsere Klasse,
AsyncLockdie einen Mutex darstellt, arbeitet mit einem Puffer SharedArrayBufferund implementiert die folgenden Methoden:
lock(): blockiert den Thread, bis wir die Möglichkeit haben, den Mutex zu erfassen (gilt nur im Worker-Thread).unlock(): gibt den Mutex frei (dieser ist das Gegenteillock()).executeLocked(callback): versucht, die Sperre zu erlangen, ohne den Thread zu blockieren. Diese Methode kann für den Hauptthread verwendet werden. Es ist geplant, den Rückruf zu dem Zeitpunkt auszuführen, zu dem wir die Sperre erwerben können.
Lassen Sie uns einen Blick darauf werfen, wie diese Methoden implementiert werden können. Die Klassendeklaration enthält Konstanten und einen Konstruktor, der einen Puffer benötigt
SharedArrayBuffer.
class AsyncLock {
static INDEX = 0;
static UNLOCKED = 0;
static LOCKED = 1;
constructor(sab) {
this.sab = sab;
this.i32a = new Int32Array(sab);
}
lock() {
/* … */
}
unlock() {
/* … */
}
executeLocked(f) {
/* … */
}
}
Hier
i32a[0]enthält das Element den Wert LOCKEDoder UNLOCKED. Er repräsentiert außerdem den Ort in der Erinnerung, der interessiert Atomics.wait()und Atomics.waitAsync(). Die Klasse AsyncLockbietet die folgenden grundlegenden Funktionen:
i32a[0] == LOCKEDund der Thread befindet sich in einem Wartezustand (nach einem AnrufAtomics.wait()oderAtomics.waitAsync()) und wird beobachteti32a[0]. Er wird schließlich benachrichtigt.- Nachdem der Thread benachrichtigt wurde, versucht er, die Sperre zu erlangen. Wenn dies erfolgreich ist, wird es aufgerufen, wenn die Sperre aufgehoben wird
Atomics.notify().
Synchrones Sperren erfassen und freigeben
Betrachten Sie den Code für eine Methode
lock(), die nur von einem Arbeitsthread aufgerufen werden kann.
lock() {
while (true) {
const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
/* >>> */ AsyncLock.UNLOCKED,
/* >>> */ AsyncLock.LOCKED);
if (oldValue == AsyncLock.UNLOCKED) {
return;
}
Atomics.wait(this.i32a, AsyncLock.INDEX,
AsyncLock.LOCKED); // <<< ,
}
}
Wenn eine Methode von einem Thread aufgerufen wird
lock(), versucht sie zunächst, die Sperre abzurufen Atomics.compareExchange(), um den Status der Sperre von UNLOCKEDauf zu ändern LOCKED. Die Methode Atomics.compareExchange()versucht, eine atomare Operation zum Ändern des Sperrstatus auszuführen und gibt den ursprünglichen Wert zurück, der sich im angegebenen Speicherbereich befindet. Wenn der ursprüngliche Wert war UNLOCKED, wird dies uns mitteilen, dass die Statusänderung erfolgreich war und dass der Thread die Sperre erhalten hat. Sie müssen nichts anderes tun.
Wenn
Atomics.compareExchange()der Status der Sperre nicht geändert werden konnte, bedeutet dies, dass ein anderer Thread die Sperre hält. Infolgedessen versucht der Thread, von dem aus die Methode aufgerufen wird lock(), die Methode zu verwendenAtomics.wait()um zu warten, bis die Sperre von einem anderen Thread aufgehoben wird. Wenn der erwartete Wert noch im interessierenden Speicherbereich gespeichert ist (in unserem Fall - AsyncLock.LOCKED), Atomics.wait()blockiert der Aufruf den Thread. Die Rückkehr von Atomics.wait()erfolgt nur, wenn ein anderer Thread aufruft Atomics.notify().
Die Methode
unlock()gibt die Sperre frei, indem sie sie in den Status versetzt UNLOCKEDund aufruft Atomics.notify(), um die Agenten zu benachrichtigen, die auf die Freigabe der Sperre warten. Es wird angenommen, dass eine Sperrzustandsänderungsoperation immer erfolgreich ist. Dies liegt daran, dass der Thread, der diese Operation ausführt, eine Sperre hält. Daher sollte zu diesem Zeitpunkt nichts anderes die Methode aufrufen unlock().
unlock() {
const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
/* >>> */ AsyncLock.LOCKED,
/* >>> */ AsyncLock.UNLOCKED);
if (oldValue != AsyncLock.LOCKED) {
throw new Error('Tried to unlock while not holding the mutex');
}
Atomics.notify(this.i32a, AsyncLock.INDEX, 1);
}
In einem typischen Fall geschieht alles so: Die Sperre ist frei und der Thread T1 erfasst sie und ändert ihren Status mit
Atomics.compareExchange(). Thread T2 versucht, die Sperre durch Aufrufen zu erhalten Atomics.compareExchange(), kann jedoch ihren Status nicht ändern. Wenn T2 aufruft Atomics.wait(), blockiert dieser Aufruf den Thread. Nach einiger Zeit gibt der Thread T1 die Sperre auf und ruft auf Atomics.notify(). Dies bewirkt, dass der Aufruf Atomics.wait()von T2 zurückkehrt okund der Thread T2 die Sperre verlässt. T2 versucht dann erneut, die Sperre zu erlangen. Diesmal gelingt es ihm.
Hier gibt es zwei Sonderfälle. Ihre Analyse zielt darauf ab, die Gründe dafür aufzuzeigen
Atomics.wait()und Atomics.waitAsync()am angegebenen Index des Array-Elements nach einem bestimmten Wert zu suchen. Dies sind die Fälle:
- T1 , T2 . T2 ,
Atomics.compareExchange(), . T1 , T2Atomics.wait(). T2Atomics.wait(),not-equal. T2 . - T1 , T2
Atomics.wait(). T1 , T2 (Atomics.wait())Atomics.compareExchange(). , T3, . .Atomics.compareExchange()T2 . T2Atomics.wait(), T3 .
Der letzte Sonderfall zeigt, dass unser Mutex nicht fair funktioniert. Es kann vorkommen, dass der Thread T2 auf die Freigabe der Sperre gewartet hat, T3 sie jedoch sofort nach der Freigabe erhalten hat. Eine Sperrimplementierung, die für den realen Gebrauch besser geeignet ist, kann mehrere Sperrzustände verwenden, um zwischen Situationen zu unterscheiden, in denen die Sperre einfach „erworben“ wurde und in denen „während der Erfassung ein Konflikt aufgetreten ist“.
Asynchrone Sperrenerfassung
Eine nicht blockierende Methode
executeLocked()kann im Gegensatz zu einer Methode lock()vom Hauptthread aus aufgerufen werden. Es empfängt als einziger Parameter einen Rückruf und plant den Rückruf nach erfolgreichem Erwerb der Sperre.
executeLocked(f) {
const self = this;
async function tryGetLock() {
while (true) {
const oldValue = Atomics.compareExchange(self.i32a, AsyncLock.INDEX,
/* >>> */ AsyncLock.UNLOCKED,
/* >>> */ AsyncLock.LOCKED);
if (oldValue == AsyncLock.UNLOCKED) {
f();
self.unlock();
return;
}
const result = Atomics.waitAsync(self.i32a, AsyncLock.INDEX,
AsyncLock.LOCKED);
// ^ ,
await result.value;
}
}
tryGetLock();
}
Die innere Funktion
tryGetLock()versucht zunächst, die Sperre mit zu erlangen Atomics.compareExchange(). Wenn das Aufrufen dieser Methode zu einer erfolgreichen Änderung des Sperrstatus führt, kann die Funktion einen Rückruf aufrufen und dann die Sperre aufheben und beenden.
Wenn der Anruf
Atomics.compareExchange()das Sperren nicht zuließ, müssen wir es erneut versuchen, in dem Moment, in dem das Schloss wahrscheinlich frei sein wird. Wir können den Thread jedoch nicht blockieren und warten, bis die Sperre aufgehoben wird. Stattdessen Atomics.waitAsync()planen wir einen neuen Versuch , die Sperre mithilfe der Methode und des zurückgegebenen Versprechens zu erwerben.
Wenn es uns gelungen ist, die Methode auszuführen
Atomics.waitAsync(), wird das von dieser Methode zurückgegebene Versprechen aufgelöst, wenn der Thread, der die Sperre enthielt, aufgerufen wirdAtomics.notify()... Danach versucht der Thread, der die Sperre wie zuvor erhalten wollte, es erneut.
Hier sind die Sonderfälle möglich, die für die synchrone Version charakteristisch sind (die Sperre wird zwischen Anrufen
Atomics.compareExchange()und aufgehoben Atomics.waitAsync(); die Sperre wird von einem anderen Thread erfasst, und zwar zwischen den Momenten der Lösung des Versprechens und des Anrufs Atomics.compareExchange()). Daher muss dies in einem ähnlichen Code, der für reale Projekte gilt, berücksichtigt werden.
Ergebnis
In diesem Artikel sprachen wir über die Low-Level - Synchronisation Primitiven
Atomics.wait(), Atomics.waitAsync()und Atomics.notify(). Wir haben ein Beispiel für die Erstellung eines darauf basierenden Mutex analysiert, der sowohl im Hauptthread als auch in Arbeitsthreads verwendet werden kann.
Werden Atomics.wait (), Atomics.waitAsync () und Atomics.notify () in Ihren Projekten nützlich sein?