C ++ - Vorlagenzuordnung mit threadsicherem Ringpuffer

Hier ist eine einfache C ++ - Vorlage für einen Allokator mit einem thread-sicheren Ringpuffer.



Alle Implementierung in einer Header-H-Datei: [fast_mem_pool.h]



Chips, warum dieser Allokator besser ist als Hunderte ähnlicher - unter dem Strich.



So funktioniert mein Fahrrad.



1) Im Release-Build gibt es keine Mutexe und keine Wartezyklen für Atomic - aber der Allokator ist zyklisch und regeneriert kontinuierlich Ressourcen, wenn sie von Threads freigegeben werden. Wie macht er das?



Jede RAM-Zuweisung, die FastMemPool über fmalloc bereitstellt, ist eigentlich mehr für einen Header:



  struct AllocHeader {
//    : tag_this = this + leaf_id
    uint64_t  tag_this  {  2020071700  };  
//  :
    int  size;  
//     :
    int  leaf_id  {  -2020071708  };  
  };


Dieser Header kann immer vom Zeiger des Benutzers abgerufen werden, indem vom Zeiger (res_ptr) sizeof (AllocHeader) zurückgespult wird:



Bild



Durch den Inhalt des AllocHeader-Headers erkennt die Methode ffree (void * ptr) ihre Zuordnungen und findet heraus, in welchem ​​der Blätter des Umlaufpufferspeichers zurückgegeben wird ::



  void  ffree(void  *ptr)
  {
    char  *to_free  =  static_cast<char  *>(ptr)  
         -  sizeof(AllocHeader);
    AllocHeader  *head  =  reinterpret_cast<AllocHeader  *>(to_free);


Wenn der Allokator aufgefordert wird, Speicher zuzuweisen, überprüft er das aktuelle Blatt des Array von Blättern, um festzustellen, ob die erforderliche Größe + Größe der Headergröße von (AllocHeader) abgeschnitten werden kann.



Im Allokator werden Leaf_Cnt-Speicherblätter im Voraus reserviert, jedes Blatt der Größe Leaf_Size_Bytes (hier ist alles traditionell). Auf der Suche nach einer Zuweisungsmöglichkeit kreist die Methode fmalloc (std :: size_t Allocation_size) durch die Blätter des Arrays leaf_array. Wenn überall alles beschäftigt ist, wird vom Betriebssystem, sofern das Flag Do_OS_malloc aktiviert ist, Speicher vom Betriebssystem benötigt, der größer ist als die erforderliche Größe nach Größe von (AllocHeader) - außerhalb Der Speicher wird aus dem internen Ringpuffer oder aus dem Betriebssystem entnommen. Der Allokator erstellt immer einen Header mit Serviceinformationen. Wenn dem Allokator der Speicher ausgeht und das Flag Do_OS_malloc == false nicht mehr vorhanden ist, gibt fmalloc nullptr zurück. Dieses Verhalten ist nützlich, um die Last zu steuern (z. B. Frames von einer Videokamera überspringen, wenn die Frame-Verarbeitungsmodule nicht mit den FPS der Kamera Schritt halten).



Wie Radfahren umgesetzt wird



Zyklische Allokatoren sind für zyklische Aufgaben ausgelegt - Aufgaben sollten nicht ewig dauern. Zum Beispiel kann es sein:



  • Zuordnungen für Benutzersitzungen
  • Videostream-Frame-Verarbeitung für die Videoanalyse
  • das Leben der Kampfeinheiten im Spiel


Da das Array leaf_array eine beliebige Anzahl von Speicherblättern enthalten kann, ist es im Limit möglich, eine Seite für die theoretisch mögliche Anzahl von Kampfeinheiten im Spiel zu erstellen, so dass wir unter der Bedingung, dass Einheiten ausfallen, garantiert ein freies Speicherblatt erhalten. In der Praxis reichen für die Videoanalyse normalerweise 16 große Blätter für mich aus, von denen die ersten paar Blätter für langfristige nichtzyklische Zuweisungen gespendet werden, wenn der Detektor initialisiert wird.



Wie die Thread-Sicherheit implementiert wird



Das Array der Zuordnungsblätter funktioniert ohne Mutexe ... Der Schutz vor Fehlern vom Typ "Datenrennen" erfolgt wie folgt:



      char  *buf;
      // available == offset 
      std::atomic<int>  available  {  Leaf_Size_Bytes  };
      // allocated ==  
      std::atomic<int>  deallocated  {  0  };


Jedes Speicherblatt verfügt über 2 Zähler:



- Verfügbar, initialisiert durch die Größe der Leaf_Size_Bytes. Mit jeder Zuordnung nimmt dieser Zähler ab, und der gleiche Zähler dient als Versatz relativ zum Anfang des Speicherblatts. == Der Speicher wird ab dem Ende des Puffers zugewiesen:



result_ptr  =  leaf_array[leaf_id].buf + available_after;


- Die Freigabe wird durch {0} auf Null initialisiert, und für jede Freigabe auf diesem Blatt (ich erfahre von AllocHeader, auf welchem ​​Blatt oder Betriebssystem das Geschäft behandelt wird) wird der Zähler um das freigegebene Volumen erhöht:



const int  deallocated  =  leaf_array[head->leaf_id].deallocated.fetch_add(real_size, std::memory_order_acq_rel)  +  real_size;


Sobald die Zähler wie diese (freigegeben == (Leaf_Size_Bytes - verfügbar)) übereinstimmen, bedeutet dies, dass alles, was zugewiesen wurde, jetzt freigegeben wird und Sie das Blatt auf seinen ursprünglichen Zustand zurücksetzen können. Hier ist jedoch ein subtiler Punkt: Was passiert, wenn nach der Entscheidung, das Blatt zurückzusetzen Zurück zum Original weist jemand dem Blatt ein weiteres kleines Stück Speicher zu. Um dies auszuschließen, verwenden Sie die Prüfung compare_exchange_strong:



if (deallocated  == (Leaf_Size_Bytes - available))
{  //      ,
  // , ,  Leaf
  if (leaf_array[head->leaf_id].available
      .compare_exchange_strong(available,  Leaf_Size_Bytes))
  {
    leaf_array[head->leaf_id].deallocated  -=  deallocated;
  }
}


Das Speicherblatt wird nur dann in seinen Ausgangszustand zurückgesetzt, wenn zum Zeitpunkt des Zurücksetzens derselbe Zustand des verfügbaren Zählers erhalten bleibt, der zum Zeitpunkt der Entscheidung war. Ta-daa !!!



Ein schöner Bonus ist, dass Sie die folgenden Fehler mithilfe des AllocHeader-Headers jeder Zuordnung abfangen können:



  • Neuzuweisung
  • Freigabe des Gedächtnisses eines anderen
  • Pufferüberlauf
  • Zugriff auf den Speicherbereich einer anderen Person


Das zweite Feature wird für diese Möglichkeiten implementiert.



2) Die Debug-Kompilierung enthält die genauen Informationen, wo sich die vorherige Freigabe während der erneuten Zuordnung befand: Dateiname, Codezeilennummer, Methodenname. Dies wird in Form von Dekoratoren um die Basismethoden implementiert (fmallocd, ffreed, check_accessd - die Debug-Version der Methoden hat am Ende ein d):



