Spring Integration - dynamische Datenflüsse

Feuerwerk, Habr! Heute werden wir einen bestimmten Bereich analysieren - die Streaming-Datenverarbeitung mit dem Spring Integration Framework und wie diese Streams zur Laufzeit ohne vorläufige Initialisierung im Anwendungskontext erstellt werden. Eine vollständige Musteranwendung finden Sie in der Gita .



Einführung



Spring Integration ist ein Enterprise Integration Framework (EIP), das Messaging-Mechanismen unter der Haube zwischen Adaptern verschiedener Protokolle / Integrationssysteme basierend auf Nachrichtenkanälen (bedingte Warteschlangen) verwendet. Berühmte Analoga sind Camel, Mule, Nifi.



Aus dem Testfall müssen wir - einen REST-Service erstellen, der die empfangenen Anforderungsparameter lesen kann, in unsere Datenbank gehen, z. B. Postgres, die Tabellendaten gemäß den von der Quelle empfangenen Parametern aktualisieren und abrufen und das Ergebnis an die Warteschlange zurücksenden (Anforderung) / response) und erstellen Sie mehrere Instanzen mit unterschiedlichen Anforderungspfaden.



Herkömmlicherweise sieht das Datenflussdiagramm folgendermaßen aus:



Bild



Als nächstes werde ich zeigen, wie Sie dies einfach tun können, ohne viel mit einem Tamburin zu tanzen, indem Sie IntegrationFlowContext mit REST-steuernden Komponenten- / Thread-Endpunkten verwenden. Der gesamte Hauptprojektcode befindet sich im Repository. Hier werde ich nur einige Ausschnitte angeben. Nun, wer interessiert sich bitte unter Katze.



Werkzeuge



Beginnen wir standardmäßig mit dem Abhängigkeitsblock. Grundsätzlich benötigen wir Spring-Boot-Projekte - für die REST-Ideologie des Flow- / Komponentenmanagements, der Spring-Integration -, um unseren Fall basierend auf Kanälen und Adaptern zu erstellen.



Und wir überlegen sofort, was wir sonst noch brauchen, um den Fall zu reproduzieren. Zusätzlich zu den Kernabhängigkeiten benötigen wir Teilprojekte - Integration-http, Integration-JDBC, Integration-Groovy (bietet dynamisch anpassbare Datentransformatoren basierend auf Goovy-Skripten). Unabhängig davon werde ich sagen, dass wir in diesem Beispiel den groovigen Konverter nicht als unnötig verwenden, sondern die Möglichkeit bieten, ihn von außen anzupassen.



Abhängigkeitsliste
 <!-- Spring block -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-commons</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-groovy</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-http</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-jdbc</artifactId>
        </dependency>

        <!-- Db block -->
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
        </dependency>

        <dependency>
            <groupId>com.zaxxer</groupId>
            <artifactId>HikariCP</artifactId>
        </dependency>

        <!-- Utility block -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>

        <dependency>
            <groupId>org.reflections</groupId>
            <artifactId>reflections</artifactId>
            <version>0.9.12</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.4</version>
            <scope>provided</scope>
        </dependency>




Interne Küche



Fahren wir mit der Erstellung der erforderlichen Systemkomponenten (Wrapper / Modelle) fort. Wir benötigen Channel-, Bean-, httpInboundGateway-, Handler-, jdbcOutboundGateway- und Ergebnismodelle.



  • bean - ein Hilfsobjekt, das für Adapter und Thread benötigt wird
  • Kanal - Kanal zum Übermitteln von Nachrichten an / von Stream-Komponenten
  • httpInboundGateway - http-Zugangspunkt, an den wir eine Anfrage mit Daten zur weiteren Verarbeitung senden
  • Handler - ein generischer Handlertyp (Rillentransformatoren, verschiedene Adapter usw.)
  • jdbcOutboundGateway - JDBC-Adapter
  • Ergebnis-Handler zum Senden von Informationen an einen bestimmten Kanal


