System.Threading.Channels - ein leistungsstarker Produzent-Konsument und Asynchron ohne Zuordnungen und Stack-Tauchgänge

Hallo wieder. Vor einiger Zeit schrieb ich über ein anderes wenig bekanntes Tool für Hochleistungshobbyisten - System.IO.Pipelines . Im Kern basiert das betrachtete System.Threading.Channels (im Folgenden „Kanäle“) auf ähnlichen Prinzipien wie Pipelines und löst das gleiche Problem - Produzent-Verbraucher. Es hat jedoch eine viel einfachere API, die sich elegant in jede Art von Unternehmenscode einfügt. Gleichzeitig wird Asynchronität ohne Zuordnungen und ohne Stack-Dive verwendet, auch im asynchronen Fall! (Nicht immer, aber oft).







Inhaltsverzeichnis







Einführung



Die Aufgabe Produzent / Konsument wird auf dem Weg der Programmierer ziemlich oft und nicht in den ersten zehn Jahren angetroffen. Edsger Dijkstra selbst war an der Lösung dieses Problems beteiligt - er kam auf die Idee, Semaphoren zu verwenden, um Threads zu synchronisieren, wenn die Arbeit auf Produzenten- / Konsumentenbasis organisiert wird. Und obwohl seine Lösung in seiner einfachsten Form bekannt und eher trivial ist, kann dieses Muster (Produzent / Konsument) in der realen Welt in einer viel komplizierteren Form auftreten. Auch moderne Programmierstandards hinterlassen ihre Spuren, der Code wird vereinfacht geschrieben und zur weiteren Wiederverwendung aufgeschlüsselt. Alles wird getan, um den Schwellenwert für das Schreiben von qualitativ hochwertigem Code zu senken und diesen Prozess zu vereinfachen. Der betreffende Namespace - System.Threading.Channels - ist ein weiterer Schritt in Richtung dieses Ziels.



Vor einiger Zeit habe ich mir System.IO.Pipelines angesehen. Es erforderte eine aufmerksamere Arbeit und ein tiefes Verständnis der Materie, Span und Memory wurden verwendet, und für eine effiziente Arbeit war es erforderlich, keine offensichtlichen Methoden aufzurufen (um unnötige Speicherzuweisungen zu vermeiden) und ständig in Bytes zu denken. Aus diesem Grund war die Pipeline-Programmierschnittstelle nicht trivial und nicht intuitiv.



In System.Threading.Channels wird dem Benutzer eine viel einfachere API zur Verfügung gestellt, mit der er arbeiten kann. Es ist erwähnenswert, dass dieses Tool trotz der Einfachheit der API stark optimiert ist und höchstwahrscheinlich während seiner Arbeit keinen Speicher zuweist. Möglicherweise liegt dies an der Tatsache, dass ValueTask unter der Haube weit verbreitet ist , und selbst bei echter Asynchronität wird IValueTaskSource verwendet, die für weitere Operationen wiederverwendet wird. Dies ist genau das ganze Interesse an der Implementierung der Kanäle.



Kanäle werden verallgemeinert. Die Art der Verallgemeinerung ist, wie Sie sich vorstellen können, der Typ, dessen Instanzen erzeugt und verbraucht werden. Interessanterweise die Implementierung der Channel-Klasse, die in eine Zeile passt ( Github- Quelle ):



namespace System.Threading.Channels
{
    public abstract class Channel<T> : Channel<T, T> { }
}


Somit wird die Hauptklasse von Kanälen durch zwei Typen parametrisiert - getrennt für den Erzeugerkanal und den Verbraucherkanal. Für realisierte Kanäle wird dies jedoch nicht verwendet.

Für diejenigen, die mit den Pipelines vertraut sind, wird der allgemeine Ansatz für den Einstieg vertraut erscheinen. Nämlich. Wir erstellen eine zentrale Klasse, aus der wir Produzenten ( ChannelWriter ) und Konsumenten ( ChannelReader ) getrennt herausziehen . Trotz der Namen ist daran zu erinnern, dass dies der Produzent / Konsument ist und nicht der Leser / Schreiber einer anderen klassischen Multithreading-Aufgabe mit demselben Namen. ChannelReader ändert den Status des allgemeinen Kanals (zieht den Wert heraus), der nicht mehr verfügbar ist. Also liest er lieber nicht, sondern konsumiert. Aber wir werden die Implementierung später kennenlernen.