/**
 * @brief FFREE  -      free
 * @param iFastMemPool  -   FastMemPool    
 * @param ptr  -      fmaloc
 */
#if defined(Debug)
#define FFREE(iFastMemPool, ptr) \
   (iFastMemPool)->ffreed (__FILE__, __LINE__, __FUNCTION__, ptr)
#else
#define FFREE(iFastMemPool, ptr) \
   (iFastMemPool)->ffree (ptr)
#endif


Die Präprozessor-Makros werden verwendet:



  • __FILE__ - c ++ - Quelldatei
  • __LINE__ - Zeilennummer in der c ++ - Quelldatei
  • __FUNCTION__ - Funktionsname, in dem dies passiert ist


Diese Informationen werden als Übereinstimmung zwischen dem Zuordnungszeiger und den Zuordnungsinformationen im Mediator gespeichert:



  struct AllocInfo {
//   : ,   ,   :
    std::string  who;  
//  true - ,  false - :
    bool  allocated  {  false  };  
  };
  std::map<void *,  AllocInfo>  map_alloc_info;
  std::mutex  mut_map_alloc_info;


Da Geschwindigkeit für das Debuggen nicht mehr so ​​wichtig ist, wurde ein Mutex verwendet, um die Standard-std :: map zu schützen. Der Vorlagenparameter (bool Raise_Exeptions = DEF_Raise_Exeptions) beeinflusst, ob bei einem Fehler eine Ausnahme ausgelöst werden soll.



