Übersicht über die neue Benutzeroberfläche für strukturiertes Streaming in Apache Spark ™ 3.0

Die Übersetzung des Artikels wurde am Vorabend des Beginns des Data Engineer- Kurses vorbereitet .










Strukturiertes Streaming wurde erstmals in Apache Spark 2.0 eingeführt. Diese Plattform hat sich als beste Wahl für die Erstellung verteilter Streaming-Anwendungen etabliert. Die Vereinheitlichung der SQL / Dataset / DataFrame-API und der integrierten Spark-Funktionen erleichtert Entwicklern die Implementierung ihrer komplexen Grundlagen wie Streaming-Aggregation, Stream-Stream-Join und Fensterunterstützung erheblich. Seit der Veröffentlichung von Structured Streaming ist es eine beliebte Anfrage von Entwicklern, die Streaming-Kontrolle zu verbessern, genau wie wir es bei Spark Streaming (wie DStream) getan haben. In Apache Spark 3.0 haben wir eine neue Benutzeroberfläche für strukturiertes Streaming veröffentlicht.



Das neue strukturierte Streaming auf der Benutzeroberfläche bietet eine einfache Möglichkeit, alle Streaming-Jobs mit umsetzbaren Erkenntnissen und Statistiken zu überwachen. Dies erleichtert die Behebung von Problemen beim Debuggen und verbessert die Sichtbarkeit der Produktion mit Echtzeitmetriken. Die Benutzeroberfläche enthält zwei Statistiksätze: 1) aggregierte Informationen zu einem Streaming-Abfragejob und 2) detaillierte statistische Informationen zu Streaming-Anforderungen, einschließlich Eingaberate, Prozessrate, Eingabereihen, Stapeldauer, Betriebsdauer usw.



Aggregierte Informationen zu Streaming-Abfragejobs



Wenn ein Entwickler eine Streaming-SQL-Abfrage sendet, wird diese auf der Registerkarte Strukturiertes Streaming angezeigt, die sowohl aktive als auch abgeschlossene Streaming-Abfragen enthält. Die Ergebnistabelle enthält einige grundlegende Informationen zu Streaming-Anforderungen, einschließlich Anforderungsname, Status, ID, Ausführungs-ID, Sendezeit, Anforderungsdauer, ID des letzten Pakets sowie aggregierte Informationen wie durchschnittliche Empfangsrate und durchschnittliche Verarbeitungsrate. Es gibt drei Arten von Streaming-Anforderungsstatus: RUNNING, FINISHED und FAILED. Alle FINISHED- und FAILED-Anforderungen werden in der Tabelle mit den vollständigen Streaming-Anforderungen aufgeführt. In der Spalte Fehler werden die Details der Ausnahme für fehlgeschlagene Anforderungen angezeigt.







Wir können detaillierte Statistiken der Streaming-Anfrage anzeigen, indem wir auf den Link Run ID klicken.



Detaillierte statistische Informationen



Auf der Statistikseite werden Metriken wie Aufnahme- / Verarbeitungsrate, Latenz und detaillierte Betriebsdauer angezeigt, die hilfreich sind, um den Status Ihrer Streaming-Anforderungen zu verstehen und das Debuggen von Anomalien bei der Anforderungsverarbeitung zu vereinfachen.









Es enthält die folgenden Metriken:



  • Eingaberate : Aggregierte (über alle Quellen hinweg) Rate der Datenankunft.
  • Prozessrate : Die aggregierte Rate (über alle Quellen hinweg), mit der Spark Daten verarbeitet.
  • Batch Dauer : Die Dauer der einzelnen Charge.
  • Betriebsdauer : Die Zeit, die benötigt wird, um verschiedene Vorgänge in Millisekunden auszuführen.


Die überwachten Transaktionen sind unten aufgeführt:



  • addBatch: Zeitaufwand für das Lesen der Eingabedaten des Mikrobatches aus Quellen, deren Verarbeitung und das Schreiben der Ausgabedaten des zu synchronisierenden Stapels. Dies nimmt normalerweise den größten Teil der Mikrobatch-Zeit in Anspruch.
  • getBatch: Zeit, die benötigt wird, um eine logische Anforderung zum Lesen der Eingabedaten des aktuellen Mikropakets aus Quellen vorzubereiten.
  • getOffset: Zeitaufwand für die Frage nach Quellen, ob sie neue Eingaben haben.
  • walCommit: Schreibt Offsets in Metadatenprotokolle.
  • queryPlanning: Erstellen Sie einen Ausführungsplan.


Es ist zu beachten, dass nicht alle aufgelisteten Vorgänge in der Benutzeroberfläche angezeigt werden. Es gibt verschiedene Vorgänge mit unterschiedlichen Arten von Datenquellen, sodass einige der aufgelisteten Vorgänge in einer Streaming-Anforderung ausgeführt werden können.



Fehlerbehebung bei der Streaming-Leistung über die Benutzeroberfläche



In diesem Abschnitt werden einige Fälle betrachtet, in denen das neue strukturierte Streaming der Benutzeroberfläche darauf hinweist, dass etwas Außergewöhnliches passiert. Eine Demo-Anfrage auf hoher Ebene sieht folgendermaßen aus, und in jedem Fall gehen wir von einigen Voraussetzungen aus:



import java.util.UUID