Wir benötigen Wrapper, um Parameter zu speichern und die Komponenten eines gesamten Streams korrekt zu initialisieren, sodass wir sofort einen Komponentenspeicher erstellen und hinzufügen. Funktionalität von JSON-Konvertern -> Definitionsmodell. Die direkte Zuordnung von Feldern mit Jackson und Objekten war in meinem Fall nicht anwendbar - wir haben noch ein Fahrrad für ein bestimmtes Kommunikationsprotokoll.



Machen wir es gleich mit Annotationen :



StreamComponent - ist für die Identifizierung von Klassen als Optimierungsmodell einer Stream-Komponente verantwortlich und verfügt über Serviceinformationen - den Namen der Komponente, den Typ der Komponente, ob die Komponente verschachtelt ist und eine Beschreibung;



SettingClass - verantwortlich für zusätzliche Optionen zum Scannen des Modells, z. B. Scannen von Superklassenfeldern und Ignorieren von Feldern beim Initialisieren von Werten;



SettingValue - verantwortlich für die Identifizierung des Klassenfelds als von außen anpassbar, mit Namenseinstellungen in JSON, Beschreibung, Typkonverter, erforderlichem Feldflag und internem Objektflag zu Informationszwecken;



Komponentenspeichermanager



Hilfsmethoden für die Arbeit mit Modellen für REST-Controller



Basismodell - eine Abstraktion mit einer Reihe von Hilfsfeldern / Modellmethoden



Aktuelle Flusskonfigurationsmodelle



Mapper JSON -> Definitionsmodell



Der Hauptgrund für die Arbeit wurde vorbereitet. Kommen wir nun zur direkten Implementierung von Diensten, die für den Lebenszyklus, die Speicherung und die Initialisierung von Streams verantwortlich sind, und legen sofort die Idee fest, dass wir einen Stream mit derselben Benennung in mehrere Instanzen parallelisieren können, d. H. Wir müssen eindeutige Bezeichner (Hilfslinien) für alle Komponenten des Flusses erstellen, da sonst im Anwendungskontext Kollisionen mit anderen Singleton-Komponenten (Beans, Kanäle usw.) auftreten können. Aber zuerst machen wir Mapper aus zwei Komponenten - http und jdbc, d.h. das Inkrement der Modelle, die zuvor für die Komponenten des Streams selbst erstellt wurden (HttpRequestHandlerEndpointSpec und JdbcOutboundGateway).



HttpRegistry



JdbcRegistry



Central Management Service ( StreamDeployingService) führt die Funktionen zum Speichern von Workern / Inaktiven aus, registriert neue, startet, stoppt und entfernt Threads vollständig aus dem Anwendungskontext. Ein wichtiges Merkmal des Dienstes ist die Einführung der IntegrationFlowBuilderRegistry-Abhängigkeit, mit deren Hilfe wir die Anwendungsdynamik verbessern können (erinnern Sie sich möglicherweise kilometerweit an diese XML-Konfigurationsdateien oder DSL-Klassen). Gemäß der Stream-Spezifikation muss es immer mit einer eingehenden Komponente oder einem eingehenden Kanal beginnen, daher berücksichtigen wir dies bei der Implementierung der registerStreamContext-Methode.



Und der Hilfsmanager ( IntegrationFlowBuilderRegistry)), das sowohl die Funktion eines Mapper von Modellen für Flusskomponenten als auch die Initialisierung des Flusses selbst mit IntegrationFlowBuilder ausführt. Ich habe auch einen Log-Handler in der Flow-Pipeline implementiert, einen Service zum Sammeln von Flow-Channel-Metriken (eine umschaltbare Option) und eine mögliche Implementierung von Flow-Message-Konvertern basierend auf der Groovy-Implementierung (wenn dieses Beispiel plötzlich die Grundlage für den Verkauf wird, muss die Vorkompilierung von Groovy-Skripten in der Phase der Flow-Initialisierung erfolgen , weil Sie im RAM auf Lasttests stoßen und egal wie viele Kerne und Leistung Sie haben). Abhängig von der Konfiguration der Protokollphasen und Parameter auf Protokollebene des Modells ist es nach jeder Übertragung einer Nachricht von Komponente zu Komponente aktiv. Die Überwachung wird durch einen Parameter in application.yml aktiviert und deaktiviert:



monitoring:
  injectction:
    default: true


Jetzt haben wir alle Mechanismen zum Initialisieren dynamischer Datenverarbeitungsabläufe. Außerdem können wir Mapper für verschiedene Protokolle und Adapter wie RabbitMQ, Kafka, Tcp, Ftp usw. schreiben. Umso mehr müssen Sie in den meisten Fällen nichts mit Ihrer eigenen Hand schreiben (außer natürlich Konfigurationsmodelle und Hilfsmethoden) - eine relativ große Anzahl von Komponenten ist bereits im Repository vorhanden .



Die letzte Phase wird die Implementierung von Controllern sein, um Informationen über vorhandene Systemkomponenten zu erhalten, Abläufe zu verwalten und Metriken zu erhalten.



ComponentsController - bietet Informationen zu allen Komponenten in einem für Menschen lesbaren Modell sowie zu einer Komponente nach Name und Typ.



StreamController - bietet ein vollständiges Flussmanagement, nämlich die Initialisierung neuer JSON-Modelle, das Starten, Stoppen, Löschen und Ausgeben von Metriken nach Kennung.



Endprodukt



Wir erheben die resultierende Anwendung und beschreiben den Testfall im JSON-Format.



Beispieldatenstrom
:



CREATE TABLE IF NOT EXISTS account_data
(
    id          INT                      NOT NULL,
    accountname VARCHAR(45)              NOT NULL,
    password    VARCHAR(128),
    email       VARCHAR(255),
    last_ip     VARCHAR(15) DEFAULT NULL NOT NULL
);

CREATE UNIQUE INDEX account_data_username_uindex
    ON account_data (accountname);

ALTER TABLE account_data
    ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY (
        SEQUENCE NAME account_data_id_seq
            START WITH 1
            INCREMENT BY 1
            NO MINVALUE
            NO MAXVALUE
            CACHE 1
        );

ALTER TABLE account_data
    ADD CONSTRAINT account_data_pk
        PRIMARY KEY (id);

CREATE TABLE IF NOT EXISTS account_info
(
    id             INT NOT NULL,
    banned         BOOLEAN  DEFAULT FALSE,
    premium_points INT      DEFAULT 0,
    premium_type   SMALLINT DEFAULT -1
);

ALTER TABLE account_info
    ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY (
        SEQUENCE NAME account_info_id_seq
            START WITH 1
            INCREMENT BY 1
            NO MINVALUE
            NO MAXVALUE
            CACHE 1
        );

ALTER TABLE account_info
    ADD CONSTRAINT account_info_account_data_id_fk FOREIGN KEY (id) REFERENCES account_data (id)
        ON UPDATE CASCADE ON DELETE CASCADE;

ALTER TABLE account_info
    ADD CONSTRAINT account_info_pk
        PRIMARY KEY (id);



INSERT INTO account_data (accountname, password, email, last_ip)
VALUES ('test', 'test', 'test@test', '127.0.0.1');
INSERT INTO account_info (banned, premium_points, premium_type)
VALUES (false, 1000, 1);


: order — , .. , . ( ). — .



