Nehmen wir an, wir haben eine solche Aufgabe.
Es gibt eine Quelle für Transaktionen an der Börse. Diese Quelle sendet uns Transaktionen über die Rest-Schnittstelle.
Wir müssen diese Transaktionen abrufen, in der Datenbank speichern und einen bequemen In-Memory-Speicher erstellen.
Dieses Repository sollte die folgenden Funktionen ausführen:
- eine Liste der Trades zurückgeben;
- die volle Position zurückgeben, d.h. Tabelle "Instrument" - "aktuelle Anzahl der Wertpapiere";
- Geben Sie eine Position für ein bestimmtes Instrument zurück.
Wie gehen wir mit dieser Aufgabe um?
Gemäß den Vorschriften der Microservice-Mode müssen wir die Aufgabe in Microservices-Komponenten unterteilen:
- Erhalt einer Transaktion bei Rest;
- Speichern der Transaktion in der Datenbank;
- In-Memory-Speicher zur Darstellung von Positionsdaten.
Lassen Sie uns den ersten und dritten Dienst im Rahmen dieses Tutorials ausführen und den zweiten für den zweiten Teil belassen (schreiben Sie in die Kommentare, wenn es interessant ist).
Wir haben also zwei Microservices.
Der erste empfängt Daten von außen.
Der zweite verarbeitet diese Daten und reagiert auf eingehende Anfragen.
Natürlich möchten wir horizontale Skalierung, Non-Stop-Updates und andere Vorteile von Microservices erhalten.
Was ist eine sehr schwierige Aufgabe vor uns?
Es gibt tatsächlich viele davon, aber jetzt wollen wir darüber sprechen, wie Daten zwischen diesen Mikrodiensten fließen. Sie können auch Ruhe zwischen ihnen machen, Sie können eine Art Warteschlange stellen, Sie können sich eine Menge Dinge mit ihren Vor- und Nachteilen einfallen lassen.
Schauen wir uns einen möglichen Ansatz an - die asynchrone Kommunikation über das Axon-Framework .
Was sind die Vorteile dieser Lösung?
Erstens erhöht die asynchrone Kommunikation die Flexibilität (ja, hier gibt es ein Minus, aber wir sprechen bisher nur über Pluspunkte).
Zweitens erhalten wir Event Sourcing und CQRS sofort .
Drittens bietet Axon eine vorgefertigte Infrastruktur, und wir müssen uns nur auf die Entwicklung der Geschäftslogik konzentrieren.
Lass uns anfangen.
Wir werden das Projekt auf Gradle haben. Es wird drei Module haben:
- verbreitet. Modul mit gemeinsamen Datenstrukturen (wir mögen Copy-Paste nicht);
- tradeCreator. Modul mit Microservice zum Akzeptieren von Transaktionen in Ruhe;
- tradeQueries. Modul mit Microservices zur Positionsanzeige.
Nehmen wir Spring Boot als Basis und schließen Sie den Axon-Starter an.
Axon funktioniert gut ohne Spring, aber wir werden sie zusammen verwenden.
Wir müssen hier anhalten und Ihnen ein paar Worte über Axon sagen.
Es ist ein Client-Server-System. Es gibt einen Server - dies ist eine separate Anwendung, wir werden sie im Docker ausführen.
Und es gibt Kunden, die sich in Microservices einbetten.
Das ist das Bild. Zuerst wird der Axon-Server (im Docker) gestartet, dann unsere Microservices.
Beim Start suchen Microservices nach einem Server und beginnen mit ihm zu interagieren. Die Interaktion kann bedingt in zwei Typen unterteilt werden: technische und geschäftliche.
Der technische ist der Austausch von Nachrichten "Ich lebe" (solche Nachrichten können im Debug-Protokollierungsmodus angezeigt werden).
Das Geschäft wird durch Nachrichten wie "New Deal" verdeckt.
Eine wichtige Funktion, die nach dem Starten des Mikrodienstes den Axon-Server fragen kann, "was passiert ist", und der Server sendet die akkumulierten Ereignisse an den Mikrodienst. Somit kann der Microservice relativ sicher ohne Datenverlust neu gestartet werden.
Mit diesem Austauschschema können wir sehr einfach viele Instanzen von Microservices
und auf verschiedenen Hosts ausführen .
Ja, eine Instanz von Axon Server ist nicht zuverlässig, aber bisher.
Wir arbeiten in den Paradigmen Event Sourcing und CQRS. Dies bedeutet, dass wir "Teams", "Events" und "Samples" haben müssen.
Wir haben einen Befehl: "Deal erstellen", ein Ereignis "Deal erstellt" und drei Auswahlmöglichkeiten: "Alle Deals anzeigen", "Position anzeigen", "Position für ein Instrument anzeigen".
Das Arbeitsschema ist wie folgt:
- Der TradeCreator-Mikroservice akzeptiert eine Rest-Transaktion.
- Der TradeCreator-Microservice erstellt einen Befehl "create trade" und sendet ihn an den Axon-Server.
- Der Axon-Server empfängt den Befehl und leitet ihn an den interessierten Empfänger weiter. In unserem Fall handelt es sich um den tradeCreator-Mikroservice.
- Der TradeCreator-Microservice empfängt einen Befehl, generiert ein "Deal Created" -Ereignis und sendet es an den Axon-Server.
- Der Axon-Server empfängt das Ereignis und leitet es an interessierte Abonnenten weiter.
- Jetzt haben wir nur noch einen interessierten Empfänger - den TradeQueries-Microservice.
- Der TradeQueries-Microservice empfängt das Ereignis und aktualisiert die internen Daten.
(Es ist wichtig, dass der tradeQueries Microservice zum Zeitpunkt der Gründung des Ereignisses möglicherweise nicht verfügbar ist. Sobald er jedoch startet, erhält er das Ereignis sofort.)
Ja, der Axon-Server befindet sich im Zentrum der Kommunikation, alle Nachrichten durchlaufen ihn.
Fahren wir mit der Codierung fort.
Um den Beitrag nicht mit Code zu überladen, werde ich unten nur Fragmente geben, der Link zum gesamten Beispiel wird unten sein.
Beginnen wir mit dem gemeinsamen Modul.
Darin sind die gemeinsamen Teile das Ereignis (Klasse CreatedTradeEvent). Achten Sie auf den Namen. Dies ist der Name des Teams, das dieses Ereignis generiert hat, jedoch in der Vergangenheitsform. In der Vergangenheit, weil Zunächst wird der Befehl angezeigt, der zur Erstellung des Ereignisses führt.
Andere übliche Strukturen umfassen Klassen zum Beschreiben einer Position (Klassenposition), eines Handels (Klassenhandel) und einer Seite eines Handels (Aufzählungsseite), d.h. kaufen oder verkaufen.
Fahren wir mit dem tradeCreator-Modul fort.
Dieses Modul verfügt über eine Rest-Schnittstelle (Klasse TradeController) zum Akzeptieren von Trades.
Der Befehl "Deal erstellen" wird aus dem empfangenen Deal gebildet und an den Axon-Server gesendet.
@PostMapping("/trade")
public ResponseEntity<String> create(@RequestBody Trade trade) {
var createTradeCommand = CreateTradeCommand.builder()
.tradeId(trade.getTradeId())
...
.build();
var result = commandGateway.sendAndWait(createTradeCommand, 3, TimeUnit.SECONDS);
return ResponseEntity.ok(result.get().toString());
}
Zur Verarbeitung des Befehls wird die Klasse TradeAggregate verwendet.
Damit Axon es findet, fügen wir die Annotation @Aggregate hinzu.
Die Methode zur Verarbeitung des Befehls sieht folgendermaßen aus (mit einer Abkürzung):
@CommandHandler
public TradeAggregate(CreateTradeCommand command) {
log.info("command: {}", command);
var event = CreatedTradeEvent.builder()
.tradeId(command.tradeId())
....
.build();
AggregateLifecycle.apply(event);
}
Aus dem Befehl wird ein Ereignis generiert und an den Server gesendet.
Der Befehl befindet sich in der CreateTradeCommand-Klasse.
Schauen wir uns nun das letzte tradeQueries-Modul an.
Die Auswahlmöglichkeiten sind im Abfragepaket beschrieben.
Dieses Modul verfügt auch über eine
öffentliche TradeController Rest-Schnittstelle .
Sehen wir uns zum Beispiel die Verarbeitung der Anfrage an: "Alle Transaktionen anzeigen".
@GetMapping("/trade/all")
public List<Trade> findAllTrades() {
return queryGateway.query(new FindAllTradesQuery(),
ResponseTypes.multipleInstancesOf(Trade.class)).join();
}
Eine Abrufanforderung wird erstellt und an den Server gesendet.
Die TradesEventHandler-Klasse wird zum Verarbeiten der Abrufanforderung verwendet.
Es hat eine Methode mit Anmerkungen versehen
@QueryHandler
public List<Position> handleFindCurrentPositionQuery(FindCurrentPositionQuery query)
Er ist dafür verantwortlich, Daten aus dem In-Memory-Speicher abzurufen.
Es stellt sich die Frage, wie Informationen in diesem Geschäft aktualisiert werden.
Zunächst ist dies nur eine Sammlung von ConcurrentHashMaps, die auf bestimmte Auswahlen zugeschnitten sind.
Um sie zu aktualisieren, wird die Methode angewendet:
@EventHandler
public void on(CreatedTradeEvent event) {
log.info("event:{}", event);
var trade = Trade.builder()
...
.build();
trades.put(event.tradeId(), trade);
position.merge(event.shortName(), event.size(),
(oldValue, value) -> event.side() == Side.BUY ? oldValue + value : oldValue - value);
}
Es empfängt das Ereignis "Deal erstellt" und aktualisiert die Karten.
Dies sind die Highlights der Entwicklung von Microservices.
Was ist mit den Schwächen des Axons?
Erstens ist dies die Komplikation der Infrastruktur, ein Fehlerpunkt ist aufgetreten - der Axon-Server, die gesamte Kommunikation wird durchlaufen.
Zweitens zeigt sich der Nachteil solcher verteilter Systeme sehr deutlich - vorübergehende Dateninkonsistenz. In unserem Fall kann eine unannehmbar lange Zeit zwischen dem Erhalt eines neuen Geschäfts und der Aktualisierung der Daten für die Proben vergehen.
Was bleibt hinter den Kulissen?
Über Event Sourcing und CQRS, was es ist und wofür es ist, wird überhaupt nichts gesagt.
Ohne Offenlegung dieser Konzepte sind einige Punkte möglicherweise nicht klar.
Möglicherweise müssen auch einige Codefragmente geklärt werden.
Wir haben darüber in einem offenen Webinar gesprochen .
Vollständiges Beispiel .