PySpark. Lösen des Problems, Sitzungen zu finden

Guten Tag, liebe Leser! Vor einigen Tagen las Anthony Molinaro das Buch „SQL. Eine Sammlung von Rezepten “In einem der Kapitel stieß ich auf ein Thema, das sich mit der Bestimmung von Anfang und Ende des Bereichs aufeinanderfolgender Werte befasste. Nachdem ich das Material kurz gelesen hatte, fiel mir sofort ein, dass ich diese Frage bereits als eine der Testaufgaben gesehen hatte, aber dann wurde das Thema als „Die Aufgabe, Sitzungen zu finden“ deklariert. Der Trick des technischen Interviews war nicht eine Überprüfung der durchgeführten Arbeit, sondern eine der Fragen des Interviewers, wie mit Spark ähnliche Werte erzielt werden können. Als ich mich auf das Interview vorbereitete, wusste ich nicht, dass das Unternehmen Apache Spark verwendet (oder vielleicht auch nicht ...), und sammelte daher zu diesem Zeitpunkt keine Informationen über ein neues Tool für mich. Es blieb nur die Hypothese aufzustellen, dass die gewünschte Lösung wie ein Skript sein könnte,die mit der Pandas-Bibliothek geschrieben werden kann. Obwohl sehr entfernt, habe ich das Ziel immer noch erreicht, aber ich habe es nicht geschafft, in dieser Organisation zu arbeiten.





Um fair zu sein, möchte ich darauf hinweisen, dass ich im Laufe der Jahre beim Erlernen von Apache Spark kaum Fortschritte gemacht habe. Trotzdem möchte ich den Lesern die Best Practices mitteilen, da viele Analysten auf dieses Tool überhaupt nicht gestoßen sind und andere möglicherweise ein ähnliches Interview führen. Wenn Sie ein Spark-Profi sind, können Sie in den Kommentaren zum Beitrag immer einen optimaleren Code vorschlagen.





Dies war eine Präambel. Fahren wir direkt mit der Analyse dieses Themas fort. Lassen Sie uns zuerst ein SQL-Skript schreiben. Aber zuerst erstellen wir eine Datenbank und füllen sie mit Werten. Da dies ein Demo-Beispiel ist, empfehle ich die Verwendung von SQLite. Diese Datenbank ist leistungsfähigeren "Kollegen im Shop" unterlegen, aber ihre Fähigkeiten zur Skriptentwicklung reichen uns in vollem Umfang aus. Um die obigen Operationen zu automatisieren, habe ich den folgenden Code in Python geschrieben.





#  
import sqlite3

#     
projects = [
    ('2020-01-01', '2020-01-02'),
    ('2020-01-02', '2020-01-03'),
    ('2020-01-03', '2020-01-04'),
    ('2020-01-04', '2020-01-05'),
    ('2020-01-06', '2020-01-07'),
    ('2020-01-16', '2020-01-17'),
    ('2020-01-17', '2020-01-18'),
    ('2020-01-18', '2020-01-19'),
    ('2020-01-19', '2020-01-20'),
    ('2020-01-21', '2020-01-22'),
    ('2020-01-26', '2020-01-27'),
    ('2020-01-27', '2020-01-28'),
    ('2020-01-28', '2020-01-29'),
    ('2020-01-29', '2020-01-30')
]

try:
    #  
    con = sqlite3.connect("projects.sqlite")
    #  
    cur = con.cursor()
    #  
    cur.execute("""CREATE TABLE IF NOT EXISTS projects (
                    proj_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    proj_start TEXT,
                    proj_end TEXT)""")
    #  
    cur.executemany("INSERT INTO projects VALUES(NULL, ?,?)", projects)
    #  
    con.commit()
    #  
    cur.close()
except sqlite3.Error as err:
    print("  ", err)
finally:
    #  
    con.close()
    print("  ")

      
      



. DBeaver. , SQL .





, , , . , - . , . ().





select 
      p3.proj_group, 
      min(p3.proj_start) as date_start,
      max(p3.proj_end) as date_end,
      julianday(max(p3.proj_end))-julianday( min(p3.proj_end))+1 as delta
from
    (select 
	     p2.*,
	     sum(p2.flag)over(order by p2.proj_id) as proj_group
	from 
		(select 
		      p.proj_id , 
		      p.proj_start, 
		      p.proj_end, 
		      case 
		      when lag(p.proj_end)over(order by p.proj_id) = p.proj_start then 0 else 1 
		      end as flag
		from projects as p) as p2) as p3
group by p3.proj_group
      
      



, . . , : . , . , , lag. 0, 1. , . . , .  . , ( julianday SQLite). . Spark.





, Apache Spark         ,  Hadoop. Java, Scala R, Spark PySpark. . Google Colab, . - , . , .





Linux OpenJDK, Spark. . findspark. , .





SQLite , . , .





Spark , . , . -, , , -, . , “ Spark. ”, , , , .





, , SQL. : , ( datediff).





, . , - , , , SQL Spark. , , . .





from pyspark.sql.functions import lag
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Equivalent of Pandas.dataframe.shift() method
w = Window().partitionBy().orderBy(col("proj_id"))
df_dataframe = df.withColumn('lag', F.lag("proj_end").over(w))
#...
# Equivalent of SQL- CASE WHEN...THEN...ELSE... END
df_dataframe = df_dataframe.withColumn('flag',F.when(df_dataframe["proj_start"] == df_dataframe["lag"],0).otherwise(1))
#...
# Cumsum by column flag
w = Window().partitionBy().orderBy(col("proj_id"))
df_dataframe = df_dataframe.withColumn("proj_group", F.sum("flag").over(w))
#...
# Equivalent of SQL - GROUP BY
from pyspark.sql.functions import  min, max
df_group = df_dataframe.groupBy("proj_group").agg(min("proj_start").alias("date_start"), \
                                                  max("proj_end").alias("date_end"))
df_group = df_group.withColumn("delta", F.datediff(df_group.date_end,df_group.date_start))
df_group.show()
      
      



.





  1. , . . , “” , .





  2. Auch wenn Sie noch nie mit Spark zusammengearbeitet haben, ist dies kein Grund, die Konkurrenz um eine freie Stelle abzulehnen. Die Grundlagen von PySpark können in kurzer Zeit beherrscht werden, vorausgesetzt, der Hintergrund verfügt bereits über Programmiererfahrung mit der Pandas-Bibliothek.





  3. An Büchern über Spark mangelt es nicht.





Das ist alles. Alle Gesundheit, viel Glück und beruflichen Erfolg!








All Articles