{
  "flowName": "Rest Postgres stream",
  "components": [
    {
      "componentName": "bean",
      "componentType": "other",
      "componentParameters": {
        "id": "pgDataSource",
        "bean-type": "com.zaxxer.hikari.HikariDataSource",
        "property-args": [
          {
            "property-name": "username",
            "property-value": "postgres"
          },
          {
            "property-name": "password",
            "property-value": "postgres"
          },
          {
            "property-name": "jdbcUrl",
            "property-value": "jdbc:postgresql://localhost:5432/test"
          },
          {
            "property-name": "driverClassName",
            "property-value": "org.postgresql.Driver"
          }
        ]
      }
    },
    {
      "componentName": "message-channel",
      "componentType": "source",
      "componentParameters": {
        "id": "jdbcReqChannel",
        "order": 1,
        "channel-type": "direct",
        "max-subscribers": 1000
      }
    },
    {
      "componentName": "message-channel",
      "componentType": "source",
      "componentParameters": {
        "id": "jdbcRepChannel",
        "order": 1,
        "channel-type": "direct"
      }
    },
    {
      "componentName": "http-inbound-gateway",
      "componentType": "source",
      "componentParameters": {
        "order": 2,
        "http-inbound-supported-methods": [
          "POST"
        ],
        "payload-type": "org.genfork.integration.model.request.http.SimpleJdbcPayload",
        "log-stages": true,
        "log-level": "INFO",
        "request-channel": "jdbcReqChannel",
        "reply-channel": "jdbcRepChannel"
      }
    },
    {
      "componentName": "handler",
      "componentType": "processor",
      "componentParameters": {
        "order": 3,
        "handler-definition": {
          "componentName": "jdbc-outbound-adapter",
          "componentType": "app",
          "componentParameters": {
            "data-source": "pgDataSource",
            "query": "SELECT accountname, password, email, last_ip, banned, premium_points, premium_type FROM account_data d INNER JOIN account_info i ON d.id = i.id WHERE d.id = :payload.accountId",
            "update-query": "UPDATE account_info SET banned = true WHERE id = :payload.accountId",
            "jdbc-reply-channel": "jdbcRepChannel",
            "log-stages": true,
            "log-level": "INFO"
          }
        }
      }
    },
    {
      "componentName": "result",
      "componentType": "app",
      "componentParameters": {
        "order": 4,
        "cancel": false,
        "result-channel": "jdbcRepChannel"
      }
    }
  ]
}





Testen:



1) Wir initialisieren einen neuen Stream mit der



POST / stream / deploy- Methode , wobei sich unser JSON im Anforderungshauptteil befindet.



Als Antwort muss das System senden, wenn alles korrekt ist, andernfalls wird eine Fehlermeldung angezeigt:



{
    "status": "SUCCESS", -  
    "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b" -  
}


2) Wir starten den Start mit der Methode:



GET / stream / 2bf65d9d-97c6-4199-86aa-0c808c25071b / start, wobei wir die ID des initialisierten Streams früher angeben.



Als Antwort muss das System senden, wenn alles korrekt ist, andernfalls wird eine Fehlermeldung angezeigt:



{
    "status": "SUCCESS", -  
}


3) Aufrufen eines Streams über eine Kennung im System? Wie, was und wo - im Mapper des HttpRegistry-Modells habe ich die Bedingung geschrieben



Http.inboundGateway(localPath != null ? localPath : String.format("/stream/%s/call", uuid))


Dabei wird der Parameter http-inbound-path berücksichtigt. Wenn er in der Konfiguration der Komponente nicht explizit angegeben ist, wird er ignoriert und der Systemaufrufpfad festgelegt. In unserem Fall ist dies:



POST / stream / ece4d4ac-3b46-4952-b0a6-8cf334074b99 / call - wo die Stream-ID vorhanden ist, mit dem Anforderungshauptteil:



{
    "accountId": 1
}


Als Antwort erhalten wir, wenn die Phasen der Verarbeitung der Anfrage korrekt funktioniert haben, eine flache Struktur von Datensätzen der Tabellen account_data und account_info.



{
    "accountname": "test",
    "password": "test",
    "email": "test@test",
    "last_ip": "127.0.0.1",
    "banned": true,
    "premium_points": 1000,
    "premium_type": 1
}


Die Spezifität des JdbcOutboundGateway-Adapters ist so, dass bei Angabe des Parameters update-query ein zusätzlicher Handler registriert wird, der zuerst die Daten aktualisiert und erst dann über den Abfrageparameter abruft.



Wenn Sie dieselben Pfade manuell angeben, wird die Möglichkeit, Komponenten mit HttpInboundGateway als Zugriffspunkt auf einen Stream in mehreren Instanzen zu starten, aufgehoben, da das System die Registrierung eines ähnlichen Pfads nicht zulässt.



4) Sehen wir uns die Metriken mit der GET-Methode / stream / 2bf65d9d-97c6-4199-86aa-0c808c25071b /metrics an



