Orchestrator endloser Aufgaben

In diesem Artikel werden wir darĂĽber sprechen, wie ein Orchestrator fĂĽr unendliche Aufgaben mithilfe von Warteschlangen implementiert wird. Als letztes Ziel mĂĽssen wir ein System implementieren, das Aufgaben mit einer langen Lebensdauer verwalten kann, ein verteiltes System, in dem eine Gruppe von Aufgaben auf einem bestimmten Server gehostet wird. Wenn dieser Server ausfällt, werden die Aufgaben automatisch auf freie Aufgaben umverteilt. 





In den meisten Fällen kommt es bei der gesamten Unternehmensentwicklung darauf an, die gleichen Anforderungen zu erfĂĽllen: Eine Anwendung wird erstellt, abhängig von der Art der Anwendung, sie hat einen Lebenszyklus, am Ende der Lebensdauer der Anwendung erhalten wir (... oder erhalten nicht), was wir wollen. Mit einer Anwendung können wir alles aus einem Online-Kauf eines Produkts, einer Zahlungsanweisung oder der Berechnung der Flugbahn einer ballistischen Rakete meinen. Jede Anwendung hat ihre eigene Lebensweise. Wichtig ist die Lebensdauer. Je  kĂĽrzer diese Zeit, desto besser. Mit anderen Worten, je schneller meine Ăśberweisung abgeschlossen ist, desto besser. Die Anforderungen sind ebenfalls ähnlich, mehr  RPC-  Vorgänge pro Sekunde, weniger  Latenz , das System sollte fehlertolerant, skalierbar und gestern betriebsbereit sein ... Es gibt eine Million Tools, Hunderte von Datenbanken, verschiedene Ansätze und Muster. Und alles ist schon lange geschrieben, wir mĂĽssen nur die vorgefertigten Technologien in unseren Projekten richtig einsetzen. 





Das Thema Task-Orchestrierung ist nicht neu, aber zu meiner Ăśberraschung gibt es einfach keine vorgefertigten Lösungen fĂĽr die Verwaltung von unendlichen Aufgaben (deren Lebensdauer unbegrenzt ist), mit der Möglichkeit, Aufgaben auf aktive Server zu verteilen. Deshalb werden wir unsere eigene Lösung implementieren. Aber das Wichtigste zuerst…. 






, . — (Job), , . . , “”, . : , . , , , . “”-  WebSocket ,  connected. , , , , . , “”  Observer  , , . 





, , .     : 





  • , , . 





  • , , . 





  • , , . , , , , . 





  • /, , ( , RAM ..), . 





: N , .  , , . 





3 . #, . , C# .Net. 





  1.  Task. .  Task  “”.  





  2. Schedulers. , . , , . 





  3. , , . ,    .  RabbitMq,  Framework - MassTransit, . . 





 Task 

 Task. , ( , ). 





. ,   “Hello Word”   : 





public async Task SendEmailAsync(Email email, CancellationToken token) 
{ 
	   //   
}
      
      



, ,  await   SendEmailAsync. 





foreach (var email in emails 
{ 
   if(token.IsCancellationRequested) 
        break; 
    _emailSender.SendEmailAsync(email, token); // await 
} 
      
      



:









  • FireAndForget   Exception  . 









  • , ,    .  





 await- ,    async/await  .





 email, ,  CancellationToken. , , , , .  RetryPolicy  , ?! , .





Schedulers

.NET , . 





  • HangFire 





  • Quartz.net 





  .  , ( , — , ,  instance )  /Tasks.  Hangfire, - UI, , . .





,  Hangfire.  BackgroundJob.Enqueue(Expression<Action> methodCall). 





var jobIds = new List<string>(); 
foreach (var email in emails) 
{ 
   if(token.IsCancellationRequested) 
      break; 
   jobIds.Add(BackgroundJob.Enqueue( 
      async () => await _emailSender.SendEmailAsync(email, token))); 
} 
      
      



, , .  RetryPolicy , . , , . 





, . , “” :





_observer.DoWork(observerArg, new CancellationToken())







- , .   BackgroundJobClient





var client = new BackgroundJobClient(JobStorage.Current);

//  ,    .
var state = new EnqueuedState(“unique-queue-name”); 
client.Create(() =>_observer.DoWork(observerArg, new CancellationToken()), state);
      
      



, . - unique-queue-name





//  instance hangfire . 
_server = new BackgroundJobServer(new BackgroundJobServerOptions() 
{   
    WorkerCount = 10, 
    Queues = new[] { “unique-queue-name” }, 
    ServerName = _serverOptions.ServerName 
}); 
      
      



WorkerCount - , . , . 





, , . : . , .  Hangfire  , , . 





_monitoringApi = JobStorage.Current.GetMonitoringApi(); 
      
      







Observer-service  - , , ( HangFilre  WorkerCount ). 





Observer-manager - , ... .  , , . . 





Scheduler common db â€“ -  , Hangfire   MsSql,  PostgreSql   Redis.





—    . “”.  





, , , , , , . 





, , . , .    Hangfire. :





1)   . , , . 





