FunkenschemaEvolution in der Praxis

Liebe Leser, guten Tag!



In diesem Artikel beschreibt der führende Berater des Geschäftsbereichs Big Data Solutions von Neoflex ausführlich die Optionen zum Erstellen von Storefronts mit variabler Struktur mithilfe von Apache Spark.



Im Rahmen eines Datenanalyseprojekts entsteht häufig die Aufgabe, Marts auf Basis lose strukturierter Daten zu erstellen.



Normalerweise sind dies Protokolle oder Antworten von verschiedenen Systemen, die als JSON oder XML gespeichert sind. Die Daten werden auf Hadoop hochgeladen, und Sie müssen daraus ein Schaufenster erstellen. Wir können den Zugriff auf die erstellte Storefront beispielsweise über Impala organisieren.



In diesem Fall ist das Layout der Ziel-Storefront bisher unbekannt. Darüber hinaus kann das Diagramm immer noch nicht im Voraus erstellt werden, da es von den Daten abhängt, und wir haben es mit diesen sehr schwach strukturierten Daten zu tun.



Zum Beispiel wird heute die folgende Antwort protokolliert:



{source: "app1", error_code: ""}


und morgen kommt die folgende Antwort aus demselben System:



{source: "app1", error_code: "error", description: "Network error"}


Infolgedessen sollte der Storefront-Beschreibung ein weiteres Feld hinzugefügt werden, und niemand weiß, ob es kommen wird oder nicht.



Die Aufgabe, einen Mart für solche Daten zu erstellen, ist ziemlich Standard, und Spark verfügt über eine Reihe von Tools dafür. Sowohl JSON als auch XML werden zum Parsen von Rohdaten unterstützt, und die Unterstützung von schemaEvolution wird für ein zuvor unbekanntes Schema bereitgestellt.



Auf den ersten Blick sieht die Lösung einfach aus. Wir müssen einen Ordner mit JSON nehmen und ihn in einen Datenrahmen einlesen. Spark erstellt ein Schema und wandelt die verschachtelten Daten in Strukturen um. Dann muss alles in Parkett gespeichert werden, was auch in Impala unterstützt wird, indem das Schaufenster im Hive-Metastore registriert wird.



Alles scheint einfach zu sein.



Aus den kurzen Beispielen in der Dokumentation geht jedoch nicht hervor, was mit einer Reihe von Problemen in der Praxis zu tun ist.



In der Dokumentation wird ein Ansatz beschrieben, der nicht zum Erstellen einer Storefront, sondern zum Einlesen von JSON oder XML in einen Datenrahmen dient.



Es wird nämlich einfach angegeben, wie JSON gelesen und analysiert wird:



df = spark.read.json(path...)


Dies reicht aus, um die Daten für Spark verfügbar zu machen.



In der Praxis ist das Szenario viel komplizierter als nur das Lesen von JSON-Dateien aus einem Ordner und das Erstellen eines Datenrahmens. Die Situation sieht folgendermaßen aus: Es gibt bereits ein bestimmtes Schaufenster, jeden Tag kommen neue Daten, sie müssen dem Schaufenster hinzugefügt werden, nicht zu vergessen, dass das Schema möglicherweise anders ist.



Das übliche Schema zum Erstellen einer Storefront lautet wie folgt:



Schritt 1. Die Daten werden in Hadoop geladen, anschließend täglich neu geladen und einer neuen Partition hinzugefügt. Es stellt sich heraus, dass der Ordner mit den anfänglichen Daten nach Tagen aufgeteilt ist.



Schritt 2.Während des Initialisierungsstarts wird dieser Ordner von Spark gelesen und analysiert. Der resultierende Datenrahmen wird in einem für die Analyse verfügbaren Format gespeichert, z. B. in Parkett, das dann in Impala importiert werden kann. Dadurch wird eine Zielpräsentation mit allen Daten erstellt, die sich bis zu diesem Zeitpunkt angesammelt haben.



Schritt 3. Es wird ein Download erstellt, der die Storefront jeden Tag aktualisiert.

Es stellt sich die Frage nach dem inkrementellen Laden, der Notwendigkeit der Aufteilung des Schaufensters und der Frage nach der Unterstützung des allgemeinen Schemas des Schaufensters.



Geben wir ein Beispiel. Angenommen, der erste Schritt zum Erstellen des Speichers wird implementiert und der Export von JSON-Dateien in einen Ordner konfiguriert.



Es ist kein Problem, daraus einen Datenrahmen zu erstellen und ihn dann als Schaufenster zu speichern. Dies ist der allererste Schritt, den Sie leicht in der Spark-Dokumentation finden können:



df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)


Alles scheint in Ordnung zu sein.



