Apis werden beispielsweise als Klassen beschrieben
class Categories(JsonEndpoint):
url = "http://127.0.0.1:8888/categories"
params = {"page": range(100), "language": "en"}
headers = {"User-Agent": get_user_agent}
results_key = "*.slug"
categories = Categories()
class Posts(JsonEndpoint):
url = "http://127.0.0.1:8888/categories/{category}/posts"
params = {"page": range(100), "language": "en"}
url_params = {"category": categories.iter_results()}
results_key = "posts"
async def comments(self, post):
comments = Comments(
self.session,
url_params={"category": post.url.params["category"], "id": post["id"]},
)
return [comment async for comment in comments]
posts = Posts()
Params und url_params können Funktionen (wie hier get_user_agent - gibt einen zufälligen Benutzeragenten zurück), Bereich, Iteratoren, wartbare und asynchrone Iteratoren enthalten (damit Sie sie miteinander verknüpfen können).
Die Parameter Header und Cookies können auch Funktionen enthalten und warten.
Die Kategorie-API im obigen Beispiel gibt ein Array von Objekten mit einem Slug zurück. Der Iterator gibt genau diese zurück. Durch Einfügen dieses Iterators in die url_params von Posts durchläuft der Iterator rekursiv alle Kategorien und alle Seiten in jedem. Es wird abgebrochen, wenn ein 404 oder ein anderer Fehler auftritt, und geht zur nächsten Kategorie über.
Und die Repositorys haben ein Beispiel für einen Aiohttp-Server für diese Klassen, damit alles getestet werden kann.
Zusätzlich zum Abrufen von Parametern können Sie diese als Daten oder JSON übergeben und eine andere Methode festlegen.
results_key ist gepunktet und versucht, Schlüssel aus den Ergebnissen zu ziehen. Beispiel: "Kommentare. *. Text" gibt den Text jedes Kommentars aus dem Array in den Kommentaren zurück.
Die Ergebnisse werden in einen Wrapper mit URL- und Parameter-Eigenschaften eingeschlossen. url wird von einer Zeichenfolge abgeleitet, die auch Parameter enthält. So können Sie herausfinden, welche Parameter verwendet wurden, um dieses Ergebnis zu erhalten. Dies wird in der Kommentarmethode demonstriert.
Es gibt auch eine Basis-Sink-Klasse für die Verarbeitung der Ergebnisse. Zum Beispiel, falten Sie sie in mq oder eine Datenbank. Es arbeitet in separaten Aufgaben und empfängt Daten über asyncio.Queue.
class LoggingSink(Sink):
def transform(self, obj):
return repr(obj)
async def init(self):
from loguru import logger
self.logger = logger
async def process(self, obj):
self.logger.info(obj)
return True
sink = LoggingSink(num_tasks=1)
Ein Beispiel für das einfachste Waschbecken. Mit der Transformationsmethode können wir einige Manipulationen am Objekt durchführen und None zurückgeben, wenn dies nicht zu uns passt. jene. In Themen können Sie auch eine Validierung durchführen.
Sink ist ein asynchroner Kontextmanager, der beim Beenden theoretisch wartet, bis alle Objekte in der Warteschlange verarbeitet sind, und dann seine Aufgaben abbricht.
Und schließlich habe ich, um alles zusammenzubinden, eine Arbeiterklasse gemacht. Es akzeptiert einen Endpunkt und mehrere Senken. Zum Beispiel,
worker = Worker(endpoint=posts, sinks=[loggingsink, mongosink])
worker.run()
run führt asyncio.run_until_complete für die Worker-Pipeline aus. Es hat auch eine Transformationsmethode.
Es gibt auch eine WorkerGroup-Klasse, mit der Sie mehrere Worker gleichzeitig erstellen und eine asyncio.gather für sie erstellen können.
Der Code enthält ein Beispiel für einen Server, der Daten über Fälscher und Handler für seine Endpunkte generiert. Ich denke, das ist das offensichtlichste.
All dies befindet sich in einem frühen Entwicklungsstadium und bisher habe ich oft die API geändert. Aber jetzt scheint es so gekommen zu sein, wie es aussehen sollte. Ich werde nur Anfragen und Kommentare zu meinem Code zusammenführen.