Arbeitsbeginn. Kanal



Der Einstieg in Channels beginnt mit der abstrakten Channel <T> -Klasse und der statischen Channel- Klasse , wodurch die am besten geeignete Implementierung erstellt wird. Außerdem können Sie von diesem gemeinsamen Kanal einen ChannelWriter zum Schreiben in den Kanal und einen ChannelReader zum Verbrauch vom Kanal erhalten. Ein Kanal ist ein Repository mit allgemeinen Informationen für ChannelWriter und ChannelReader. Es sind also alle darin gespeicherten Daten. Und bereits ist die Logik ihrer Aufzeichnung oder ihres Verbrauchs in ChannelWriter und ChannelReader verteilt. Herkömmlicherweise können Kanäle in zwei Gruppen unterteilt werden - unbegrenzt und begrenzt. Die ersten sind einfacher zu implementieren, Sie können sie unendlich schreiben (solange der Speicher dies zulässt). Die zweiten sind durch einen bestimmten Maximalwert der Anzahl der Datensätze begrenzt.



Hier unterscheidet sich die Art der Asynchronität geringfügig. Bei unbegrenzten Kanälen wird der Aufnahmevorgang immer synchron beendet, da nichts die Aufnahme vom Kanal stoppen kann. Bei begrenzten Kanälen ist die Situation anders. Bei Standardverhalten (das überschrieben werden kann) wird der Schreibvorgang synchron abgeschlossen, solange im Kanal Platz für neue Instanzen vorhanden ist. Sobald der Kanal voll ist, wird der Schreibvorgang erst beendet, wenn Speicherplatz frei ist (nachdem der Verbraucher den verbrauchten verbraucht hat). Daher ist die Operation hier wirklich asynchron mit der Änderung von Flüssen und damit verbundenen Änderungen (oder ohne eine Änderung, die später beschrieben wird).



Das Verhalten der Leser ist größtenteils das gleiche - wenn sich etwas im Kanal befindet, liest der Leser es einfach und endet synchron. Wenn es nichts gibt, erwartet er, dass jemand etwas aufschreibt.



Die statische Klasse "Kanal" enthält 4 Methoden zum Erstellen der oben genannten Kanäle:



Channel<T> CreateUnbounded<T>();
Channel<T> CreateUnbounded<T>(UnboundedChannelOptions options);
Channel<T> CreateBounded<T>(int capacity);
Channel<T> CreateBounded<T>(BoundedChannelOptions options);


Wenn Sie möchten, können Sie genauere Optionen zum Erstellen eines Kanals angeben, um ihn für die angegebenen Anforderungen zu optimieren.



UnboundedChannelOptions enthält 3 Eigenschaften, deren Standardwert false ist:



  1. AllowSynchronousContinuations — , , . -. , . , , , . , , , . , - - , ;
  2. SingleReader — , . , ;
  3. SingleWriter — , ;


BoundedChannelOptions enthält die gleichen 3 Eigenschaften und 2 weitere darüber



  1. AllowSynchronousContinuations - das gleiche;
  2. SingleReader - das gleiche;
  3. SingleWriter - das gleiche;
  4. Kapazität - Die Anzahl der im Kanal platzierten Datensätze. Dieser Parameter ist auch ein Konstruktorparameter.
  5. FullMode - Die BoundedChannelFullMode-Enumeration mit 4 Optionen bestimmt das Verhalten beim Versuch, auf einen vollständigen Kanal zu schreiben:

    • Warten - Wartet auf freien Speicherplatz, um den asynchronen Vorgang abzuschließen
    • DropNewest - Das Element, das geschrieben wird, überschreibt das neueste vorhandene Element und endet synchron
    • DropOldest - Das Element, das geschrieben wird, überschreibt das älteste vorhandene Element und endet synchron
    • DropWrite - Das zu schreibende Element wird nicht geschrieben, es endet synchron