Wir lesen und analysieren den JSON, speichern dann den Datenrahmen als Parkett und registrieren ihn auf bequeme Weise bei Hive:



df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')


Wir bekommen ein Schaufenster.



Am nächsten Tag wurden jedoch neue Daten aus der Quelle hinzugefügt. Wir haben einen Ordner mit JSON und ein Schaufenster, das auf diesem Ordner basiert. Nach dem Laden des nächsten Datenblocks aus der Quelle gehen dem Datamart für einen Tag die Daten aus.



Eine logische Lösung wäre, die Storefront nach Tag zu partitionieren, sodass jeden nächsten Tag eine neue Partition hinzugefügt werden kann. Der Mechanismus hierfür ist ebenfalls bekannt. Mit Spark können Sie Partitionen separat schreiben.



Zuerst laden wir die Initialisierung, speichern die Daten wie oben beschrieben und fügen nur die Partitionierung hinzu. Diese Aktion wird als Storefront-Initialisierung bezeichnet und nur einmal ausgeführt:



df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)


Am nächsten Tag laden wir nur eine neue Partition:



df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")


Sie müssen sich lediglich erneut bei Hive registrieren, um das Schema zu aktualisieren.

Hier treten jedoch Probleme auf.



Erstes Problem. Früher oder später kann das resultierende Parkett nicht gelesen werden. Dies hängt damit zusammen, wie unterschiedlich Parkett und JSON leere Felder angehen.



Betrachten wir eine typische Situation. Zum Beispiel kommt JSON gestern an:



 1: {"a": {"b": 1}},


und heute sieht der gleiche JSON so aus:



 2: {"a": null}


Angenommen, wir haben zwei verschiedene Partitionen mit jeweils einer Zeile.

Wenn wir die gesamten Rohdaten lesen, kann Spark den Typ bestimmen und verstehen, dass "a" ein Feld vom Typ "Struktur" mit einem verschachtelten Feld "b" vom Typ INT ist. Wenn jedoch jede Partition separat gespeichert wurde, wird ein Parkett mit inkompatiblen Partitionsschemata erhalten:



df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)


Diese Situation ist allgemein bekannt. Daher wurde eine spezielle Option hinzugefügt. Entfernen Sie beim Parsen der Anfangsdaten leere Felder:



df = spark.read.json("...", dropFieldIfAllNull=True)


In diesem Fall besteht Parkett aus Partitionen, die zusammen gelesen werden können.

Obwohl diejenigen, die dies in der Praxis getan haben, bitter lachen werden. Warum? Weil wahrscheinlich zwei weitere Situationen auftreten werden. Oder drei. Oder vier. Der erste, der mit ziemlicher Sicherheit auftreten wird, ist, dass numerische Typen in verschiedenen JSON-Dateien unterschiedlich aussehen. Zum Beispiel {intField: 1} und {intField: 1.1}. Wenn solche Felder in einem Teil gefunden werden, liest die Schema-Zusammenführung alles korrekt, was zum genauesten Typ führt. Wenn dies jedoch anders ist, hat einer intField: int und der andere intField: double.



Es gibt das folgende Flag, um diese Situation zu behandeln:



df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)


Jetzt haben wir einen Ordner, in dem sich Partitionen befinden, die in einen einzelnen Datenrahmen und ein gültiges Parkett für die gesamte Storefront eingelesen werden können. Ja? Nein.



Denken Sie daran, dass wir den Tisch in Hive registriert haben. Hive unterscheidet bei Feldnamen nicht zwischen Groß- und Kleinschreibung, während Parkett zwischen Groß- und Kleinschreibung unterscheidet. Daher sind Partitionen mit den Schemas: field1: int und Field1: int für Hive identisch, jedoch nicht für Spark. Denken Sie daran, die Feldnamen in Kleinbuchstaben zu schreiben.



Danach scheint alles in Ordnung zu sein.



Allerdings nicht alles so einfach. Ein zweites, ebenfalls bekanntes Problem tritt auf. Da jede neue Partition separat gespeichert wird, befinden sich die Spark-Dienstdateien im Partitionsordner, z. B. das Erfolgsflag _SUCCESS. Dies wird beim Versuch, Parkett zu verwenden, einen Fehler auslösen. Um dies zu vermeiden, müssen Sie die Konfiguration einrichten, indem Sie Spark daran hindern, dem Ordner Servicedateien hinzuzufügen:



hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")


Es scheint, dass jetzt jeden Tag eine neue Parkettpartition zum Ziel-Storefront-Ordner hinzugefügt wird, in der die analysierten Daten für den Tag gespeichert werden. Wir haben im Voraus sichergestellt, dass es keine Partitionen mit einem Datentypkonflikt gibt.



