In diesem Artikel werden wir darüber sprechen, wie ein Orchestrator für unendliche Aufgaben mithilfe von Warteschlangen implementiert wird. Als letztes Ziel müssen wir ein System implementieren, das Aufgaben mit einer langen Lebensdauer verwalten kann, ein verteiltes System, in dem eine Gruppe von Aufgaben auf einem bestimmten Server gehostet wird. Wenn dieser Server ausfällt, werden die Aufgaben automatisch auf freie Aufgaben umverteilt.
In den meisten Fällen kommt es bei der gesamten Unternehmensentwicklung darauf an, die gleichen Anforderungen zu erfüllen: Eine Anwendung wird erstellt, abhängig von der Art der Anwendung, sie hat einen Lebenszyklus, am Ende der Lebensdauer der Anwendung erhalten wir (... oder erhalten nicht), was wir wollen. Mit einer Anwendung können wir alles aus einem Online-Kauf eines Produkts, einer Zahlungsanweisung oder der Berechnung der Flugbahn einer ballistischen Rakete meinen. Jede Anwendung hat ihre eigene Lebensweise. Wichtig ist die Lebensdauer. Je kürzer diese Zeit, desto besser. Mit anderen Worten, je schneller meine Überweisung abgeschlossen ist, desto besser. Die Anforderungen sind ebenfalls ähnlich, mehr RPC- Vorgänge pro Sekunde, weniger Latenz , das System sollte fehlertolerant, skalierbar und gestern betriebsbereit sein ... Es gibt eine Million Tools, Hunderte von Datenbanken, verschiedene Ansätze und Muster. Und alles ist schon lange geschrieben, wir müssen nur die vorgefertigten Technologien in unseren Projekten richtig einsetzen.
Das Thema Task-Orchestrierung ist nicht neu, aber zu meiner Überraschung gibt es einfach keine vorgefertigten Lösungen für die Verwaltung von unendlichen Aufgaben (deren Lebensdauer unbegrenzt ist), mit der Möglichkeit, Aufgaben auf aktive Server zu verteilen. Deshalb werden wir unsere eigene Lösung implementieren. Aber das Wichtigste zuerst….
, . — (Job), , . . , “”, . : , . , , , . “”- WebSocket , connected. , , , , . , “” Observer , , .
, , . :
, , .
, , .
, , . , , , , .
/, , ( , RAM ..), .
: N , . , , .
3 . #, . , C# .Net.
Task. . Task “”.
Schedulers. , . , , .
, , . , . RabbitMq, Framework - MassTransit, . .
Task
Task. , ( , ).
. , “Hello Word” :
public async Task SendEmailAsync(Email email, CancellationToken token)
{
//
}
, , await SendEmailAsync.
foreach (var email in emails
{
if(token.IsCancellationRequested)
break;
_emailSender.SendEmailAsync(email, token); // await
}
:
.
FireAndForget Exception .
.
, , .
await- , async/await .
email, , CancellationToken. , , , , . RetryPolicy , ?! , .
Schedulers
.NET , .
. , ( , — , , instance ) /Tasks. Hangfire, - UI, , . .
, Hangfire. BackgroundJob.Enqueue(Expression<Action> methodCall).
var jobIds = new List<string>();
foreach (var email in emails)
{
if(token.IsCancellationRequested)
break;
jobIds.Add(BackgroundJob.Enqueue(
async () => await _emailSender.SendEmailAsync(email, token)));
}
, , . RetryPolicy , . , , .
, . , “” :
_observer.DoWork(observerArg, new CancellationToken())
- , . BackgroundJobClient.
var client = new BackgroundJobClient(JobStorage.Current);
// , .
var state = new EnqueuedState(“unique-queue-name”);
client.Create(() =>_observer.DoWork(observerArg, new CancellationToken()), state);
, . - unique-queue-name.
// instance hangfire .
_server = new BackgroundJobServer(new BackgroundJobServerOptions()
{
WorkerCount = 10,
Queues = new[] { “unique-queue-name” },
ServerName = _serverOptions.ServerName
});
WorkerCount - , . , .
, , . : . , . Hangfire , , .
_monitoringApi = JobStorage.Current.GetMonitoringApi();
:
Observer-service - , , ( HangFilre WorkerCount ).
Observer-manager - , ... . , , . .
Scheduler common db – - , Hangfire MsSql, PostgreSql Redis.
— . “”.
, , , , , , .
, , . , . Hangfire. :
1) . , , .
2) . . , , , .
3) . custom-id, . - .
4) , “default” . , , . job-filters . , .
5) , , , . , , , framework .
, . , , , , .
, ,
, , , . . ? , — , , . , ? , . , , . , .
? “”. - PrefetchCount .
Ready.
Conumer , Unacked. Consumer .
, _Error .
acknowledged, Consumer.
- PrefetchCount , ( ), WorkerCount, Hangfire.
:
Observer-services, . PrefetchCount 1
. , . , , Unacked.
"”, :
Observer-services , , Round-robin.
msg1 . , “Observer 1”. Unacked , .
msg2 . “Observer 1” , , “Observer 2”.
, “Observer-service 1” , ( - “ ... ?”).
, , acknowledgement Unacked Ready. . , , .
- , . _Error, RetryPolicy. , .
RetryPolicy :
1000 .
5 1,4,10... .
int.MaxValue .
? “”, /. PrefetchCount, 10, 10 , . - , , . , 10 , 5 “”, , , 11- , .
? ? , , ... ?! , , "" , CancellationToken.
Manager. . , , . , . , , :
Id () - Guid .
Name (), , , .
CreatedAt/ModifyAt ( / ).
WorkersCount, PrefetchCount - , .
Manager .
Id |
Name |
WorkerCount |
CreatedAt |
ModifyAt |
IsDeleted |
{Unique id} |
Observer service 1 |
10 |
{some date} |
null |
false |
{Unique id} |
Observer service 2 |
10 |
{some date} |
null |
false |
{Unique id} |
Observer service 3 |
10 |
{some date} |
null |
false |
. , , 3 - .
, , , N . IsDeleted=true.
, (Kill –9, ). , Docker. , , . “”, , , . , - ….
“” API. ( , “State queue” ). “” , , , , - .
, , “”. , , , , .
, , “” Created.
Id |
Name |
CreatedAt |
ModifyAt |
ServiceId |
Status |
{Observer id} |
My_new_observer |
{created date} |
null |
null |
Created |
, , , Processing .
Id |
Name |
CreatedAt |
ModifyAt |
ServiceId |
Status |
{Observer id} |
My_new_observer |
{created date} |
{modify date} |
{Observer service 1 id} |
Processing |
“” .
:
Created
Processing
OnDeleting
Deleted
"", :
1) , CancellationToken.
2) , FanOut. , “” , .
, — , ... “ ”.
Observer-service , . , “” CancellationToken. “” .
“” . , id . , .
Created, “” . - , “”.
OnDeleting Deleted, - “” , .
Processing, “” OnDeleting . . , “”, CancellationToken “state queue”. , OnDeleting Deleted.
Id |
Name |
CreatedAt |
ModifyAt |
ServiceId |
Status |
{Observer id} |
My_new_observer |
{created date} |
{modify date} |
{Observer service 1 id} |
Deleted |
:
1) .
, . , - MsSql, RabbitMq, Kafka, Kubernetes , , SLA . , . - , .
2) blackout, .
, - , , , , , “”, . “”, . ( , .)
3) .
, "”, . "”, , .
4) . , "”.
. - , , .
5) “”, , .
, , “” . . . , , , , , .
, . , , - Unacked, - Ready. , , polling , . - "”, . , , , PrefetchCount. , , .