Für diejenigen, die maximalen Komfort im Release-Build wünschen, können Sie das DEF_Auto_deallocate-Flag setzen. Anschließend werden alle OS-Malloc-Zuweisungen geschrieben (bereits unter dem Mutex in std :: set <>) und im FastMemPool-Destruktor freigegeben (als Zuordnungs-Tracker verwendet).



3)Um Fehler wie "Pufferüberlauf" zu vermeiden, empfehle ich, die Prüfung FastMemPool :: check_access zu verwenden, bevor Sie mit dem zugewiesenen Speicher arbeiten. Während sich das Betriebssystem nur beschwert, wenn Sie in den RAM einer anderen Person gelangen, berechnet die Funktion check_access (oder das Makro FCHECK_ACCESS) anhand des AllocHeader-Headers, ob die angegebene Zuordnung überschritten wird:



  /**
   * @brief check_access  -        
   * @param base_alloc_ptr -      FastMemPool
   * @param target_ptr  -     
   * @param target_size  -   ,    
   * @return - true         FastMemPool
   */
  bool  check_access(void  *base_alloc_ptr,  void  *target_ptr,  std::size_t  target_size)

//  :
  if (FCHECK_ACCESS(fastMemPool, elem.array, 
      &elem.array[elem.array_size - 1], sizeof (int))) 
  {
    elem.array[elem.array_size - 1] = rand();
  }


Wenn Sie den Zeiger der anfänglichen Zuordnung kennen, können Sie immer den Header abrufen. Aus dem Header ermitteln wir die Größe der Zuordnung und berechnen dann, ob das Zielelement innerhalb der anfänglichen Zuordnung liegt. Es reicht aus, vor Beginn des Verarbeitungszyklus einmal den maximal theoretisch möglichen Zugriff zu überprüfen. Es kann sehr gut sein, dass die Grenzwerte die Zuordnungsgrenzen durchbrechen (in den Berechnungen wird beispielsweise angenommen, dass eine Variable aufgrund der Physik des Prozesses nur in einem bestimmten Bereich laufen kann und Sie daher nicht prüfen, ob die Zuordnungsgrenze überschritten wird).



Es ist besser, einmal zu überprüfen, als eine Woche später nach jemandem zu suchen, der gelegentlich zufällige Daten in Ihre Strukturen schreibt ...



4) Festlegen des Standardvorlagencodes zur Kompilierungszeit über CMake.



CmakeLists.txt enthält konfigurierbare Parameter, zum Beispiel:



set(DEF_Leaf_Size_Bytes "65536" CACHE PATH "Size of each memory pool leaf")
message("DEF_Leaf_Size_Bytes: ${DEF_Leaf_Size_Bytes}")
set(DEF_Leaf_Cnt "16" CACHE PATH "Memory pool leaf count")
message("DEF_Leaf_Cnt: ${DEF_Leaf_Cnt}")


Dies macht es sehr bequem, die Parameter in QtCreator:



Bild



oder CMake GUI zu bearbeiten :



Bild



Dann werden die Parameter während der Kompilierung wie folgt an den Code übergeben:



set(SPEC_DEFINITIONS
      ${CMAKE_SYSTEM_NAME}
      ${CMAKE_BUILD_TYPE}
      ${SPEC_BUILD}
      SPEC_VERSION="${Proj_VERSION}"
      DEF_Leaf_Size_Bytes=${DEF_Leaf_Size_Bytes}
      DEF_Leaf_Cnt=${DEF_Leaf_Cnt}
      DEF_Average_Allocation=${DEF_Average_Allocation}
      DEF_Need_Registry=${DEF_Need_Registry}
  )
#
target_compile_definitions(${TARGET} PUBLIC ${TARGET_DEFINITIONS})


und überschreiben Sie im Code die Vorlagenwerte in Standard:



#ifndef DEF_Leaf_Size_Bytes
  #define DEF_Leaf_Size_Bytes  65535
#endif


template<int Leaf_Size_Bytes = DEF_Leaf_Size_Bytes, 
    int Leaf_Cnt = DEF_Leaf_Cnt,
    int Average_Allocation = DEF_Average_Allocation,
    bool Do_OS_malloc = DEF_Do_OS_malloc,
    bool Need_Registry = DEF_Need_Registry, 
    bool Raise_Exeptions = DEF_Raise_Exeptions>
class FastMemPool
{
// ..
};


So kann die Allokatorvorlage bequem mit der Maus angepasst werden, indem die Kontrollkästchen der CMake-Parameter ein- und ausgeschaltet werden.



5) Um den Allokator in STL-Containern in derselben .h-Datei verwenden zu können, sind die Funktionen von std :: allocator teilweise in der FastMemPoolAllocator-Vorlage implementiert:



//    compile time  :
std::unordered_map<int,  int, std::hash<int>,
  std::equal_to<int>,
  FastMemPoolAllocator<std::pair<const int,  int>> >   umap1;

//    runtime  :
std::unordered_map<int,  int>  umap2(
   1024, std::hash<int>(),
   std::equal_to<int>(),
   FastMemPoolAllocator<std::pair<const int,  int>>());


Anwendungsbeispiele finden Sie hier: test_allocator1.cpp und test_stl_allocator2.cpp .



Zum Beispiel die Arbeit von Konstruktoren und Destruktoren bei Zuordnungen:



bool test_Strategy()
{
  /*
   *     Runtime
   *  (     )
 */
  using MyAllocatorType = FastMemPool<333, 33>;
// instance of:
  MyAllocatorType  fastMemPool;  
// inject instance:
  FastMemPoolAllocator<std::string,
     MyAllocatorType > myAllocator(&fastMemPool); 
  //     3 :
  std::string* str = myAllocator.allocate(3);
  //     : 
  myAllocator.construct(str, "Mother ");
  myAllocator.construct(str + 1, " washed ");
  myAllocator.construct(str + 2, "the frame");

//- 
  std::cout << str[0] << str[1] << str[2]; 

  //  :
  myAllocator.destroy(str);
  myAllocator.destroy(str + 1);
  myAllocator.destroy(str + 2);
  //  :
  myAllocator.deallocate(str, 3);
  return  true;
}


6) Manchmal macht man in einem großen Projekt eine Art Modul, testet alles gründlich - es funktioniert wie eine Schweizer Uhr. Ihr Modul ist im Detektor enthalten, wird in eine Schlacht verwickelt - und dort fällt die Bibliothek manchmal einmal am Tag in eine Müllkippe. Nachdem Sie den Speicherauszug auf dem Debugger ausgeführt haben, stellen Sie fest, dass in einer der Schleifendurchläufe von Zeigern anstelle von nullptr jemand die Nummer 8 in Ihren Zeiger geschrieben hat. Wenn Sie zu diesem Zeiger gehen, haben Sie das Betriebssystem natürlich verärgert.