Antwortinhalt
, / , / / :



[
    {
        "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",
        "channelName": "application.Rest Postgres stream_2bf65d9d-97c6-4199-86aa-0c808c25071b_jdbcReqChannel",
        "sendDuration": {
            "count": 1,
            "min": 153.414,
            "max": 153.414,
            "mean": 153.414,
            "standardDeviation": 0.0,
            "countLong": 1
        },
        "maxSendDuration": 153.414,
        "minSendDuration": 153.414,
        "meanSendDuration": 153.414,
        "meanSendRate": 0.001195117818082359,
        "sendCount": 1,
        "sendErrorCount": 0,
        "errorRate": {
            "count": 0,
            "min": 0.0,
            "max": 0.0,
            "mean": 0.0,
            "standardDeviation": 0.0,
            "countLong": 0
        },
        "meanErrorRate": 0.0,
        "meanErrorRatio": 1.1102230246251565E-16
    },
    {
        "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",
        "channelName": "application.2bf65d9d-97c6-4199-86aa-0c808c25071b.channel#2",
        "sendDuration": {
            "count": 1,
            "min": 0.1431,
            "max": 0.1431,
            "mean": 0.1431,
            "standardDeviation": 0.0,
            "countLong": 1
        },
        "maxSendDuration": 0.1431,
        "minSendDuration": 0.1431,
        "meanSendDuration": 0.1431,
        "meanSendRate": 0.005382436008121413,
        "sendCount": 1,
        "sendErrorCount": 0,
        "errorRate": {
            "count": 0,
            "min": 0.0,
            "max": 0.0,
            "mean": 0.0,
            "standardDeviation": 0.0,
            "countLong": 0
        },
        "meanErrorRate": 0.0,
        "meanErrorRatio": 0.0
    },
    {
        "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",
        "channelName": "application.Rest Postgres stream_2bf65d9d-97c6-4199-86aa-0c808c25071b_jdbcRepChannel",
        "sendDuration": {
            "count": 1,
            "min": 0.0668,
            "max": 0.0668,
            "mean": 0.0668,
            "standardDeviation": 0.0,
            "countLong": 1
        },
        "maxSendDuration": 0.0668,
        "minSendDuration": 0.0668,
        "meanSendDuration": 0.0668,
        "meanSendRate": 0.001195118373693797,
        "sendCount": 1,
        "sendErrorCount": 0,
        "errorRate": {
            "count": 0,
            "min": 0.0,
            "max": 0.0,
            "mean": 0.0,
            "standardDeviation": 0.0,
            "countLong": 0
        },
        "meanErrorRate": 0.0,
        "meanErrorRatio": 1.1102230246251565E-16
    }
]




Fazit



So wurde gezeigt, wie Sie mit etwas mehr Zeit und Mühe eine Anwendung für die Integration in verschiedene Systeme schreiben können, als jedes Mal zusätzliche manuelle Handler (Pipelines) in Ihre Anwendung für die Integration mit anderen Systemen mit jeweils 200-500 Codezeilen zu schreiben.



Im aktuellen Beispiel können Sie die Arbeit desselben Threadtyps für mehrere Instanzen mithilfe eindeutiger Bezeichner parallelisieren, um Kollisionen im globalen Kontext der Anwendung zwischen Threadabhängigkeiten (Bins, Kanäle usw.) zu vermeiden.



Außerdem können Sie das Projekt entwickeln:



  • Streams in der Datenbank speichern;
  • Unterstützung für alle Integrationskomponenten, die uns die Spring- und Spring-Integration-Community bietet;
  • Arbeiter, die nach einem Zeitplan mit Threads arbeiten würden;
  • Erstellen Sie eine vernünftige Benutzeroberfläche zum Konfigurieren von Streams mit einem bedingten "Maus- und Komponentenwürfel" (das Beispiel wurde übrigens für das Projekt github.com/spring-cloud/spring-cloud-dataflow-ui teilweise geschärft ).


Und noch einmal werde ich den Link zum Repository duplizieren .



All Articles