Abhängig von den übergebenen Parametern und der aufgerufenen Methode wird eine von drei Implementierungen erstellt: SingleConsumerUnboundedChannel , UnboundedChannel , BoundedChannel . Dies ist jedoch nicht so wichtig, da wir den Kanal über die Basisklasse Channel <TWrite, TRead> verwenden werden.



Es hat 2 Eigenschaften:



  • ChannelReader <TRead> Reader {get; geschütztes Set; }}
  • ChannelWriter <TWrite> Writer {get; geschütztes Set; }}


Außerdem 2 Operatoren für implizites Casting in ChannelReader <TRead> und ChannelWriter <TWrite>.



Ein Beispiel für den Einstieg in Kanäle:



Channel<int> channel = Channel.CreateUnbounded<int>();
//  
ChannelWriter<int> writer = channel.Writer;
ChannelReader<int> reader = channel.Reader; 
// 
ChannelWriter<int> writer = channel;
ChannelReader<int> reader = channel;


Die Daten werden in einer Warteschlange gespeichert. Für 3 Typen werden 3 verschiedene Warteschlangen verwendet - ConcurrentQueue <T>, Deque <T> und SingleProducerSingleConsumerQueue <T>. Zu diesem Zeitpunkt schien es mir, dass ich veraltet war und eine Reihe neuer einfacher Kollektionen verpasst hatte. Aber ich beeile mich aufzuregen - sie sind nicht jedermanns Sache. Sie sind als intern gekennzeichnet und können daher nicht verwendet werden. Aber wenn Sie sie plötzlich am Produkt benötigen, finden Sie sie hier (SingleProducerConsumerQueue) und hier (Deque) . Die Implementierung des letzteren ist recht einfach. Ich rate Ihnen, es zu lesen, Sie können es sehr schnell studieren.



Kommen wir also zu ChannelReader und ChannelWriter sowie zu interessanten Implementierungsdetails. Sie alle laufen auf asynchrone, keine Speicherzuweisung mit IValueTaskSource hinaus.



ChannelReader - Verbraucher



Wenn ein Consumer-Objekt angefordert wird, wird eine der Implementierungen der abstrakten ChannelReader <T> -Klasse zurückgegeben. Im Gegensatz zu Pipelines sind APIs einfach und es gibt nur wenige Methoden. Sie müssen nur die Liste der Methoden kennen, um zu verstehen, wie sie in der Praxis angewendet werden.



Methoden:



  1. Virtuelle Get-Only-Eigenschaft Task Completion {get; } Ein

    Objekt vom Typ Task, das abgeschlossen wird, wenn der Kanal geschlossen wird.
  2. Virtuelle Nur-Get-Eigenschaft int Count {get; }

    Hierbei ist zu beachten, dass die aktuelle Anzahl der zum Lesen verfügbaren Objekte zurückgegeben wird.
  3. Virtuelle get-only-Eigenschaft bool CanCount {get; }

    Gibt an, ob die Count-Eigenschaft verfügbar ist.
  4. bool TryRead(out T item)

    . bool, , . out ( null, );
  5. ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)

    ValueTask true, , . ValueTask false, ( );
  6. ValueTask<T> ReadAsync(CancellationToken cancellationToken = default)

    . , . .



    , TryRead WaitToReadAsync. ( cancelation tokens), — TryRead. , while(true) WaitToReadAsync. true, , TryRead. TryRead , , . — , WaitToReadAsync, , , .

    , , - .




ChannelWriter - Hersteller