Wie können wir die Bandbreite möglicher Schuldiger eingrenzen? Es ist sehr einfach, Ihre Strukturen von den Verdächtigen auszuschließen - sie müssen in den Arbeitsspeicher an einen anderen Ort verschoben werden (wo der Saboteur nicht bombardiert):



Bild



Wie geht das mit FastMemPool? Ganz einfach: In FastMemPool erfolgt die Zuweisung durch Abbeißen am Ende einer Speicherseite. Indem Sie eine Speicherseite anfordern, die mehr als für die Arbeit erforderlich ist, stellen Sie sicher, dass der Anfang der Speicherseite ein Testfeld für fehlerhafte Bombenangriffe bleibt. Zum Beispiel:



FastMemPool<100000000, 1, 1024, true, true>  bulletproof_mempool;
void *ptr = bulletproof_mempool.fmalloc(1234567);
// ..
//  -    c ptr
// ..
bulletproof_mempool.ffree(ptr);


Wenn an einem neuen Ort jemand Ihre Strukturen bombardiert, sind Sie es höchstwahrscheinlich selbst ...



Wenn sich die Bibliothek stabilisiert, erhält das Team mehrere Geschenke gleichzeitig:



  • Ihr Algorithmus funktioniert wieder wie eine Schweizer Uhr
  • Ein Buggy-Codierer kann jetzt sicher einen leeren Speicherbereich bombardieren (während alle danach suchen). Die Bibliothek ist stabil.
  • Der Bombenbereich kann überwacht werden, um den Speicher zu ändern - um Fallen an einem Buggy-Encoder zu setzen.





Insgesamt , was sind die Vorteile dieser besonderen Fahrrad:



  • ( / )
  • , Debug ,
  • , /
  • , ( nullptr), — , ( FPS , FastMemPool -).


In unserem Unternehmen erforderte die Installation der Analyse der 3D-Geometrie von Blechen eine Multithread-Videoverarbeitung (50 FPS). Die Blätter gehen unter der Kamera hindurch und auf der Reflexion des Lasers erstelle ich eine 3D-Karte des Blattes. FastMemPool wurde verwendet, um maximale Arbeitsgeschwindigkeit mit Speicher und Sicherheit zu gewährleisten. Wenn die Streams die eingehenden Frames nicht verarbeiten können, führt das Speichern von Frames für die zukünftige Verarbeitung auf die übliche Weise zu einem unkontrollierten RAM-Verbrauch. Mit FastMemPool wird im Falle eines Überlaufs bei der Zuweisung einfach nullptr zurückgegeben und der Frame übersprungen. Im endgültigen 3D-Bild zeigt ein solcher Fehler in Form eines Sprunges in einem Schritt, dass der Verarbeitung CPU-Threads hinzugefügt werden müssen.



Der mutexfreie Betrieb von Threads mit einem kreisförmigen Speicherzuweiser und einem Taskstack ermöglichte es, eingehende Frames sehr schnell, ohne Frame-Verlust und ohne überfüllten RAM zu verarbeiten. Jetzt läuft dieser Code in 16 Threads auf einer AMD Ryzen 9 3950X-CPU. Es wurden keine Fehler in der FastMemPool-Klasse festgestellt.



Ein vereinfachtes Beispieldiagramm des Videoanalyseprozesses mit RAM-Überlaufsteuerung ist im Quellcode test_memcontrol1.cpp zu sehen .



Und zum Nachtisch: Im gleichen Beispielschema wird ein Nicht-Mutex-Stapel verwendet:



using  TWorkStack = SpecSafeStack<VideoFrame>;
//..
  // Video frames exchanger:
TWorkStack  work_stack;
//..
work_staff->work_stack.push(frame);
//..
VideoFrame * frame = work_staff->work_stack.pop();


Ein funktionierender Demo-Stand mit allen Quellen ist hier auf gihub .



All Articles