Aber vor uns liegt das dritte Problem. Jetzt ist das allgemeine Schema nicht bekannt, außerdem ist in Hive die Tabelle mit dem falschen Schema bekannt, da jede neue Partition höchstwahrscheinlich eine Verzerrung in das Schema einführte.



Sie müssen die Tabelle neu registrieren. Dies kann einfach erfolgen: Lesen Sie das Storefront-Parkett erneut, nehmen Sie das Schema und erstellen Sie eine darauf basierende DDL, mit der der Ordner in Hive als externe Tabelle neu registriert und das Ziel-Storefront-Schema aktualisiert wird.



Wir stehen vor einem vierten Problem. Als wir den Tisch zum ersten Mal registrierten, vertrauten wir auf Spark. Jetzt machen wir es selbst und Sie müssen sich daran erinnern, dass Parkettfelder mit Zeichen beginnen können, die für Hive nicht gültig sind. Beispielsweise wirft Spark Zeilen aus, die nicht in das Feld "korrupter_Datensatz" analysiert werden konnten. Ein solches Feld kann nicht ohne Flucht bei Hive registriert werden.



Wenn wir das wissen, erhalten wir das Schema:



f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)


Code ("_corrupt_record", "` _corrupt_record` ") +" "+ f [1] .replace (": "," `:"). Replace ("<", "<` "). Replace (", " , ",` "). replace (" array <`", "array <") macht DDL sicher, dh anstelle von:



create table tname (_field1 string, 1field string)


Bei Feldnamen wie "_field1, 1field" wird eine sichere DDL erstellt, in der Feldnamen maskiert werden: create table `tname` (` _field1` string, `1field` string).



Es stellt sich die Frage: Wie wird der Datenrahmen mit dem vollständigen Schema korrekt abgerufen (in pf-Code)? Wie bekomme ich diesen pf? Dies ist das fünfte Problem. Lesen Sie das Schema aller Partitionen aus dem Ordner mit Parkettdateien der Ziel-Storefront erneut? Dies ist die sicherste, aber schwierigste Methode.



Der Schaltplan befindet sich bereits in Hive. Sie können ein neues Schema erhalten, indem Sie das Schema der gesamten Tabelle und die neue Partition kombinieren. Sie müssen also das Tabellenschema aus Hive übernehmen und es mit dem neuen Partitionsschema kombinieren. Dies kann erreicht werden, indem die Testmetadaten aus Hive gelesen, in einem temporären Ordner gespeichert und beide Partitionen gleichzeitig mit Spark gelesen werden.



Grundsätzlich haben Sie alles, was Sie brauchen: das ursprüngliche Tabellenschema in Hive und eine neue Partition. Wir haben auch die Daten. Sie müssen lediglich ein neues Schema abrufen, das das Storefront-Schema und neue Felder aus der erstellten Partition kombiniert:



from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")


Als Nächstes erstellen wir die DDL zum Registrieren der Tabelle wie im vorherigen Snippet.

Wenn die gesamte Kette korrekt funktioniert, nämlich - es gab eine Initialisierungslast und in Hive gibt es eine korrekt erstellte Tabelle, erhalten wir ein aktualisiertes Tabellenschema.



Und das letzte Problem ist, dass Sie der Hive-Tabelle nicht einfach eine Partition hinzufügen können, da diese beschädigt wird. Sie müssen Hive zwingen, die Partitionsstruktur zu reparieren:



from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)


Die einfache Aufgabe, JSON zu lesen und daraus eine Storefront zu erstellen, führt zur Überwindung einer Reihe impliziter Schwierigkeiten, für die Sie separat nach Lösungen suchen müssen. Obwohl diese Lösungen einfach sind, dauert es lange, sie zu finden.



Um den Aufbau des Schaukastens umzusetzen, musste ich:



  • Fügen Sie der Storefront Partitionen hinzu, um Servicedateien zu entfernen
  • Behandeln Sie leere Felder in den Originaldaten, die Spark eingegeben hat
  • Wirf einfache Typen in Strings
  • Konvertieren Sie Feldnamen in Kleinbuchstaben
  • Separater Datendump und Tabellenregistrierung in Hive (DDL-Erstellung)
  • Denken Sie daran, Feldnamen zu umgehen, die möglicherweise nicht mit Hive kompatibel sind
  • Erfahren Sie, wie Sie die Registrierung einer Tabelle in Hive aktualisieren


Zusammenfassend stellen wir fest, dass die Entscheidung, Vitrinen zu bauen, mit vielen Fallstricken behaftet ist. Wenn daher Schwierigkeiten bei der Implementierung auftreten, ist es besser, einen erfahrenen Partner mit erfolgreichem Fachwissen zu kontaktieren.



Vielen Dank, dass Sie diesen Artikel gelesen haben. Wir hoffen, dass Sie die Informationen nützlich finden.



All Articles