Hallo! Ich möchte ein interessantes, aber leider kein Thema in der Spark-Dokumentation ausführlich behandeln: Wie trainiere ich ein Modell in PySpark ML auf einem Datensatz mit verschiedenen Datentypen (Zeichenfolgen und Zahlen)? Der Wunsch, diesen Artikel zu schreiben, wurde durch die Notwendigkeit verursacht, mehrere Tage im Internet nach dem erforderlichen Artikel mit dem Code zu suchen, da das offizielle Tutorial von Spark ein Beispiel für die Arbeit nicht nur mit den Zeichen eines Datentyps, sondern im Allgemeinen mit einem Zeichen, sondern auch Informationen zur Arbeitsweise bietet Mehrere Spalten, je unterschiedlicher Datentypen, es gibt keine. Nachdem ich mich jedoch eingehend mit den Funktionen von PySpark für die Arbeit mit Daten befasst hatte, gelang es mir, Arbeitscode zu schreiben und zu verstehen, wie alles abläuft, was ich mit Ihnen teilen möchte. Also volle Kraft voraus, Freunde!
Lassen Sie uns zunächst alle für die Arbeit erforderlichen Bibliotheken importieren, und dann werden wir den Code im Detail analysieren, damit jede "rostige Teekanne" mit Selbstachtung, wie ich übrigens kürzlich, alles verstehen werde:
#
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import pyspark.sql.functions as sf
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
#other types of regression models
#
#from pyspark.ml.regression import LinearRegression
#from pyspark.ml.regression import RandomForestRegressor
#from pyspark.ml.regression import GeneralizedLinearRegression
#from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
Erstellen wir nun einen (lokalen) Spark-Kontext und eine Spark-Sitzung und überprüfen Sie, ob alles funktioniert, indem Sie es auf dem Bildschirm anzeigen. Das Erstellen einer Spark-Sitzung ist der Ausgangspunkt für die Arbeit mit Datasets in Spark:
#
sc = SparkContext('local')
spark = SparkSession(sc)
spark
Es gibt ein Tool zum Arbeiten mit Daten. Laden Sie es jetzt. Der Artikel verwendet einen Datensatz, der von der Website des Kaggle-Wettbewerbs für maschinelles Lernen stammt:
https://www.kaggle.com/unitednations/international-greenhouse-gas-emissions
, der nach dem Herunterladen in path_csv im CSV-Format gespeichert wird und folgende Optionen bietet:
- Header: Wenn in unserer Datei die erste Zeile ein Header ist, setzen Sie "true".
- Trennzeichen: Wir setzen ein Zeichen, das die Daten einer Zeile durch Zeichen trennt. Oft ist es "," oder ";"
- inferSchema: Wenn true, erkennt PySpark automatisch den Typ jeder Spalte, andernfalls müssen Sie ihn selbst schreiben
# .csv path_csv
path_csv = 'greenhouse_gas_inventory_data_data.csv'
data = spark.read.format("csv")\
.option("header", "true")\
.option("delimiter", ",")\
.option("inferSchema", "true")\
.load(path_csv)
Um besser zu verstehen, mit welcher Art von Daten wir es zu tun haben, schauen wir uns einige ihrer Zeilen an:
#
data.show()
Mal sehen, wie viele Zeilen wir im Datensatz haben:
#
data.select('year').count()
Lassen Sie uns abschließend die Arten unserer Daten ableiten, die PySpark, wie wir uns erinnern, mit der Option ("inferSchema", "true") automatisch ermitteln sollte:
#
data.printSchema()
Fahren wir nun mit unserem Hauptgericht fort - der Arbeit mit verschiedenen Zeichen unterschiedlicher Datentypen. Spark kann das Modell anhand der transformierten Daten trainieren, wobei die vorhergesagte Spalte ein Vektor ist und die Spalten mit Features auch ein Vektor sind, was die Aufgabe kompliziert. Wir geben jedoch nicht auf. Um das Modell in PySpark zu trainieren, verwenden wir Pipeline, in die wir einen bestimmten Aktionsplan (Variable) übergeben Stufen):
- Schritt label_stringIdx: Wir transformieren die Spalte des Wertdatensatzes, den wir vorhersagen möchten, in eine Spark-Vektorzeichenfolge und benennen sie um, um sie mit dem Parameter handleInvalid = 'keep' zu kennzeichnen. Dies bedeutet, dass unsere vorhergesagte Spalte null unterstützt
- stringIndexer-Schritt: Konvertieren Sie String-Spalten in kategoriale Spark-Strings
- encoder: ()
- assembler: Spark, , VectorAssembler(), ( ) (assemblerInputs) «features»
- gbt: PySpark ML GBTRegressor,
#value - -
stages = []
label_stringIdx = StringIndexer(inputCol = 'value', outputCol = 'label', handleInvalid = 'keep')
stages += [label_stringIdx]
#depend on categorical columns: country and types of emission
# :
categoricalColumns = ['country_or_area', 'category']
for categoricalCol in categoricalColumns:
#
stringIndexer = StringIndexer(inputCol = categoricalCol,
outputCol = categoricalCol + 'Index',
handleInvalid = 'keep')
encoder = OneHotEncoder(inputCol=stringIndexer.getOutputCol(),
outputCol=categoricalCol + "classVec")
stages += [stringIndexer, encoder]
# :
numericCols = ['year']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
# - -
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
Teilen wir unseren Datensatz in Trainings- und Testmuster im bevorzugten Verhältnis von 70% zu 30% auf und beginnen mit dem Training des Modells mithilfe eines Gradientenregressions-Boosting-Baums (GBTRegressor), der den "Label" -Vektor basierend auf den zuvor in einem "Features" -Vektor kombinierten Merkmalen vorhersagen sollte mit iterierbarem Grenzwert maxIter = 10:
# (30% )
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# ( )
gbt = GBTRegressor(labelCol="label", featuresCol="features", maxIter=10)
stages += [gbt]
# stages
pipeline = Pipeline(stages=stages)
Und jetzt müssen wir dem Computer nur noch einen Aktionsplan und einen Trainingsdatensatz senden:
#
model = pipeline.fit(trainingData)
#
predictions = model.transform(testData)
Speichern wir unser Modell, damit wir es ohne erneutes Training wieder verwenden können:
#
pipeline.write().overwrite().save('model/gbtregr_model')
Und wenn Sie das trainierte Modell erneut für Vorhersagen verwenden möchten, schreiben Sie einfach:
#
load_model = pipeline.read().load('model/gbtregr_model')
Daher haben wir uns angesehen, wie in einem Tool für die Arbeit mit Big Data in der Python-Sprache PySpark die Arbeit mit mehreren Feature-Spalten unterschiedlicher Datentypen implementiert wird.
Jetzt ist es Zeit, dies auf Ihre Modelle anzuwenden ...