Alles ist ähnlich wie beim Verbraucher. Schauen wir uns also gleich die Methoden an:



  1. Virtuelle Methode bool TryComplete (Ausnahme? Fehler = null)

    Versuche, den Kanal als vollständig zu markieren, d. H. zeigen, dass keine Daten mehr darauf geschrieben werden. Die Ausnahme, die die Beendigung des Kanals verursacht hat, kann als optionaler Parameter übergeben werden. Gibt true zurück, wenn es erfolgreich abgeschlossen wurde, andernfalls false (wenn der Kanal bereits abgeschlossen wurde oder die Beendigung nicht unterstützt).
  2. Abstrakte Methode bool TryWrite (T-Element)

    Versucht, einen Wert in den Kanal zu schreiben. Gibt true zurück, wenn erfolgreich, und false, wenn nicht
  3. Abstrakte Methode ValueTask <bool> WaitToWriteAsync (CancellationToken CancellationToken = Standard)

    Gibt eine ValueTask mit einem wahren Wert zurück, der abgeschlossen wird, wenn auf dem Kanal Platz zum Schreiben vorhanden ist. Der Wert false ist, wenn die Einträge im Kanal nicht mehr zulässig sind.
  4. Virtuelle Methode ValueTask WriteAsync (T-Element, CancellationToken CancellationToken = Standard)

    Schreibt asynchron in den Kanal. Wenn der Kanal beispielsweise voll ist, ist der Vorgang erst dann wirklich asynchron und abgeschlossen, wenn Speicherplatz für diesen Datensatz freigegeben wurde.
  5. Methode void abgeschlossen (Ausnahme? Fehler = null)

    Versucht nur, den Kanal mit TryComplete als abgeschlossen zu markieren, und löst im Fehlerfall eine Ausnahme aus.


Ein kleines Beispiel für das Obige (um ganz einfach eigene Experimente zu starten):



Channel<int> unboundedChannel = Channel.CreateUnbounded<int>();

//      ,        
ChannelWriter<int> writer = unboundedChannel;
ChannelReader<int> reader = unboundedChannel;

//     
int objectToWriteInChannel = 555;
await writer.WriteAsync(objectToWriteInChannel);
//  ,     ,   ,  
writer.Complete();

//         
int valueFromChannel = await reader.ReadAsync();


Kommen wir nun zum interessantesten Teil.



Asynchronität ohne Zuordnungen



Beim Schreiben und Studieren des Codes wurde mir klar, dass die Implementierung all dieser Operationen fast nichts Interessantes ist. Im Allgemeinen kann beschrieben werden, dass unnötige Sperren durch wettbewerbsfähige Sammlungen und die häufige Verwendung von ValueTask vermieden werden, einer Struktur, die Speicher spart. Ich möchte Sie jedoch schnell daran erinnern, dass es sich nicht lohnt, alle Dateien auf Ihrem PC schnell zu ersetzen und alle Aufgaben durch ValueTask zu ersetzen. Dies ist nur in Fällen sinnvoll, in denen der Vorgang in den meisten Fällen synchron abgeschlossen wird. Wie wir uns erinnern, ist bei Asynchronität eine Änderung des Flusses sehr wahrscheinlich, was bedeutet, dass der Stapel nicht mehr derselbe ist wie zuvor. Wie auch immer, ein echter Performance-Profi weiß - optimieren Sie nicht, bevor Probleme auftreten.



Eine gute Sache ist, dass ich mich nicht als Profi registrieren werde. Daher ist es an der Zeit herauszufinden, was das Geheimnis des Schreibens von asynchronem Code ohne Speicherzuweisungen ist, was auf den ersten Blick zu gut für die Wahrheit klingt. Aber es passiert auch.



IValueTaskSource-Schnittstelle



Wir beginnen unsere Reise mit den Ursprüngen - der ValueTask- Struktur , die zu .net Core 2.0 hinzugefügt und in 2.1 ergänzt wurde. Innerhalb dieser Struktur gibt es ein kniffliges Objekt _obj Feld. Anhand des sprechenden Namens ist leicht zu erraten, dass eines von drei Dingen in diesem Feld ausgeblendet werden kann - null, Task / Task <T> oder IValueTaskSource. Tatsächlich ergibt sich dies aus der Art und Weise, wie ValueTask erstellt wird.



