Hallo Habr! Karma wurde aufgrund eines nachlässigen Kommentars unter dem Holivar-Artikel ausgelaugt, was bedeutet, dass Sie einen interessanten (ich hoffe) Beitrag schreiben und sich rehabilitieren müssen.
Ich benutze seit mehreren Jahren einen Telegrammserver-Client in PHP. Und wie viele Benutzer - müde vom stetig wachsenden Speicherverbrauch. Einige Sitzungen können 1 bis 8 Gigabyte RAM benötigen! Datenbankunterstützung wurde schon lange versprochen, aber in dieser Richtung wurden keine Fortschritte erzielt. Ich musste das Problem selbst lösen :) Die Popularität des Open-Source-Projekts stellte interessante Anforderungen an die Pull-Anfrage:
- Abwärtskompatibilität . Alle vorhandenen Sitzungen sollten in der neuen Version weiterhin funktionieren (Sitzung ist eine serialisierte Instanz der Anwendung in einer Datei).
- Wahlfreiheit der Datenbank . Die Möglichkeit, den Speichertyp jederzeit und ohne Datenverlust zu ändern, da Benutzer unterschiedliche Konfigurationen der Umgebung haben.
- Erweiterbarkeit . Einfaches Hinzufügen neuer Datenbanktypen;
- Schnittstelle speichern . Anwendungscode, der Daten manipuliert, sollte sich nicht ändern.
- Asynchronität . Das Projekt verwendet amphp, daher müssen alle Datenbankoperationen nicht blockierend sein.
Für Details lade ich alle unter Katze ein.
Was werden wir übertragen
Der größte Teil des Speichers von MadelineProto wird von Chats, Benutzern und Dateien belegt. Zum Beispiel habe ich im Peer-Cache mehr als 20.000 Einträge. Dies sind alle Benutzer, die das Konto jemals gesehen hat (einschließlich Mitglieder aller Gruppen), sowie Kanäle, Bots und Gruppen. Je älter und aktiver das Konto ist, desto mehr Daten werden gespeichert. Dies sind Dutzende und Hunderte von Megabyte, und die meisten von ihnen werden nicht verwendet. Sie können jedoch nicht den gesamten Cache leeren, da Telegramme das Konto sofort stark einschränken, wenn Sie versuchen, dieselben Daten mehrmals zu empfangen. Nach der Neuerstellung der Sitzung auf meinem öffentlichen Demo-Server beantworteten Telegramme innerhalb einer Woche die meisten Anfragen mit dem Fehler FLOOD_WAIT, und es funktionierte nichts wirklich. Nachdem sich der Cache aufgewärmt hatte, normalisierte sich alles wieder.
Aus Code-Sicht werden diese Daten als Arrays in den Eigenschaften eines Klassenpaars gespeichert.
Die Architektur
Basierend auf den Anforderungen wurde ein Schema geboren:
- Alle "schweren" Arrays werden durch Objekte ersetzt, die ArrayAccess implementieren.
- Für jeden Datenbanktyp erstellen wir unsere eigenen Klassen, die die Basisklasse erben.
- Objekte werden während __consrtuct und __awake erstellt und in Eigenschaften geschrieben.
- Die abstrakte Factory wählt die gewünschte Klasse für das Objekt abhängig von der ausgewählten Datenbank in den Anwendungseinstellungen aus.
- Wenn die Anwendung bereits über einen anderen Speichertyp verfügt, lesen wir alle Daten von dort und schreiben das Array in den neuen Speicher.
Asynchrone Weltprobleme
Als erstes habe ich Schnittstellen und eine Klasse zum Speichern von Arrays im Speicher erstellt. Dies war die Standardeinstellung, deren Verhalten mit der älteren Version des Programms identisch war. Am ersten Abend war ich sehr gespannt auf den Erfolg des Prototyps. Der Code war nett und einfach. Bisher wurde nicht entdeckt, dass es unmöglich ist, Generatoren innerhalb von Methoden der Iterator-Schnittstelle und innerhalb von Methoden zu verwenden, die für das Nicht-Setzen und Ausgeben verantwortlich sind.
Hier sollte klargestellt werden, dass amphp die Generatorsyntax verwendet, um Async in PHP zu implementieren. Die Ausbeute wird analog zu asynchron ... warten von js. Wenn eine Methode Asynchronität verwendet, müssen Sie mit Yield auf dieses Ergebnis im Code warten, um ein Ergebnis daraus zu erhalten. Zum Beispiel:
<?php
include 'vendor/autoload.php';
$MadelineProto = new \danog\MadelineProto\API('session.madeline');
$MadelineProto->async(true);
$MadelineProto->loop(function() use($MadelineProto) {
$myAsyncFunction = function() use($MadelineProto): \Generator {
$me = yield $MadelineProto->start();
yield $MadelineProto->echo(json_encode($me, JSON_PRETTY_PRINT|JSON_UNESCAPED_UNICODE));
};
yield $myAsyncFunction();
});
Wenn von Zeichenfolge
yield $myAsyncFunction();Ertrag entfernen, dann wird die Anwendung beendet, bevor dieser Code ausgeführt wird. Wir werden das Ergebnis nicht bekommen.
Das Hinzufügen von Yield vor dem Aufrufen von Methoden und Funktionen ist nicht sehr schwierig. Da jedoch die ArrayAccess-Schnittstelle verwendet wird, werden die Methoden nicht direkt aufgerufen. Beispielsweise ruft unset () offsetUnset () und isset () offsetIsset () auf. Ähnlich verhält es sich mit foreach-Iteratoren bei Verwendung der Iterator-Schnittstelle.
Das Hinzufügen von Yield vor den integrierten Methoden führt zu einem Fehler, da diese Methoden nicht für die Verwendung mit Generatoren ausgelegt sind. Ein bisschen mehr in den Kommentaren: hier und hier .
Ich musste den Code kompromittieren und neu schreiben, um meine eigenen Methoden zu verwenden. Glücklicherweise gab es nur sehr wenige solcher Orte. In den meisten Fällen wurden Arrays zum Lesen oder Schreiben per Schlüssel verwendet. Diese Funktionalität hat gute Freunde mit Generatoren gefunden.
Die resultierende Schnittstelle ist:
<?php
use Amp\Producer;
use Amp\Promise;
interface DbArray extends DbType, \ArrayAccess, \Countable
{
public function getArrayCopy(): Promise;
public function isset($key): Promise;
public function offsetGet($offset): Promise;
public function offsetSet($offset, $value);
public function offsetUnset($offset): Promise;
public function count(): Promise;
public function getIterator(): Producer;
/**
* @deprecated
* @internal
* @see DbArray::isset();
*
* @param mixed $offset
*
* @return bool
*/
public function offsetExists($offset);
}
Beispiele für die Arbeit mit Daten
<?php
...
//
$existingChat = yield $this->chats[$user['id']];
//.
yield $this->chats[$user['id']] = $user;
// yield, .
$this->chats[$user['id']] = $user;
//unset
yield $this->chats->offsetUnset($id);
//foreach
$iterator = $this->chats->getIterator();
while (yield $iterator->advance()) {
[$key, $value] = $iterator->getCurrent();
//
}
Datenspeicher
Der einfachste Weg, Daten zu speichern, ist die Serialisierung. Ich musste auf die Verwendung von json verzichten, um Objekte zu unterstützen. Die Tabelle enthält zwei Hauptspalten: Schlüssel und Wert.
Ein Beispiel für eine SQL-Abfrage zum Erstellen einer Tabelle:
CREATE TABLE IF NOT EXISTS `{$this->table}`
(
`key` VARCHAR(255) NOT NULL,
`value` MEDIUMBLOB NULL,
`ts` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`key`)
)
ENGINE = InnoDB
CHARACTER SET 'utf8mb4'
COLLATE 'utf8mb4_general_ci'
Bei jedem Start der Anwendung versuchen wir, für jede Eigenschaft eine Tabelle zu erstellen. Es wird nicht empfohlen, Telegrammclients mehrmals alle paar Stunden neu zu starten, sodass nicht mehrere Anforderungen zum Erstellen von Tabellen pro Sekunde erforderlich sind :)
Da der Primärschlüssel nicht automatisch inkrementiert wird, kann das Einfügen und Aktualisieren von Daten mit einer Anforderung erfolgen, wie in einem regulären Array:
INSERT INTO `{$this->table}`
SET `key` = :index, `value` = :value
ON DUPLICATE KEY UPDATE `value` = :value
Für jede Variable wird eine Tabelle mit einem Namen im Format% account_id% _% class% _% variable_name% erstellt. Wenn Sie die Anwendung zum ersten Mal starten, ist noch kein Konto vorhanden. In diesem Fall müssen Sie eine zufällige temporäre ID mit dem Präfix tmp generieren. Bei jedem Start prüft die Klasse jeder Variablen, ob die Konto-ID angezeigt wurde. Wenn id vorhanden ist, werden die Tabellen umbenannt.
Indizes
Die Struktur der Datenbank ist so einfach wie möglich, sodass in Zukunft automatisch neue Eigenschaften hinzugefügt werden. Es gibt keine Verbindungen. Es werden nur PRIMARY-Schlüsselindizes verwendet. Es gibt jedoch Situationen, in denen Sie in anderen Feldern suchen müssen.
Zum Beispiel gibt es Array- / Tabellen-Chats. Der Schlüssel dazu ist die Chat-ID. Aber oft muss man nach Benutzernamen suchen. Wenn die Anwendung Daten in Arrays speicherte, wurde die Suche nach Benutzername wie gewohnt durchgeführt, indem das Array in foreach durchlaufen wurde. Diese Suche funktionierte mit einer akzeptablen Geschwindigkeit im Speicher, jedoch nicht in der Datenbank. Daher wurde eine andere Tabelle / ein anderes Array und eine entsprechende Eigenschaft in der Klasse erstellt. Der Schlüssel ist der Benutzername, der Wert ist die Chat-ID. Der einzige Nachteil dieses Ansatzes besteht darin, dass Sie zusätzlichen Code schreiben müssen, um die beiden Tabellen zu synchronisieren.
Caching
Lokales MySQL ist schnell, aber ein bisschen Caching tut nie weh. Insbesondere, wenn derselbe Wert mehrmals hintereinander verwendet wird. Zum Beispiel überprüfen wir zuerst das Vorhandensein eines Chats in der Datenbank und erhalten dann einige Daten daraus.
Ein einfaches
<?php
namespace danog\MadelineProto\Db;
use Amp\Loop;
use danog\MadelineProto\Logger;
trait ArrayCacheTrait
{
/**
* Values stored in this format:
* [
* [
* 'value' => mixed,
* 'ttl' => int
* ],
* ...
* ].
* @var array
*/
protected array $cache = [];
protected string $ttl = '+5 minutes';
private string $ttlCheckInterval = '+1 minute';
protected function getCache(string $key, $default = null)
{
$cacheItem = $this->cache[$key] ?? null;
$result = $default;
if (\is_array($cacheItem)) {
$result = $cacheItem['value'];
$this->cache[$key]['ttl'] = \strtotime($this->ttl);
}
return $result;
}
/**
* Save item in cache.
*
* @param string $key
* @param $value
*/
protected function setCache(string $key, $value): void
{
$this->cache[$key] = [
'value' => $value,
'ttl' => \strtotime($this->ttl),
];
}
/**
* Remove key from cache.
*
* @param string $key
*/
protected function unsetCache(string $key): void
{
unset($this->cache[$key]);
}
protected function startCacheCleanupLoop(): void
{
Loop::repeat(\strtotime($this->ttlCheckInterval, 0) * 1000, fn () => $this->cleanupCache());
}
/**
* Remove all keys from cache.
*/
protected function cleanupCache(): void
{
$now = \time();
$oldKeys = [];
foreach ($this->cache as $cacheKey => $cacheValue) {
if ($cacheValue['ttl'] < $now) {
$oldKeys[] = $cacheKey;
}
}
foreach ($oldKeys as $oldKey) {
$this->unsetCache($oldKey);
}
Logger::log(
\sprintf(
"cache for table:%s; keys left: %s; keys removed: %s",
$this->table,
\count($this->cache),
\count($oldKeys)
),
Logger::VERBOSE
);
}
}
Ich möchte besonders auf startCacheCleanupLoop achten. Dank der Magie von Amphp ist die Ungültigmachung des Caches so einfach wie möglich. Der Rückruf beginnt im angegebenen Intervall, durchläuft alle Werte und zeigt das Feld ts an, in dem der Zeitstempel des letzten Aufrufs dieses Elements gespeichert ist. Wenn der Aufruf vor mehr als 5 Minuten erfolgte (in den Einstellungen konfigurierbar), wird das Element gelöscht. Es ist sehr einfach, ein ttl-Analogon aus Redis oder Memcache mit amphp zu implementieren. All dies geschieht im Hintergrund und blockiert den Haupt-Thread nicht.
Mit Hilfe von Cache und Asynchronität werden nicht nur Lesevorgänge beschleunigt, sondern auch Schreibvorgänge.
Hier ist der Quellcode für die Methode, die Daten in die Datenbank schreibt.
/**
* Set value for an offset.
*
* @link https://php.net/manual/en/arrayiterator.offsetset.php
*
* @param string $index <p>
* The index to set for.
* </p>
* @param $value
*
* @throws \Throwable
*/
public function offsetSet($index, $value): Promise
{
if ($this->getCache($index) === $value) {
return call(fn () =>null);
}
$this->setCache($index, $value);
$request = $this->request(
"
INSERT INTO `{$this->table}`
SET `key` = :index, `value` = :value
ON DUPLICATE KEY UPDATE `value` = :value
",
[
'index' => $index,
'value' => \serialize($value),
]
);
//Ensure that cache is synced with latest insert in case of concurrent requests.
$request->onResolve(fn () => $this->setCache($index, $value));
return $request;
}
$ this-> request erstellt ein Promise, das Daten asynchron schreibt. Und Operationen mit dem Cache erfolgen synchron. Das heißt, Sie müssen nicht auf einen Schreibvorgang in die Datenbank warten und gleichzeitig sicherstellen, dass Lesevorgänge sofort neue Daten zurückgeben.
Die onResolve-Methode von amphp erwies sich als sehr nützlich. Nach Abschluss der Einfügung werden die Daten erneut in den Cache geschrieben. Wenn ein Schreibvorgang zu spät ist und sich Cache und Basis unterscheiden, wird der Cache mit dem zuletzt in die Basis geschriebenen Wert aktualisiert. Jene. Unser Cache wird wieder mit der Basis konsistent.
Quelle
→ Link zum Abrufen der Anfrage
Und einfach so hat ein anderer Benutzer die Unterstützung für Postgre hinzugefügt . Es dauerte nur 5 Minuten, um die Anweisungen dafür zu schreiben .
Die Menge an Code könnte reduziert werden, indem die doppelten Methoden in die allgemeine abstrakte Klasse SqlArray verschoben werden.
Eine Sache noch
Es wurde festgestellt, dass beim Herunterladen von Mediendateien per Telegramm der Standard-Garbage-Collector-PHP die Arbeit nicht bewältigt und Teile der Datei im Speicher verbleiben. In der Regel hatten die Lecks dieselbe Größe wie die Datei. Mögliche Ursache: Der Garbage Collector wird automatisch ausgelöst, wenn sich 10.000 Links ansammeln. In unserem Fall gab es nur wenige (Dutzende) Links, aber jeder konnte sich auf Megabyte Daten im Speicher beziehen. Es war sehr faul, mit der mtproto-Implementierung Tausende von Codezeilen zu studieren. Probieren Sie zuerst die elegante Krücke mit \ gc_collect_cycles () aus.
Überraschenderweise löste es das Problem. Dies bedeutet, dass es ausreicht, den regelmäßigen Reinigungsbeginn zu konfigurieren. Glücklicherweise bietet amphp einfache Tools für die Hintergrundausführung in bestimmten Intervallen.
Das Löschen des Speichers jede Sekunde schien zu einfach und nicht sehr effektiv. Ich habe mich für einen Algorithmus entschieden, der den Speichergewinn seit der letzten Bereinigung überprüft. Das Löschen erfolgt, wenn die Verstärkung größer als der Schwellenwert ist.
<?php
namespace danog\MadelineProto\MTProtoTools;
use Amp\Loop;
use danog\MadelineProto\Logger;
class GarbageCollector
{
/**
* Ensure only one instance of GarbageCollector
* when multiple instances of MadelineProto running.
* @var bool
*/
public static bool $lock = false;
/**
* How often will check memory.
* @var int
*/
public static int $checkIntervalMs = 1000;
/**
* Next cleanup will be triggered when memory consumption will increase by this amount.
* @var int
*/
public static int $memoryDiffMb = 1;
/**
* Memory consumption after last cleanup.
* @var int
*/
private static int $memoryConsumption = 0;
public static function start(): void
{
if (static::$lock) {
return;
}
static::$lock = true;
Loop::repeat(static::$checkIntervalMs, static function () {
$currentMemory = static::getMemoryConsumption();
if ($currentMemory > static::$memoryConsumption + static::$memoryDiffMb) {
\gc_collect_cycles();
static::$memoryConsumption = static::getMemoryConsumption();
$cleanedMemory = $currentMemory - static::$memoryConsumption;
Logger::log("gc_collect_cycles done. Cleaned memory: $cleanedMemory Mb", Logger::VERBOSE);
}
});
}
private static function getMemoryConsumption(): int
{
$memory = \round(\memory_get_usage()/1024/1024, 1);
Logger::log("Memory consumption: $memory Mb", Logger::ULTRA_VERBOSE);
return (int) $memory;
}
}