ich bin der Schöpfer von Dependency Injector . Dies ist ein Abhängigkeitsinjektionsframework für Python.
Dies ist ein weiteres Tutorial zum Erstellen von Anwendungen mit dem Abhängigkeitsinjektor.
Heute möchte ich zeigen, wie Sie einen asynchronen Daemon basierend auf einem Modul erstellen können
asyncio.
Das Handbuch besteht aus folgenden Teilen:
- Was werden wir bauen?
- Werkzeugprüfung
- Projektstruktur
- Vorbereitung der Umwelt
- Protokollierung und Konfiguration
- Dispatcher
- Überwachung von example.com
- Überwachung von httpbin.org
- Tests
- Fazit
Das abgeschlossene Projekt finden Sie auf Github .
Zu Beginn ist es wünschenswert, Folgendes zu haben:
- Grundkenntnisse von
asyncio - Das Prinzip der Abhängigkeitsinjektion verstehen
Was werden wir bauen?
Wir werden einen Überwachungsdämon erstellen, der den Zugriff auf Webdienste überwacht.
Der Daemon sendet alle paar Sekunden Anforderungen an example.com und httpbin.org . Beim Empfang einer Antwort werden die folgenden Daten in das Protokoll geschrieben:
- Antwortcode
- Anzahl der Bytes als Antwort
- Zeit, die benötigt wird, um die Anfrage abzuschließen
Werkzeugprüfung
Wir werden Docker und Docker-Compose verwenden . Lassen Sie uns überprüfen, ob sie installiert sind:
docker --version
docker-compose --version
Die Ausgabe sollte ungefähr so aussehen:
Docker version 19.03.12, build 48a66213fe
docker-compose version 1.26.2, build eefe0d31
Wenn Docker oder Docker-Compose nicht installiert sind, müssen sie installiert werden, bevor Sie fortfahren können. Befolgen Sie diese Anleitungen:
Die Werkzeuge sind fertig. Fahren wir mit der Projektstruktur fort.
Projektstruktur
Erstellen Sie einen Projektordner und gehen Sie dorthin:
mkdir monitoring-daemon-tutorial
cd monitoring-daemon-tutorial
Jetzt müssen wir eine erste Projektstruktur erstellen. Erstellen Sie Dateien und Ordner gemäß der folgenden Struktur. Alle Dateien sind vorerst leer. Wir werden sie später ausfüllen.
Erste Projektstruktur:
./
├── monitoringdaemon/
│ ├── __init__.py
│ ├── __main__.py
│ └── containers.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt
Die anfängliche Projektstruktur ist fertig. Wir werden es in den folgenden Abschnitten erweitern.
Als nächstes warten wir auf die Vorbereitung der Umgebung.
Vorbereitung der Umwelt
In diesem Abschnitt bereiten wir die Umgebung für den Start unseres Daemons vor.
Zuerst müssen Sie Abhängigkeiten definieren. Wir werden Pakete wie dieses verwenden:
dependency-injector- Abhängigkeitsinjektions-Frameworkaiohttp- Web Framework (wir brauchen nur einen http Client)pyyaml- Bibliothek zum Parsen von YAML-Dateien, die zum Lesen der Konfiguration verwendet wirdpytest- Testrahmenpytest-asyncio- Hilfsbibliothek zum Testen vonasyncioAnwendungenpytest-cov- Hilfsbibliothek zur Messung der Codeabdeckung durch Tests
Fügen wir der Datei die folgenden Zeilen hinzu
requirements.txt:
dependency-injector
aiohttp
pyyaml
pytest
pytest-asyncio
pytest-cov
Und im Terminal ausführen:
pip install -r requirements.txt
Als nächstes erstellen wir
Dockerfile. Es wird den Prozess des Erstellens und Startens unseres Daemons beschreiben. Wir werden es python:3.8-busterals Basisbild verwenden.
Fügen wir der Datei die folgenden Zeilen hinzu
Dockerfile:
FROM python:3.8-buster
ENV PYTHONUNBUFFERED=1
WORKDIR /code
COPY . /code/
RUN apt-get install openssl \
&& pip install --upgrade pip \
&& pip install -r requirements.txt \
&& rm -rf ~/.cache
CMD ["python", "-m", "monitoringdaemon"]
Der letzte Schritt besteht darin, die Einstellungen zu definieren
docker-compose.
Fügen wir der Datei die folgenden Zeilen hinzu
docker-compose.yml:
version: "3.7"
services:
monitor:
build: ./
image: monitoring-daemon
volumes:
- "./:/code"
Alles ist fertig. Beginnen wir mit der Erstellung des Images und überprüfen Sie, ob die Umgebung korrekt konfiguriert ist.
Lassen Sie uns im Terminal ausführen:
docker-compose build
Der Erstellungsprozess kann einige Minuten dauern. Am Ende sollten Sie sehen:
Successfully built 5b4ee5e76e35
Successfully tagged monitoring-daemon:latest
Starten Sie nach Abschluss des Erstellungsprozesses den Container:
docker-compose up
Du wirst sehen:
Creating network "monitoring-daemon-tutorial_default" with the default driver
Creating monitoring-daemon-tutorial_monitor_1 ... done
Attaching to monitoring-daemon-tutorial_monitor_1
monitoring-daemon-tutorial_monitor_1 exited with code 0
Die Umgebung ist bereit. Der Container beginnt und endet mit Code
0.
Der nächste Schritt besteht darin, die Protokollierung einzurichten und die Konfigurationsdatei zu lesen.
Protokollierung und Konfiguration
In diesem Abschnitt konfigurieren wir die Protokollierung und das Lesen der Konfigurationsdatei.
Beginnen wir mit dem Hinzufügen des Hauptteils unserer Anwendung - des Abhängigkeitscontainers (weiter nur des Containers). Der Container enthält alle Komponenten der Anwendung.
Fügen wir die ersten beiden Komponenten hinzu. Dies ist ein Konfigurationsobjekt und eine Funktion zum Konfigurieren der Protokollierung.
Lassen Sie uns bearbeiten
containers.py:
"""Application containers module."""
import logging
import sys
from dependency_injector import containers, providers
class ApplicationContainer(containers.DeclarativeContainer):
"""Application container."""
config = providers.Configuration()
configure_logging = providers.Callable(
logging.basicConfig,
stream=sys.stdout,
level=config.log.level,
format=config.log.format,
)
Wir haben die Konfigurationsparameter verwendet, bevor wir ihre Werte eingestellt haben. Dies ist das Prinzip, nach dem der Anbieter arbeitetConfiguration.
Zuerst verwenden wir, dann setzen wir die Werte.
Die Protokollierungseinstellungen sind in der Konfigurationsdatei enthalten.
Lassen Sie uns bearbeiten
config.yml:
log:
level: "INFO"
format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"
Definieren wir nun eine Funktion, die unseren Daemon startet. Sie wird normalerweise genannt
main(). Es wird ein Container erstellt. Der Container wird verwendet, um die Konfigurationsdatei zu lesen und die Protokollierungseinstellungsfunktion aufzurufen.
Lassen Sie uns bearbeiten
__main__.py:
"""Main module."""
from .containers import ApplicationContainer
def main() -> None:
"""Run the application."""
container = ApplicationContainer()
container.config.from_yaml('config.yml')
container.configure_logging()
if __name__ == '__main__':
main()
Der Container ist das erste Objekt in der Anwendung. Es wird verwendet, um alle anderen Objekte abzurufen.
Die Konfigurationsprotokollierung und das Lesen sind konfiguriert. Im nächsten Abschnitt erstellen wir einen Überwachungs-Task-Manager.
Dispatcher
Es ist Zeit, einen Überwachungs-Task-Manager hinzuzufügen.
Der Dispatcher enthält eine Liste der Überwachungsaufgaben und steuert deren Ausführung. Er wird jede Aufgabe gemäß dem Zeitplan ausführen. Klasse
Monitor- Basisklasse für Überwachungsaufgaben. Um bestimmte Aufgaben zu erstellen, müssen Sie untergeordnete Klassen hinzufügen und die Methode implementieren check().
Fügen wir einen Dispatcher und eine Basisklasse für die Überwachungsaufgabe hinzu.
Lassen Sie uns erstellen
dispatcher.pyund monitors.pyim Paket monitoringdaemon:
./
├── monitoringdaemon/
│ ├── __init__.py
│ ├── __main__.py
│ ├── containers.py
│ ├── dispatcher.py
│ └── monitors.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt
Fügen wir der Datei die folgenden Zeilen hinzu
monitors.py:
"""Monitors module."""
import logging
class Monitor:
def __init__(self, check_every: int) -> None:
self.check_every = check_every
self.logger = logging.getLogger(self.__class__.__name__)
async def check(self) -> None:
raise NotImplementedError()
und zur Datei
dispatcher.py:
""""Dispatcher module."""
import asyncio
import logging
import signal
import time
from typing import List
from .monitors import Monitor
class Dispatcher:
def __init__(self, monitors: List[Monitor]) -> None:
self._monitors = monitors
self._monitor_tasks: List[asyncio.Task] = []
self._logger = logging.getLogger(self.__class__.__name__)
self._stopping = False
def run(self) -> None:
asyncio.run(self.start())
async def start(self) -> None:
self._logger.info('Starting up')
for monitor in self._monitors:
self._monitor_tasks.append(
asyncio.create_task(self._run_monitor(monitor)),
)
asyncio.get_event_loop().add_signal_handler(signal.SIGTERM, self.stop)
asyncio.get_event_loop().add_signal_handler(signal.SIGINT, self.stop)
await asyncio.gather(*self._monitor_tasks, return_exceptions=True)
self.stop()
def stop(self) -> None:
if self._stopping:
return
self._stopping = True
self._logger.info('Shutting down')
for task, monitor in zip(self._monitor_tasks, self._monitors):
task.cancel()
self._logger.info('Shutdown finished successfully')
@staticmethod
async def _run_monitor(monitor: Monitor) -> None:
def _until_next(last: float) -> float:
time_took = time.time() - last
return monitor.check_every - time_took
while True:
time_start = time.time()
try:
await monitor.check()
except asyncio.CancelledError:
break
except Exception:
monitor.logger.exception('Error executing monitor check')
await asyncio.sleep(_until_next(last=time_start))
Der Dispatcher muss dem Container hinzugefügt werden.
Lassen Sie uns bearbeiten
containers.py:
"""Application containers module."""
import logging
import sys
from dependency_injector import containers, providers
from . import dispatcher
class ApplicationContainer(containers.DeclarativeContainer):
"""Application container."""
config = providers.Configuration()
configure_logging = providers.Callable(
logging.basicConfig,
stream=sys.stdout,
level=config.log.level,
format=config.log.format,
)
dispatcher = providers.Factory(
dispatcher.Dispatcher,
monitors=providers.List(
# TODO: add monitors
),
)
Jede Komponente wird dem Container hinzugefügt.
Schließlich müssen wir die Funktion aktualisieren
main(). Wir holen den Dispatcher aus dem Container und rufen seine Methode auf run().
Lassen Sie uns bearbeiten
__main__.py:
"""Main module."""
from .containers import ApplicationContainer
def main() -> None:
"""Run the application."""
container = ApplicationContainer()
container.config.from_yaml('config.yml')
container.configure_logging()
dispatcher = container.dispatcher()
dispatcher.run()
if __name__ == '__main__':
main()
Starten wir nun den Daemon und testen seine Arbeit.
Lassen Sie uns im Terminal ausführen:
docker-compose up
Die Ausgabe sollte folgendermaßen aussehen:
Starting monitoring-daemon-tutorial_monitor_1 ... done
Attaching to monitoring-daemon-tutorial_monitor_1
monitor_1 | [2020-08-08 16:12:35,772] [INFO] [Dispatcher]: Starting up
monitor_1 | [2020-08-08 16:12:35,774] [INFO] [Dispatcher]: Shutting down
monitor_1 | [2020-08-08 16:12:35,774] [INFO] [Dispatcher]: Shutdown finished successfully
monitoring-daemon-tutorial_monitor_1 exited with code 0
Alles funktioniert richtig. Der Dispatcher startet und stoppt, da keine Überwachungsaufgaben vorhanden sind.
Am Ende dieses Abschnitts ist das Skelett unseres Dämons fertig. Im nächsten Abschnitt werden wir die erste Überwachungsaufgabe hinzufügen.
Überwachung von example.com
In diesem Abschnitt fügen wir eine Überwachungsaufgabe hinzu, die den Zugriff auf http://example.com überwacht .
Wir werden zunächst unser Klassenmodell um eine neue Art von Überwachungsaufgabe erweitern
HttpMonitor.
HttpMonitorEs ist eine Kinderklasse Monitor. Wir werden die check () -Methode implementieren. Es wird eine HTTP-Anfrage gesendet und die empfangene Antwort protokolliert. Die Details der HTTP-Anforderung werden an die Klasse delegiert HttpClient.
Lassen Sie uns zuerst hinzufügen
HttpClient.
Lassen Sie uns eine Datei
http.pyin einem Paket erstellen monitoringdaemon:
./
├── monitoringdaemon/
│ ├── __init__.py
│ ├── __main__.py
│ ├── containers.py
│ ├── dispatcher.py
│ ├── http.py
│ └── monitors.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt
Und fügen Sie die folgenden Zeilen hinzu:
"""Http client module."""
from aiohttp import ClientSession, ClientTimeout, ClientResponse
class HttpClient:
async def request(self, method: str, url: str, timeout: int) -> ClientResponse:
async with ClientSession(timeout=ClientTimeout(timeout)) as session:
async with session.request(method, url) as response:
return response
Als nächstes müssen Sie
HttpClientdem Container hinzufügen .
Lassen Sie uns bearbeiten
containers.py:
"""Application containers module."""
import logging
import sys
from dependency_injector import containers, providers
from . import http, dispatcher
class ApplicationContainer(containers.DeclarativeContainer):
"""Application container."""
config = providers.Configuration()
configure_logging = providers.Callable(
logging.basicConfig,
stream=sys.stdout,
level=config.log.level,
format=config.log.format,
)
http_client = providers.Factory(http.HttpClient)
dispatcher = providers.Factory(
dispatcher.Dispatcher,
monitors=providers.List(
# TODO: add monitors
),
)
Wir sind jetzt bereit hinzuzufügen
HttpMonitor. Fügen wir es dem Modul hinzu monitors.
Lassen Sie uns bearbeiten
monitors.py:
"""Monitors module."""
import logging
import time
from typing import Dict, Any
from .http import HttpClient
class Monitor:
def __init__(self, check_every: int) -> None:
self.check_every = check_every
self.logger = logging.getLogger(self.__class__.__name__)
async def check(self) -> None:
raise NotImplementedError()
class HttpMonitor(Monitor):
def __init__(
self,
http_client: HttpClient,
options: Dict[str, Any],
) -> None:
self._client = http_client
self._method = options.pop('method')
self._url = options.pop('url')
self._timeout = options.pop('timeout')
super().__init__(check_every=options.pop('check_every'))
@property
def full_name(self) -> str:
return '{0}.{1}(url="{2}")'.format(__name__, self.__class__.__name__, self._url)
async def check(self) -> None:
time_start = time.time()
response = await self._client.request(
method=self._method,
url=self._url,
timeout=self._timeout,
)
time_end = time.time()
time_took = time_end - time_start
self.logger.info(
'Response code: %s, content length: %s, request took: %s seconds',
response.status,
response.content_length,
round(time_took, 3)
)
Wir sind alle bereit, den Scheck für http://example.com hinzuzufügen . Wir müssen zwei Änderungen am Container vornehmen:
- Fügen Sie eine Fabrik hinzu
example_monitor. - Übergabe
example_monitoran den Dispatcher.
Lassen Sie uns bearbeiten
containers.py:
"""Application containers module."""
import logging
import sys
from dependency_injector import containers, providers
from . import http, monitors, dispatcher
class ApplicationContainer(containers.DeclarativeContainer):
"""Application container."""
config = providers.Configuration()
configure_logging = providers.Callable(
logging.basicConfig,
stream=sys.stdout,
level=config.log.level,
format=config.log.format,
)
http_client = providers.Factory(http.HttpClient)
example_monitor = providers.Factory(
monitors.HttpMonitor,
http_client=http_client,
options=config.monitors.example,
)
dispatcher = providers.Factory(
dispatcher.Dispatcher,
monitors=providers.List(
example_monitor,
),
)
Der Anbieter
example_monitorist abhängig von Konfigurationswerten. Fügen wir folgende Werte hinzu:
Bearbeiten
config.yml:
log:
level: "INFO"
format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"
monitors:
example:
method: "GET"
url: "http://example.com"
timeout: 5
check_every: 5
Alles ist fertig. Wir starten den Daemon und überprüfen die Arbeit.
Wir führen im Terminal aus:
docker-compose up
Und wir sehen eine ähnliche Schlussfolgerung:
Starting monitoring-daemon-tutorial_monitor_1 ... done
Attaching to monitoring-daemon-tutorial_monitor_1
monitor_1 | [2020-08-08 17:06:41,965] [INFO] [Dispatcher]: Starting up
monitor_1 | [2020-08-08 17:06:42,033] [INFO] [HttpMonitor]: Check
monitor_1 | GET http://example.com
monitor_1 | response code: 200
monitor_1 | content length: 648
monitor_1 | request took: 0.067 seconds
monitor_1 |
monitor_1 | [2020-08-08 17:06:47,040] [INFO] [HttpMonitor]: Check
monitor_1 | GET http://example.com
monitor_1 | response code: 200
monitor_1 | content length: 648
monitor_1 | request took: 0.073 seconds
Unser Daemon kann die Verfügbarkeit des Zugriffs auf http://example.com überwachen .
Fügen wir die Überwachung https://httpbin.org hinzu .
Überwachung von httpbin.org
In diesem Abschnitt fügen wir eine Überwachungsaufgabe hinzu, die den Zugriff auf http://example.com überwacht .
Das Hinzufügen einer Überwachungsaufgabe für https://httpbin.org wird einfacher, da alle Komponenten bereit sind. Wir müssen nur einen neuen Anbieter zum Container hinzufügen und die Konfiguration aktualisieren.
Lassen Sie uns bearbeiten
containers.py:
"""Application containers module."""
import logging
import sys
from dependency_injector import containers, providers
from . import http, monitors, dispatcher
class ApplicationContainer(containers.DeclarativeContainer):
"""Application container."""
config = providers.Configuration()
configure_logging = providers.Callable(
logging.basicConfig,
stream=sys.stdout,
level=config.log.level,
format=config.log.format,
)
http_client = providers.Factory(http.HttpClient)
example_monitor = providers.Factory(
monitors.HttpMonitor,
http_client=http_client,
options=config.monitors.example,
)
httpbin_monitor = providers.Factory(
monitors.HttpMonitor,
http_client=http_client,
options=config.monitors.httpbin,
)
dispatcher = providers.Factory(
dispatcher.Dispatcher,
monitors=providers.List(
example_monitor,
httpbin_monitor,
),
)
Lassen Sie uns bearbeiten
config.yml:
log:
level: "INFO"
format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"
monitors:
example:
method: "GET"
url: "http://example.com"
timeout: 5
check_every: 5
httpbin:
method: "GET"
url: "https://httpbin.org/get"
timeout: 5
check_every: 5
Lassen Sie uns den Daemon starten und die Protokolle überprüfen.
Lassen Sie uns im Terminal ausführen:
docker-compose up
Und wir sehen eine ähnliche Schlussfolgerung:
Starting monitoring-daemon-tutorial_monitor_1 ... done
Attaching to monitoring-daemon-tutorial_monitor_1
monitor_1 | [2020-08-08 18:09:08,540] [INFO] [Dispatcher]: Starting up
monitor_1 | [2020-08-08 18:09:08,618] [INFO] [HttpMonitor]: Check
monitor_1 | GET http://example.com
monitor_1 | response code: 200
monitor_1 | content length: 648
monitor_1 | request took: 0.077 seconds
monitor_1 |
monitor_1 | [2020-08-08 18:09:08,722] [INFO] [HttpMonitor]: Check
monitor_1 | GET https://httpbin.org/get
monitor_1 | response code: 200
monitor_1 | content length: 310
monitor_1 | request took: 0.18 seconds
monitor_1 |
monitor_1 | [2020-08-08 18:09:13,619] [INFO] [HttpMonitor]: Check
monitor_1 | GET http://example.com
monitor_1 | response code: 200
monitor_1 | content length: 648
monitor_1 | request took: 0.066 seconds
monitor_1 |
monitor_1 | [2020-08-08 18:09:13,681] [INFO] [HttpMonitor]: Check
monitor_1 | GET https://httpbin.org/get
monitor_1 | response code: 200
monitor_1 | content length: 310
monitor_1 | request took: 0.126 seconds
Der Funktionsteil ist abgeschlossen. Der Daemon überwacht die Verfügbarkeit des Zugriffs auf http://example.com und https://httpbin.org .
Im nächsten Abschnitt werden wir einige Tests hinzufügen.
Tests
Es wäre schön, einige Tests hinzuzufügen. Lass uns das tun.
Erstellen Sie eine Datei
tests.pyin einem Paket monitoringdaemon:
./
├── monitoringdaemon/
│ ├── __init__.py
│ ├── __main__.py
│ ├── containers.py
│ ├── dispatcher.py
│ ├── http.py
│ ├── monitors.py
│ └── tests.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt
und fügen Sie die folgenden Zeilen hinzu:
"""Tests module."""
import asyncio
import dataclasses
from unittest import mock
import pytest
from .containers import ApplicationContainer
@dataclasses.dataclass
class RequestStub:
status: int
content_length: int
@pytest.fixture
def container():
container = ApplicationContainer()
container.config.from_dict({
'log': {
'level': 'INFO',
'formant': '[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s',
},
'monitors': {
'example': {
'method': 'GET',
'url': 'http://fake-example.com',
'timeout': 1,
'check_every': 1,
},
'httpbin': {
'method': 'GET',
'url': 'https://fake-httpbin.org/get',
'timeout': 1,
'check_every': 1,
},
},
})
return container
@pytest.mark.asyncio
async def test_example_monitor(container, caplog):
caplog.set_level('INFO')
http_client_mock = mock.AsyncMock()
http_client_mock.request.return_value = RequestStub(
status=200,
content_length=635,
)
with container.http_client.override(http_client_mock):
example_monitor = container.example_monitor()
await example_monitor.check()
assert 'http://fake-example.com' in caplog.text
assert 'response code: 200' in caplog.text
assert 'content length: 635' in caplog.text
@pytest.mark.asyncio
async def test_dispatcher(container, caplog, event_loop):
caplog.set_level('INFO')
example_monitor_mock = mock.AsyncMock()
httpbin_monitor_mock = mock.AsyncMock()
with container.example_monitor.override(example_monitor_mock), \
container.httpbin_monitor.override(httpbin_monitor_mock):
dispatcher = container.dispatcher()
event_loop.create_task(dispatcher.start())
await asyncio.sleep(0.1)
dispatcher.stop()
assert example_monitor_mock.check.called
assert httpbin_monitor_mock.check.called
Führen Sie die folgenden Tests aus, um die Tests auszuführen:
docker-compose run --rm monitor py.test monitoringdaemon/tests.py --cov=monitoringdaemon
Sie sollten ein ähnliches Ergebnis erhalten:
platform linux -- Python 3.8.3, pytest-6.0.1, py-1.9.0, pluggy-0.13.1
rootdir: /code
plugins: asyncio-0.14.0, cov-2.10.0
collected 2 items
monitoringdaemon/tests.py .. [100%]
----------- coverage: platform linux, python 3.8.3-final-0 -----------
Name Stmts Miss Cover
----------------------------------------------------
monitoringdaemon/__init__.py 0 0 100%
monitoringdaemon/__main__.py 9 9 0%
monitoringdaemon/containers.py 11 0 100%
monitoringdaemon/dispatcher.py 43 5 88%
monitoringdaemon/http.py 6 3 50%
monitoringdaemon/monitors.py 23 1 96%
monitoringdaemon/tests.py 37 0 100%
----------------------------------------------------
TOTAL 129 18 86%
Beachten Sie, wietest_example_monitorwir im TestHttpClientMock mit der Methode ersetzen.override(). Auf diese Weise können Sie den Rückgabewert eines beliebigen Anbieters überschreiben.
Die gleichen Aktionen werden im Test ausgeführttest_dispatcher, um Überwachungsaufgaben durch Mocks zu ersetzen.
Fazit
Wir haben einen Überwachungsdämon erstellt, der auf
asynciodem Prinzip der Abhängigkeitsinjektion basiert . Wir haben Dependency Injector als Abhängigkeitsinjektions-Framework verwendet.
Der Vorteil, den Sie mit Dependency Injector erhalten, ist der Container.
Der Container beginnt sich auszuzahlen, wenn Sie die Struktur Ihrer Anwendung verstehen oder ändern müssen. Mit einem Container ist dies einfach, da sich alle Komponenten der Anwendung und ihre Abhängigkeiten an einem Ort befinden:
"""Application containers module."""
import logging
import sys
from dependency_injector import containers, providers
from . import http, monitors, dispatcher
class ApplicationContainer(containers.DeclarativeContainer):
"""Application container."""
config = providers.Configuration()
configure_logging = providers.Callable(
logging.basicConfig,
stream=sys.stdout,
level=config.log.level,
format=config.log.format,
)
http_client = providers.Factory(http.HttpClient)
example_monitor = providers.Factory(
monitors.HttpMonitor,
http_client=http_client,
options=config.monitors.example,
)
httpbin_monitor = providers.Factory(
monitors.HttpMonitor,
http_client=http_client,
options=config.monitors.httpbin,
)
dispatcher = providers.Factory(
dispatcher.Dispatcher,
monitors=providers.List(
example_monitor,
httpbin_monitor,
),
)
Ein Container als Karte Ihrer Anwendung. Sie wissen immer, was von was abhängt.
Was weiter?
- Erfahren Sie mehr über Dependency Injector auf GitHub
- Lesen Sie die Dokumentation unter Lesen Sie die Dokumente
- Haben Sie eine Frage oder finden Sie einen Fehler? Öffnen Sie eine Ausgabe auf Github