Wie der Hersteller versichert, sollte diese Struktur nur offensichtlich verwendet werden - mit dem Schlüsselwort await. Das heißt, Sie sollten nicht viele Male auf dieselbe ValueTask warten, Kombinatoren verwenden, mehrere Fortsetzungen hinzufügen usw. Außerdem sollten Sie das Ergebnis nicht mehr als einmal von ValueTask erhalten. Und dies hängt genau mit dem zusammen, was wir zu verstehen versuchen - indem wir all dieses Zeug wiederverwenden, ohne Speicher zuzuweisen.



Ich habe bereits die IValueTaskSource- Schnittstelle erwähnt . Er hilft, Speicher zu retten. Dies erfolgt durch mehrmaliges Wiederverwenden von IValueTaskSource selbst für viele Aufgaben. Aber gerade wegen dieser Wiederverwendung gibt es keine Möglichkeit, sich ValueTask hinzugeben.



Also IValueTaskSource. Diese Schnittstelle verfügt über drei Methoden, mit deren Hilfe Sie erfolgreich Speicher und Zeit beim Zuweisen dieser geschätzten Bytes sparen.



  1. GetResult - Wird einmal aufgerufen, wenn in der Zustandsmaschine, die zur Laufzeit für asynchrone Methoden erstellt wurde, das Ergebnis benötigt wird. ValueTask verfügt über eine GetResult-Methode, die die gleichnamige Schnittstellenmethode aufruft, die, wie wir uns erinnern, im Feld _obj gespeichert werden kann.
  2. GetStatus - Wird von der Zustandsmaschine aufgerufen, um den Status der Operation zu ermitteln. Auch über ValueTask.
  3. OnCompleted - Die Zustandsmaschine ruft sie erneut auf, um der zu diesem Zeitpunkt noch nicht abgeschlossenen Aufgabe eine Fortsetzung hinzuzufügen.


Trotz der einfachen Benutzeroberfläche erfordert die Implementierung einige Kenntnisse. Und hier können wir uns erinnern, womit wir begonnen haben - Kanäle . Diese Implementierung verwendet die AsyncOperation- KlasseDies ist eine Implementierung von IValueTaskSource. Diese Klasse ist hinter dem internen Zugriffsmodifikator versteckt. Dies beeinträchtigt jedoch nicht das Verständnis der grundlegenden Mechanismen. Die Frage ist, warum nicht die Massen die Implementierung von IValueTaskSource geben? Der erste Grund (zum Spaß) ist, wenn ein Hammer in der Hand ist, Nägel überall sind, wenn eine IValueTaskSource-Implementierung in den Händen ist, gibt es überall eine Analphabetenarbeit mit Gedächtnis. Der zweite Grund (plausibler) ist, dass die Benutzeroberfläche zwar einfach und universell ist, die tatsächliche Implementierung jedoch bei Verwendung bestimmter Anwendungsnuancen optimal ist. Und wahrscheinlich ist es aus diesem Grund möglich, Implementierungen in verschiedenen Teilen des großen und leistungsstarken .net zu finden, wie z. B. AsyncOperation unter der Haube von Kanälen, AsyncIOOperation in der neuen Socket-API usw.

Fairerweise gibt es jedoch noch eine gemeinsame Implementierung -ManualResetValueTaskSourceCore . Dies ist aber schon zu weit vom Thema des Artikels entfernt.



CompareExchange



Eine sehr beliebte Methode einer beliebten Klasse, die den Overhead klassischer Synchronisationsprimitive vermeidet. Ich denke, die meisten sind damit vertraut, aber es lohnt sich immer noch, es in drei Worten zu beschreiben, da diese Konstruktion in AsyncOperation ziemlich häufig verwendet wird.

In der Literatur wird diese Funktion als Compare and Swap (CAS) bezeichnet. In .net ist es in der Interlocked- Klasse verfügbar .



Die Signatur lautet wie folgt:



public static T CompareExchange<T>(ref T location1, T value, T comparand) where T : class;


Es gibt auch Überladungen mit int, long, float, double, IntPtr, object.



