Einer davon ist Azure Service Bus, und heute möchte ich über die Funktionen der Verwendung in einer regulären Spring Boot-Anwendung sprechen.
Wenn Sie mehr über die
Was ist Azure Service Bus?
Ein paar Worte zu Azure Service Bus ist ein Cloud-Nachrichtenbroker (Cloud-Ersatz für RabbitMQ, ActiveMQ). Unterstützt Warteschlangen (die Nachricht wird an einen Empfänger gesendet) und Themen (der Publish / Subscribe-Mechanismus) - hier detaillierter.
Support wird deklariert:
- Geordnete Nachrichten - Die Dokumentation besagt, dass dies ein FIFO ist, ABER es wird unter Verwendung des Konzepts von Nachrichtensitzungen implementiert - eine Gruppe von Nachrichten, nicht die gesamte Warteschlange. Wenn Sie die Reihenfolge der Nachrichten garantieren müssen, kombinieren Sie Nachrichten zu einer Gruppe, und jetzt werden Nachrichten in der Gruppe als FIFO zugestellt. Die Azure Service Bus-Warteschlange ist also kein FIFO. Sie übermittelt Ihre Nachrichten so zufällig, wie es Ihnen passt
- Dead-Letter-Warteschlange - hier ist alles einfach, sie konnten die Nachricht nach N Versuchen oder einer bestimmten Zeit nicht erfolgreich zustellen - wurde zu DLQ verschoben
- Geplante Lieferung - Sie können eine Verzögerung vor der Lieferung festlegen
- Zurückstellen von Nachrichten - Blendet Nachrichten in der Warteschlange aus. Die Nachricht wird nicht automatisch zugestellt, kann jedoch anhand der ID abgerufen werden. Wir müssen diese ID irgendwo speichern
Integration in Azure Service Bus
Azure Service Bus unterstützt AMQP 1.0, was bedeutet, dass es nicht mit RabbitMQ-Clients kompatibel ist. bunny verwendet AMQP 0.9.1
Der einzige "Standard" -Client, der mit dem Service Bus arbeiten kann, ist Apache Qpid .
Es gibt drei Möglichkeiten, Ihre Spring Boot-Anwendung mit Service Bus zu koppeln:
- JMS + QPID — , — QPID — .
timeout producer — — factory.setCacheProducers(false); - Spring Cloud — Azure Service Bus — , . Service Bus
( 1.2.6) — , azure service bus java sdk.
Spring Integration — , «Scheduled delivery» «Message deferral» .
sdk, MessageAndSessionPump
- azure service bus java sdk — ,
Spring Cloud — Azure Service Bus
Ich werde auf diese Methode näher eingehen und Ihnen erläutern, welche Funktionen die Verwendung der
Beispielanwendung im offiziellen Repository bietet. Es macht also keinen Sinn, den Code zu duplizieren - das Repository mit einem Beispiel befindet sich hier .
weil Es ist Spring Integration Messaging. Alles hängt von Channel, MessageHandler, MessagingGateway und ServiceActivator ab.
Und dann gibt es noch die ServiceBusQueueTemplate .
Nachrichten senden
Wir müssen einen Kanal haben, in den wir die Nachricht schreiben, die wir senden möchten. Am anderen Ende befindet sich ein MessageHandler , der sie an den Service Bus sendet.
Der MessagHandler ist com.microsoft.azure.spring.integration.core.DefaultMessageHandler - dies ist der Konnektor zum externen Dienst.
Wie binde ich es an einen Kanal? - Fügen Sie die Anmerkung hinzu - @ServiceActivator (inputChannel = OUTPUT_CHANNEL) und jetzt hört unser MessagHandler den OUTPUT_CHANNEL- Kanal .
Als nächstes müssen wir unsere Nachricht irgendwie in den Kanal schreiben - hier wieder die Magie des Frühlings - wir kündigen MessagingGateway an und binden sie mit Namen an den Kanal.
Ein Ausschnitt aus dem Beispiel :
@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
public interface QueueOutboundGateway {
void send(String text);
}
Das ist alles: Gateway -> Kanal -> MessagHandler -> ServiceBusQueueTemplate -> ServiceBusMessageConverter .
Im Code bleibt es, unser Gateway zu injizieren und die send- Methode aufzurufen .
Ich habe ServiceBusMessageConverter aus einem bestimmten Grund in der Aufrufkette erwähnt. Wenn Sie der Nachricht benutzerdefinierte Header (z. B. CORRELATION_ID) hinzufügen möchten, müssen diese von org.springframework.messaging.MessageHeaders in die Azure- Nachricht verschoben werden .
Die spezielle Methode setCustomHeaders .
In diesem Fall sieht Ihr Gateway ungefähr so aus:
@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
public interface QueueOutboundGateway {
void send(@Payload String text, @Header("CORRELATION_ID") String correlationId);
}
Nachrichten empfangen
Okay, wir wissen, wie man Nachrichten sendet, wie man sie jetzt erhält?
Hier ist alles gleich - MessageProducer -> Channel -> Handler
Der MessageProducer ist com.microsoft.azure.spring.integration.servicebus.inbound.ServiceBusQueueInboundChannelAdapter - dies ist unser Anschluss an einen externen Service. Im selben ServiceBusQueueTemplate mit ServiceBusMessageConverter, in dem Sie benutzerdefinierte Header lesen und in die Spring-Integrationsnachricht einfügen können.
Der Kanal ist bereits von Hand darin installiert:
@Bean
public ServiceBusQueueInboundChannelAdapter queueMessageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
ServiceBusQueueOperation queueOperation) {
queueOperation.setCheckpointConfig(CheckpointConfig.builder().checkpointMode(CheckpointMode.MANUAL).build());
ServiceBusQueueInboundChannelAdapter adapter = new ServiceBusQueueInboundChannelAdapter(QUEUE_NAME,
queueOperation);
adapter.setOutputChannel(inputChannel);
return adapter;
}
Der Handler selbst ist jedoch über @ServiceActivator mit dem Kanal verbunden .
@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
String message = new String(payload);
.......
Sie können sofort die Zeile erhalten:
@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(String payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
.......
Möglicherweise haben Sie einen seltsamen Checkpointer-Checkpointer- Parameter bemerkt, mit dem die Nachrichtenverarbeitung manuell bestätigt wird.
Wenn Sie beim Erstellen des ServiceBusQueueInboundChannelAdapter CheckpointMode.MANUAL festlegen, müssen Sie die Nachricht selbst bestätigen.
Wenn Sie CheckpointMode.RECORD verwenden, wird automatisch eine Bestätigung gesendet - Details im ServiceBusQueueTemplate- Code .
Nutzungsmerkmale
Also die Liste der "Rechen" und "Chips", auf die wir schon gegangen sind.
ReceiveMode.PEEKLOCK
Der Azure Service Bus unterstützt den PEEKLOCK- Modus. Der Verbraucher nimmt eine Nachricht entgegen, sperrt sich in den Service Bus ein, ist für eine bestimmte Zeit ( Sperrdauer) für niemanden zugänglich, wird jedoch nicht daraus gelöscht. Wenn der Verbraucher innerhalb der vorgegebenen Zeit keine Bestätigung der Verarbeitung gesendet hat - Erfolg / Abbruch oder die Sperre nicht erweitert hat -, wird die Nachricht als wieder verfügbar betrachtet und ein neuer Zustellversuch unternommen.
Interessanterweise setzt Abbruch einfach die Sperre zurück und die Nachricht wird sofort für die erneute Zustellung verfügbar.
ServiceBusQueueTemplate erstellt standardmäßig den QueueClient- Modus ReceiveMode.PEEKLOCK .
Wenn eine nicht behandelte Ausnahme in unserem Handler fliegt- Es wird keine Bestätigung an den Server gesendet und die Nachricht bleibt gesperrt und wird per Timeout erneut zugestellt.
In diesem Fall erhöht sich der Lieferzähler, was logisch ist.
Ich weiß nicht, ob dies ein Fehler oder eine Funktion ist - aber es ist sehr praktisch, eine Verzögerung zwischen den erneuten Versuchen für Situationen vorzunehmen, in denen dies erforderlich ist.
Wenn die Nachricht auch bei einem erneuten Versuch nicht verarbeitet werden kann, müssen Ausnahmen abgefangen und die Nachricht als verarbeitet markiert und der Anwendung zusätzliche Logik hinzugefügt werden. Andernfalls wird sie immer wieder zugestellt, bis das Limit für die Anzahl der erneuten Zustellungen erreicht ist (konfiguriert beim Erstellen einer Warteschlange im Servicebus) )
Anzahl der Parallelitäts- und Prefetch-Nachrichten
Wie Sie vielleicht schon erraten haben, die Parallelität Einstellung für die Anzahl der parallelen Meldungshandler verantwortlich, und die Prefetch - Nachrichtenzahl ist , wie viele Nachrichten , die wir in den Puffer vom Server erhalten wird.
Standardmäßig ist die ServiceBusQueueTemplate ist automatisch konfiguriert (AzureServiceBusQueueAutoConfiguration) mit einem Wert von 1 für beide Parameter, das heißt Standardmäßig hat jede Warteschlange einen Verarbeitungsthread, obwohl das Konzept eines Servicebusses mit Bestätigung für jede einzelne Nachricht viele gleichzeitige Handler impliziert. Dies ist umso wichtiger, wenn Sie eine lange Anforderungsverarbeitung haben.
Leider können diese Einstellungen nicht über die Anwendungskonfiguration (application.yml / application.properties) festgelegt werden und können nur im Code festgelegt werden. Aber selbst durch Code können Sie keine unterschiedlichen Einstellungen für unterschiedliche Warteschlangen vornehmen.
Wenn Sie unterschiedliche Einstellungen vornehmen müssen, müssen Sie daher für jeden ServiceBusQueueInboundChannelAdapter mehrere ServiceBusQueueTemplate-Beans erstellen
CompletableFuture in Azure Service Bus Java SDK
Der Azure Service Bus Java SDK selbst ist um CompletableFuture und CachedThreadPool Executor - MessagingFactory.INTERNAL_THREAD_POOL implementiert. Seien Sie also vorsichtig mit allen Arten von lokalen Thread-Beans
Bestellte Nachrichten
Wir verwenden den Servicebus als Jobwarteschlange - einige Jobs hängen voneinander ab und müssen daher in der Reihenfolge ausgeführt werden, in der sie erstellt wurden.
Wie oben erwähnt, verwenden T-Shirts das Konzept von Nachrichtensitzungen - wenn Nachrichten nach Schlüssel zu einer Sitzung gruppiert werden (im Header übertragen), besteht die Sitzung, solange mindestens eine Nachricht mit dem Sitzungsschlüssel vorhanden ist - ausführlich in der Dokumentation
Service Bus garantiert die Zustellung von Nachrichten innerhalb einer solchen Gruppe in der Reihenfolge des Hinzufügens zu Server (d. h. in der Reihenfolge, in der der Service-Bus-Server sie in das Repository geschrieben hat).
Erwähnenswert ist auch, wenn Sie eine sitzungsfähige Warteschlange erstellt haben. Dies bedeutet, dass alle Nachrichten einen Header mit einem Sitzungsschlüssel haben müssen.
Wir waren sofort sehr zufrieden mit der Möglichkeit, dass der Servicebus Nachrichten in einer FIFO-Warteschlange anordnet - allerdings für eine Gruppe von Nachrichten.
Aber nach einer Weile bemerkten wir Probleme:
- Einige Nachrichten kamen unendlich oft an
- Die Warteschlangenverarbeitung wurde verlangsamt
- In der Service-Bus-Statistik wird die Hälfte der Anforderungen als fehlgeschlagen markiert, und fehlgeschlagene Anforderungen werden im Leerlauf sogar in einer leeren Warteschlange angezeigt
Beim Betrachten des SDK-Codes haben wir die Besonderheit der Arbeit mit Sitzungen herausgefunden:
- Der Verbraucher erfasst die Sitzung und beginnt, alle verfügbaren Nachrichten darin zu lesen
- Gleichzeitig wird die Anzahl der Sitzungen entsprechend dem Parallelitätsparameter verarbeitet
- unhandled exception — 1 ( ) — re-delivery ? 0 exception — ttl .
- — success abandon. — delay re-delivery
.. abandon — , delivery counter .
delivery count
Infolgedessen haben sie diese Servicebusfunktion aufgegeben und ein Fahrrad geschrieben, und der Servicebus fungiert als Auslöser.
Sobald die Warteschlange für aktivierte Sitzungen abgebrochen wurde, verschwanden die Fehler in der Statistik, die Anforderung an den Servicebus.
Im JMS + Qpid-Bundle ist diese Funktionalität nicht verfügbar.
Mögliche Probleme mit Warteschlangengrößen größer als 1G
Habe mich noch nicht getroffen, habe aber gehört, dass es instabil zu funktionieren beginnt, wenn die Warteschlangengröße mehr als 1 G beträgt.
Wenn Sie darauf stoßen oder umgekehrt, funktioniert alles - schreiben Sie in die Kommentare.
Probleme bei der Rückverfolgung von Anfragen
Der standardmäßige Azure Application Insights Agent kann das Senden von Nachrichten nicht als Abhängigkeit und eingehende Nachrichten als Anforderungen verfolgen.
Ich musste Code hinzufügen.
Ergebnis
Wenn Sie eine Jobwarteschlange mit einer langen Nachrichtenverarbeitungszeit benötigen und keine Warteschlange benötigen, können Sie diese verwenden.
Wenn die Nachrichtenverarbeitung schnell ist, verwenden Sie Azure Event Hub - reguläres Kafka. Der Standardclient funktioniert einwandfrei.