2)  . . , , , . 





3)  .  custom-id, . - . 





4)  , “default” . , , .  job-filters   . , . 





5)  , , , .  , , ,  framework  .  





6)  ,  Hangfire   MsSql,  Redis, . 





, .  , , , , . 





, ,

, , , . . ? , — , , . , ? , . , , . , . 





? “”.  - PrefetchCount  .  





  •  Ready. 





  •  Conumer  ,  Unacked.  Consumer  .  





  • , _Error .  





  •  acknowledged,  Consumer.  





- PrefetchCount  , ( ),  WorkerCount,  Hangfire. 









 Observer-services, . PrefetchCount  1



. , . , ,  Unacked. 





"”, : 





 Observer-services  , ,  Round-robin





  • msg1  .  , “Observer 1”.  Unacked  ,  . 





  • msg2  . “Observer 1”  ,  ,  â€śObserver 2”. 





, “Observer-service 1”  , ( - “ ... ?”). 





 , ,  acknowledgement   Unacked  Ready.  . , , . 





-  , . _Error,  RetryPolicy. ,    .  

 RetryPolicy  : 





  • 1000 . 





  • 5 1,4,10...  . 





  •  int.MaxValue . 





? “”, /.  PrefetchCount, 10, 10 , .    - , ,  . , 10 , 5 “”, , ,    11- , .





 ? ? , , ... ?! , , "" ,  CancellationToken.  





 Manager. . , , . , . , , : 





  • Id () - Guid  . 





  • Name (), , , . 





  • CreatedAt/ModifyAt ( / ). 





  • WorkersCount,  PrefetchCount - , . 





Manager  . 





Id





Name





WorkerCount





CreatedAt





ModifyAt





IsDeleted





{Unique id} 





Observer service 1





10





{some date}





null





false





{Unique id} 





Observer service 2





10





{some date}





null





false





{Unique id} 





Observer service 3





10





{some date}





null





false





 . , , 3 - . 





,  ,   , N .  IsDeleted=true





, (Kill â€“9, ). ,  Docker. , , . “”, , , . , - …. 





“” API. ( , “State queue” ). “” , , , , - .





, , “”. , , , , . 





,  ,  “”  Created.





Id





Name





CreatedAt





ModifyAt





ServiceId





Status





{Observer id} 





My_new_observer





{created date}





null





null





Created





, , ,  Processing  .  





Id





Name





CreatedAt





ModifyAt





ServiceId





Status





{Observer id} 





My_new_observer





{created date}





{modify date} 





{Observer service 1 id} 





Processing 





“” . 









  • Created  





  • Processing 





  • OnDeleting





  • Deleted 





"", : 





1) ,  CancellationToken. 





2) ,  FanOut. ,  “” , . 





, — , ... “ ”.  





 Observer-service  , . , “”  CancellationToken. “” . 





“” . ,  id . , .  





  •  Created, “” . - , “”. 





  •  OnDeleting  Deleted, - “” , . 





  •  Processing, “”  OnDeleting  .    . , “”,  CancellationToken  “state queue”. ,  OnDeleting  Deleted





Id





Name





CreatedAt





ModifyAt





ServiceId





Status





{Observer id} 





My_new_observer 





{created date}





{modify date} 





{Observer service 1 id} 





Deleted 





: 





1)





  ,  . , - MsSql, RabbitMq, Kafka,  Kubernetes  , , SLA . , . - , .  





2)  blackout, . 





, - , , , , , “”, . “”, .  ( , .) 





3)





, "”, . "”, , . 





4) . , "”. 





. - , , . 





5) “”, , . 





, , “” . . . , , , , , . 





, . , , - Unacked, - Ready. , ,  polling , . - "”, . , , ,  PrefetchCount. , , .








All Articles