Die Methode selbst ist atomar, dh sie wird ohne Unterbrechungen ausgeführt. Vergleicht 2 Werte und führt, wenn sie gleich sind, die Zuweisung eines neuen Werts zu einer Variablen durch. Sie lösen das Problem, wenn Sie den Wert einer Variablen überprüfen und die Variable abhängig davon ändern müssen.



Angenommen, Sie möchten eine Variable inkrementieren, wenn ihr Wert kleiner als 10 ist.



Dann gibt es 2 Threads.



Stream 1 Stream 2
Überprüft den Wert einer Variablen auf eine Bedingung (dh weniger als 10), die ausgelöst wird - -
Zwischen Prüfung und Wertänderung Weist einer Variablen einen Wert zu, der eine Bedingung nicht erfüllt (z. B. 15).
Ändert den Wert, sollte es aber nicht, da die Bedingung nicht mehr erfüllt ist - -




Bei Verwendung dieser Methode ändern Sie entweder genau den gewünschten Wert oder nicht, während Sie den tatsächlichen Wert der Variablen erhalten.



location1 ist eine Variable, deren Wert wir ändern möchten. Es wird mit compare verglichen und im Falle der Gleichheit wird der Wert in location1 geschrieben. Wenn die Operation erfolgreich ist, gibt die Methode den letzten Wert der Variablen location1 zurück. Wenn nicht, wird der aktuelle Wert von location1 zurückgegeben.

Wenn Sie etwas tiefer sprechen, gibt es eine Anweisung in der Assemblersprache cmpxchg, die diese Aktionen ausführt. Sie wird unter der Haube benutzt.



Stapeltauchen



Beim Betrachten des gesamten Codes bin ich mehr als einmal auf Verweise auf "Stack Dive" gestoßen. Dies ist eine sehr coole und interessante Sache, die eigentlich sehr unerwünscht ist. Das Fazit ist, dass wir durch die synchrone Ausführung von Fortsetzungen die Stapelressourcen erschöpfen können.



Nehmen wir an, wir haben 10.000 Aufgaben mit Stil



//code1
await ...
//code2


Angenommen, die erste Aufgabe schließt die Ausführung ab und gibt dadurch die Fortsetzung der zweiten frei, die wir sofort synchron in diesem Thread ausführen, dh ein Stück des Stapels mit dem Rahmen dieser Fortsetzung nehmen. Diese Fortsetzung wird wiederum die Fortsetzung der dritten Aufgabe aufheben, die wir ebenfalls sofort ausführen. Usw. Wenn in der Fortsetzung keine Wartezeiten mehr vorhanden sind oder etwas, das den Stapel irgendwie fallen lässt, verbrauchen wir einfach den Stapelspeicherplatz vollständig. Was könnte StackOverflow und Anwendungsabsturz verursachen. In der Codeüberprüfung werde ich erwähnen, wie AsyncOperation dies bekämpft.



AsyncOperation als IValueTaskSource-Implementierung



Quellcode .



In AsyncOperation gibt es ein _continuation-Feld vom Typ Action <Objekt>. Das Feld wird für Fortsetzungen verwendet, ob Sie es glauben oder nicht. Wie es bei zu modernem Code häufig der Fall ist, haben Felder zusätzliche Verantwortlichkeiten (wie der Garbage Collector und das letzte Bit in der Methodentabellenreferenz). Feld _Fortsetzung aus derselben Serie. Neben der Fortsetzung selbst und null können in diesem Feld zwei spezielle Werte gespeichert werden. s_availableSentinel und s_completedSentinel . Diese Felder zeigen an, dass der Vorgang verfügbar bzw. abgeschlossen ist. Es kann nur zur Wiederverwendung für einen vollständig asynchronen Vorgang aufgerufen werden.



Außerdem implementiert AsyncOperation IThreadPoolWorkItemmit einer einzigen Methode - void Execute () => SetCompletionAndInvokeContinuation (). Die SetCompletionAndInvokeContinuation-Methode führt die Fortsetzung durch. Diese Methode wird entweder direkt im AsyncOperation-Code oder über die erwähnte Ausführung aufgerufen. Schließlich können Typen, die IThreadPoolWorkItem implementieren, wie folgt in den Thread-Pool geworfen werden. ThreadPool.UnsafeQueueUserWorkItem (this, prepareLocal: false).