val bootstrapServers = ...
val topics = ...
val checkpointLocation = "/tmp/temporary-" + UUID.randomUUID.toString

val lines = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .option("subscribe", topics)
    .load()
    .selectExpr("CAST(value AS STRING)")
    .as[String]

val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

val query = wordCounts.writeStream
    .outputMode("complete")
    .format("console")
    .option("checkpointLocation", checkpointLocation)
    .start()


Erhöhte Latenz aufgrund unzureichender Verarbeitungsleistung



Im ersten Fall führen wir eine Anfrage aus, um Apache Kafka-Daten so schnell wie möglich zu verarbeiten. Für jeden Stapel verarbeitet ein Streaming-Job alle verfügbaren Daten in Kafka. Wenn die Verarbeitungsleistung nicht ausreicht, um die Paketdaten zu verarbeiten, steigt die Latenz schnell an. Das intuitivste Urteil ist, dass die Eingabezeilen und die Stapeldauer linear wachsen. Der Parameter Input Rows gibt an, dass der Streaming-Job maximal 8000 Schreibvorgänge pro Sekunde verarbeiten kann. Die aktuelle Eingaberate beträgt jedoch ungefähr 20.000 Datensätze pro Sekunde. Wir können dem Threading-Job mehr Ressourcen zur Verfügung stellen, um ihn auszuführen, oder wir können genügend Partitionen hinzufügen, um alle Verbraucher zu verwalten, die erforderlich sind, um mit den Produzenten Schritt zu halten.







Stabile aber hohe Latenz



Wie unterscheidet sich dieser Fall vom vorherigen? Die Latenz erhöht sich nicht, bleibt aber stabil, wie im folgenden Screenshot gezeigt:







Wir haben festgestellt, dass die Prozessrate bei derselben Eingaberate stabil bleiben kann. Dies bedeutet, dass die Verarbeitungsleistung des Jobs ausreicht, um die Eingabedaten zu verarbeiten. Die Verarbeitungszeit für jede Charge, d. H. Die Verzögerung, beträgt jedoch immer noch 20 Sekunden. Der Hauptgrund für die hohe Latenz sind zu viele Daten in jeder Charge. Normalerweise können wir die Latenz reduzieren, indem wir die Parallelität dieses Jobs erhöhen. Nach dem Hinzufügen von 10 weiteren Kafka-Partitionen und 10 Kernen für Spark-Aufgaben ergab sich eine Latenz von etwa 5 Sekunden - viel besser als 20 Sekunden.







Verwenden Sie die Tabelle zur Betriebsdauer zur Fehlerbehebung



Das Diagramm Operationsdauer zeigt die Zeit an, die für die Ausführung verschiedener Operationen in Millisekunden aufgewendet wurde. Dies ist nützlich, um das Timing jeder Charge zu verstehen und die Fehlerbehebung zu vereinfachen. Verwenden wir als Beispiel die Arbeit zur Leistungsverbesserung " SPARK-30915 : Vermeiden Sie das Lesen der Metadatenprotokolldatei, wenn Sie nach der neuesten Stapel-ID suchen" in der Apache Spark-Community.

Vor dieser Verbesserung dauerte jeder nachfolgende Stapel nach der Komprimierung länger als andere Stapel, wenn das komprimierte Metadatenprotokoll sehr groß wird.







Nach Prüfung des Codes wurde unnötiges Lesen der komprimierten Protokolldatei gefunden und behoben. Das folgende Diagramm zur Betriebsdauer bestätigt den erwarteten Effekt:







Pläne für die Zukunft



Wie oben gezeigt, hilft das neue strukturierte UI-Streaming Entwicklern, ihre Streaming-Jobs besser zu steuern, indem es viel nützlichere Informationen zu Streaming-Anforderungen enthält. Als frühe Version befindet sich die neue Benutzeroberfläche noch in der Entwicklung und wird in zukünftigen Versionen verbessert. Es gibt verschiedene Funktionen, die in nicht allzu ferner Zukunft implementiert werden können, einschließlich, aber nicht beschränkt auf Folgendes:



  • Weitere Informationen zur Ausführung von Streaming-Abfragen: verspätete Daten, Wasserzeichen, Datenstatusmetriken und mehr.
  • Unterstützung der strukturierten Streaming-Benutzeroberfläche auf dem Spark History Server.
  • Weitere auffällige Hinweise auf ungewöhnliches Verhalten: Latenz usw.


Probieren Sie eine neue Benutzeroberfläche aus



Probieren Sie diese neue Spark-Streaming-Benutzeroberfläche in Apache Spark 3.0 in der neuen Databricks Runtime 7.1 aus. Wenn Sie Databricks-Notizbücher verwenden, können Sie auf diese Weise auch auf einfache Weise den Status einer Streaming-Anfrage im Notizbuch anzeigen und Ihre Anfragen verwalten . Sie können sich bei Databricks für ein kostenloses Konto anmelden und innerhalb von Minuten kostenlos und ohne Kreditinformationen loslegen .






Die Datenqualität in DWH ist die Konsistenz des Data Warehouse. kostenloses Webinar.






Literatur-Empfehlungen:



Data Build Tool oder was Data Warehouse und Smoothie gemeinsam haben

Delta Lake Dive: Schema-Durchsetzung und Evolution

Hochgeschwindigkeits-Apache-Parkett in Python mit Apache Arrow



All Articles