Die Execute-Methode wird vom Thread-Pool ausgeführt.



Die Ausführung der Fortsetzung selbst ist ziemlich trivial.



Die Fortsetzung von _continuation wird in eine lokale Variable kopiert, s_completedSentinel wird an seiner Stelle geschrieben- ein künstliches Puppenobjekt (oder ein Wachposten, ich weiß nicht, wie ich in unserer Rede mit mir sprechen soll), der anzeigt, dass die Aufgabe abgeschlossen ist. Und dann wird einfach eine lokale Kopie der realen Fortsetzung ausgeführt. Wenn ein ExecutionContext vorhanden ist, werden diese Aktionen in den Kontext gestellt. Hier gibt es kein Geheimnis. Dieser Code kann entweder direkt von der Klasse aufgerufen werden - einfach durch Aufrufen der Methode, die diese Aktionen kapselt, oder über die IThreadPoolWorkItem-Schnittstelle im Thread-Pool. Jetzt können Sie erraten, wie die Funktion mit Fortsetzungsausführung synchron funktioniert.



Die erste Methode der IValueTaskSource-Schnittstelle ist GetResult ( github ).



Es ist einfach, er:



  1. _currentId.

    _currentId — , . . ;
  2. _continuation - s_availableSentinel. , , AsyncOperation . , (pooled = true);
  3. _result.

    _result TrySetResult .


TrySetResult-Methode ( Github ).



Die Methode ist trivial. - Es speichert den akzeptierten Parameter in _result und signalisiert die Vervollständigung, nämlich die SignalCompleteion- Methode , was sehr interessant ist.



SignalCompletion ( Github ) -Methode .



Diese Methode verwendet alles, worüber wir am Anfang gesprochen haben.



Ganz am Anfang, wenn _continuation == null ist, schreiben wir die s_completedSentinel-Puppe.



Ferner kann das Verfahren in 4 Blöcke unterteilt werden. Um das Verständnis des Schemas zu vereinfachen, muss ich sofort sagen, dass der 4. Block nur die synchrone Ausführung der Fortsetzung ist. Das heißt, die triviale Ausführung der Fortsetzung durch die Methode, wie ich im Abschnitt über IThreadPoolWorkItem beschrieben habe.



  1. _schedulingContext == null, .. ( if).

    _runContinuationsAsynchronously == true, , — ( if).

    IThreadPoolWorkItem . AsyncOperation . .

    , if ( , ), , 2 3 , — .. 4 ;
  2. _schedulingContext is SynchronizationContext, ( if).

    _runContinuationsAsynchronously = true. . , , . , . 2 , :

    sc.Post(s => ((AsyncOperation<TResult>)s).SetCompletionAndInvokeContinuation(), this);
    


    . , , ( , ), 4 — ;
  3. , 2 . .

    , _schedulingContext TaskScheduler, . , 2, .. _runContinuationsAsynchronously = true TaskScheduler . , Task.Factory.StartNew . .
  4. — . , .


Die zweite Methode der IValueTaskSource-Schnittstelle ist GetStatus ( Github ).

Genau wie ein Petersburger Donut.



Wenn _continuation! = _CompletedSentinel, dann ValueTaskSourceStatus.Pending zurückkehren

Wenn Fehler == null, dann wieder ValueTaskSourceStatus.Succeeded

Wenn _error.SourceException OperationCanceledException ist, gehen dann zurück ValueTaskSourceStatus.Canceled

Nun, da hier viel kam, Rückkehr ValueTaskSourceStatus.Faulted



dritte und letzte Die komplexeste Methode der IValueTaskSource-Schnittstelle ist jedoch OnCompleted ( github ).



Die Methode fügt eine Fortsetzung hinzu, die nach Abschluss ausgeführt wird.



Erfasst bei Bedarf ExecutionContext und SynchronizationContext.



Als nächstes wird Interlocked.CompareExchange , wie oben beschrieben, verwendet, um die Fortsetzung im Feld zu speichern und mit null zu vergleichen. Zur Erinnerung: CompareExchange gibt den tatsächlichen Wert einer Variablen zurück.



Wenn das Speichern der Fortsetzung abgeschlossen ist, wird der Wert, der sich vor der Aktualisierung in der Variablen befand, zurückgegeben, dh null. Dies bedeutet, dass der Vorgang zum Zeitpunkt der Aufzeichnung der Fortsetzung nicht abgeschlossen war. Und derjenige, der es selbst fertigstellt, wird es herausfinden (wie wir oben gesehen haben). Und es macht für uns keinen Sinn, zusätzliche Aktionen durchzuführen. Und hier endet die Arbeit der Methode.



Wenn das Speichern des Werts nicht funktioniert hat, wird von CompareExchange etwas anderes als null zurückgegeben. In diesem Fall hat es jemand geschafft, den Wert schneller als wir einzubringen. Das heißt, eine von zwei Situationen ist aufgetreten - entweder wurde die Aufgabe schneller erledigt als wir hier erreicht haben, oder es wurde versucht, mehr als eine Fortsetzung aufzuzeichnen, was nicht durchgeführt werden sollte.



Daher überprüfen wir den zurückgegebenen Wert, ob er gleich s_completedSentinel ist - er würde geschrieben, wenn er abgeschlossen wäre.



  • Wenn dies nicht s_completedSentinel ist , wurden wir nicht planmäßig verwendet - sie haben versucht, mehr als eine Fortsetzung hinzuzufügen. Das heißt, derjenige, der bereits niedergeschrieben wurde, und derjenige, den wir schreiben. Und das ist eine Ausnahmesituation;
  • s_completedSentinel, , , . , _runContinuationsAsynchronously = false.

    , , OnCompleted, awaiter'. . , AsyncOperation — System.Threading.Channels. , . , . , , ( ) . , awaiter' , , . awaiter'.

    Um diese Situation trotz allem zu vermeiden, muss die Fortsetzung asynchron gestartet werden. Es wird nach denselben Schemata wie die ersten drei Blöcke in der SignalCompleteion-Methode ausgeführt - nur in einem Pool, in einem Kontext oder über eine Factory und einen Scheduler


Hier ist ein Beispiel für synchrone Fortsetzungen:



class Program
    {
        static async Task Main(string[] args)
        {
            Channel<int> unboundedChannel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions
            {
                AllowSynchronousContinuations = true
            });

            ChannelWriter<int> writer = unboundedChannel;
            ChannelReader<int> reader = unboundedChannel;

            Console.WriteLine($"Main, before await. Thread id: {Thread.CurrentThread.ManagedThreadId}");

            var writerTask = Task.Run(async () =>
            {
                Thread.Sleep(500);
                int objectToWriteInChannel = 555;
                Console.WriteLine($"Created thread for writing with delay, before await write. Thread id: {Thread.CurrentThread.ManagedThreadId}");
                await writer.WriteAsync(objectToWriteInChannel);
                Console.WriteLine($"Created thread for writing with delay, after await write. Thread id: {Thread.CurrentThread.ManagedThreadId}");
            });

            //Blocked here because there are no items in channel
            int valueFromChannel = await reader.ReadAsync();
            Console.WriteLine($"Main, after await (will be processed by created thread for writing). Thread id: {Thread.CurrentThread.ManagedThreadId}");

            await writerTask;

            Console.Read();
        }
    }


Ausgabe:



Main, bevor Sie warten. Thread-ID: 1

Erstellt einen Thread zum verzögerten Schreiben, bevor auf das Schreiben gewartet wird. Thread-ID: 4

Main, nach dem Warten (wird vom erstellten Thread zum Schreiben verarbeitet). Thread-ID: 4

Thread zum verzögerten Schreiben erstellt, nachdem auf das Schreiben gewartet wurde. Thread-ID: 4



All Articles