Ich möchte alles über den Kunden wissen! Oder wie Sie trockene DWH-Fakten mit digitalen Pfaden und Client-Eigenschaften von Amplitude bereichern können

Das Unternehmens-Repository der Betting League wurde lange vor der Einführung von Amplitude erstellt. Es wird hauptsächlich von Analysten und Forschern verwendet. Produktmanager und Vermarkter wandten sich an Analysten, um Auswertungen aus dem Warehouse zu erhalten, da hierfür Programmierkenntnisse erforderlich sind.







Den DWH-Fakten hat es immer an einer digitalen Produktsicht gefehlt, die die Kunden begleitet und uns Einblick in ihre Wege gibt. Mit dem Aufkommen von Amplitude im Unternehmen haben wir begonnen, den Wert der im System gesammelten Daten zu verstehen; es ist sehr nützlich, sie in Amplitude selbst zu verwenden, aber der Gedanke an eine Symbiose der beiden Systeme DWH und Amplitude ließ uns keine Ruhe. Daher haben wir Mechanismen zur Datenübertragung aus Amplitude für die interne Analyse im Unternehmens-Warehouse implementiert und geben im Folgenden eine Anleitung zum Einrichten der Datenübertragung von Amplitude zu DWH. Wir laden Sie auch zum Webinar von Betting League und Adventum über die Analyse und Optimierung von Conversions im Produkt ein.







Bild







Wie DWH-Datenaggregation helfen kann



1. . DWH, .







2. . .







3. . , API . .







Amplitude DWH



Amplitude API . . . , , , . . , , UTC — , .







. Python, SQL . ! Amplitude , .







, — Amplitude . , CSV, ETL .







ETL — Extract, Transform, Load. , , DWH .







. , . , , .







Python 3.7 . , flow- (, , dag), , Windows. .bat ( ). , .







1.



# 
import os
import requests
import pandas as pd
import zipfile
import gzip
import time
from tqdm import tqdm
import pymssql
import datetime
import pyodbc
from urllib.parse import quote_plus
from sqlalchemy import create_engine, event
      
      





2.



, , . , .







# Working directory and run timestamps.
# Fixed: the Windows path is now a raw string — "C:\Agents\AMPL\TEMP"
# contains \A and \T, which are invalid escape sequences (DeprecationWarning
# today, a SyntaxError in future Python versions).
os.chdir(r"C:\Agents\AMPL\TEMP")  # all temp artefacts of the run live here
dts1 = time.time()  # start timestamp for the overall run duration report
a = time.time()  # start timestamp for the final minutes report at the end
now = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")  # run id used in generated file names
      





3. API Amplitude



, (Settings => Project = > General).







# Amplitude project API credentials (found under Settings => Project => General).
# Placeholders — filled in locally, never committed.
api_key = ''
secret_key = '  '
      
      





4. ()



, , . SQL , . yyyymmddThh (. . ). API , .







# DWH (SQL Server) connection credentials — placeholders, filled in locally.
server = " "
user = ""
password = ""

# Open a connection and run the SELECT that yields the export date
# in the yyyymmddThh format expected by the Amplitude Export API.
conn = pymssql.connect(server, user, password, " ")
cursor = conn.cursor()
cursor.execute("   .    select")
      
      





5.



API Amplitude. .







# Pull the export date produced by the SELECT above.
# NOTE(review): the loop overwrites dt on every row, so only the LAST
# row's first column survives — presumably the query returns exactly
# one row; confirm against the actual SELECT.
for row in cursor: 
    dt = row[0]
conn.close()   
      
      





6.



, . , , , , .







# Compose the artefact name for this run (export date + run timestamp)
# and remember the working directory with a trailing Windows backslash,
# so later code can build paths by plain concatenation instead of os.chdir.
filename = f'AMPL_PROD_{dt}_{now}'
working_dir = os.getcwd() + '\\'

      
      





7. SQL



SQL. , .







# Target table coordinates in DWH (SQL Server); placeholders filled in locally.
server = ' '
database = ' '
schema = ' '
table_to_export = ' '

# Build the ODBC connection string and wrap it into a SQLAlchemy URL.
# Fixed: removed the stray space in 'DRIVER= {SQL Server}' — some ODBC
# driver managers fail to resolve the driver name with it.
params = 'DRIVER={SQL Server};SERVER='+server+';DATABASE='+database+';User='+user+';Password='+password+';'
quoted = quote_plus(params)
new_con = 'mssql+pyodbc:///?odbc_connect={}'.format(quoted)
      
      





8. Amplitude



Amplitude , , json .







# Download the daily export archive from the Amplitude Export API
# and unpack it into the working directory.
class GetFile():

    def __init__(self, now, dt, api_key, secret_key, filename, working_dir):
        # now: run timestamp string (bookkeeping only)
        # dt: export date in the yyyymmddThh format expected by the API
        # api_key / secret_key: Amplitude project credentials (basic auth)
        # filename: base name (without extension) for the downloaded archive
        # working_dir: directory (with trailing separator) to write into
        self.now = now
        self.dt = dt
        self.api_key = api_key
        self.secret_key = secret_key
        self.filename = filename
        self.working_dir = working_dir

    def response(self):
        """Request the export archive, retrying until the API answers.

        Returns the raw requests.Response whose content is the zipped export.
        """
        print('   !', end='\n')
        count = 0
        while True:
            count += 1
            print(f' {count}.....', end='')
            try:
                response = requests.get('https://amplitude.com/api/2/export?start='+self.dt+'&end='+self.dt,
                                        auth=(self.api_key, self.secret_key),
                                        timeout=10)
                print('', end='\n') 
                print('1.    ', end='\n')
                break
            # Fixed: was a bare `except:`, which also swallowed
            # KeyboardInterrupt/SystemExit — retry only on transport errors.
            except requests.exceptions.RequestException:
                print('', end='\n')
                time.sleep(1)

        return response

    def download(self):
        '''
        Save the export archive as <working_dir><filename>.zip.
        '''
        # Fixed: was open(working_dir + ...) — silently read the
        # module-level global instead of the instance attribute.
        with open(self.working_dir + self.filename + '.zip', "wb") as code:
            file = self.response()
            print('2.    .....', end='')           
            code.write(file.content)
        print('OK', end='\n')

    def extraction(self):
        '''
        Extract the downloaded zip into <working_dir><filename>/.
        '''
        # Fixed: context manager guarantees the archive handle is closed.
        with zipfile.ZipFile(self.working_dir + self.filename + '.zip', 'r') as z:
            z.extractall(path=self.working_dir + self.filename)
        print('3.         ' + self.filename)

    def getfilename(self):
        '''
        Return a short human-readable summary of the target archive path.
        '''
        return ': {} \n : {}'.format(self.filename, self.working_dir + self.filename + '.zip')

def unzip_and_create_df(working_dir, filename):
    '''
    Read every JSON.gz file of the unpacked Amplitude export and return
    a single pandas DataFrame with all events concatenated.

    working_dir: directory (with trailing separator) the archive was
                 extracted into
    filename:    extraction folder name (as produced by GetFile)
    '''
    # NOTE(review): '274440' looks like this project's Amplitude app id
    # baked into the export layout — confirm it matches the project.
    directory = working_dir + filename + '\\274440'
    files = os.listdir(directory)
    print('  :')
    time.sleep(1)
    # Fixed: collect per-file frames and concatenate once at the end —
    # pd.concat inside the loop re-copies the accumulated frame every
    # iteration (quadratic in the number of files).
    frames = []
    for i in tqdm(files):
        with gzip.open(directory + '\\' + i) as f:
            frames.append(pd.read_json(f, lines=True))
    df = pd.concat(frames, axis=0) if frames else pd.DataFrame()
    time.sleep(1)
    print('4. JSON         dataframe')
    return df

# Drive the export: instantiate the helper, fetch and unpack the daily
# archive, then load every json.gz into one DataFrame for further work.
exporter = GetFile(now, dt, api_key, secret_key, filename, working_dir)
exporter.download()
exporter.extraction()
adf = unzip_and_create_df(working_dir, filename)

      
      





9. ()



, . . , SQL. .







# Harmonise Amplitude column names with the DWH column settings and
# prepare the DataFrame for the SQL load.
print('5.    ,  , , .....', end='')

# Read the list of columns to keep from the DWH settings table.
sql_query_columns = ("""
                        '             '
                    """)

settdf = pd.read_sql_query(sql_query_columns, new_con)

# Normalise both sides to the same lower-cased naming so they match:
# DWH names are lower-cased; Amplitude names get their underscores
# dropped ('event_type' -> 'eventtype') and are lower-cased too.
settdf['SAVE_COLUMN_NAME'] = settdf['SAVE_COLUMN_NAME'].str.lower()
adf.columns = [''.join(j.title() for j in i.split('_')).lower() for i in adf.columns]

# Columns to export (the redundant identity comprehension was dropped).
needed_columns = settdf['SAVE_COLUMN_NAME'].to_list()

# Plus the technical source-file column.
needed_columns.append('DOWNLOAD_FILE_NAME')

# Tag every row with the source file so a load can be traced or rolled back.
adf['DOWNLOAD_FILE_NAME'] = filename

# Make the index a plain column (keeps the original row order).
adf.reset_index(inplace=True)

# Cast everything to strings and turn NaN/NaT into None so the SQL driver
# writes NULLs. Fixed: 'unicode_' is a numpy alias removed in numpy 2.0 —
# astype(str) is the supported equivalent.
adf = adf.astype(str).where(pd.notnull(adf), None)

# Keep only the configured columns.
df_to_sql = adf[needed_columns]

print('OK', end='\n')
      
      





10.



. .







# Bulk-load the prepared DataFrame into DWH.
dts2 = time.time()  # start timestamp for the load phase
print('6.    ...', end='')

# SQLAlchemy engine for the bulk insert.
# Fixed: the separate pyodbc.connect(params) that used to be opened here
# was never used — to_sql works entirely through the engine.
engine = create_engine(new_con)

# Enable fast_executemany for bulk inserts — dramatically faster against
# MS SQL than per-row executemany.
@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
    if executemany:
        cursor.fast_executemany = True

# Append in chunks; index=False keeps the technical index out of DWH.
df_to_sql.to_sql(table_to_export, engine, schema=schema, if_exists='append', chunksize=100000, index=False)

# Release pooled connections.
engine.dispose()
print('OK', end='\n')

dtf = time.time()
diff1, diff2 = [str(int((dtf - i) // 60)) + '  ' + str(int((dtf - i) % 60)) + ' ' for i in (dts1, dts2)]
print(f' : {diff1},   : {diff2}')
print(' ,   ')

      
      





11.



! . .







# Post-load verification: run a check query against DWH for this load.
conn2 = pymssql.connect(server, user, password, "  ")
cursor2 = conn2.cursor()
# Fixed: the original line ended with a stray closing parenthesis,
# which made the whole script a SyntaxError.
query = "      ,  "

# Run the check.
cursor2.execute(query)

# Commit and release the connection.
conn2.commit()
conn2.close()

      
      





12.



, . . , ? , ETL , .







print('  ')

# Kick off the downstream ETL stored procedure in DWH that post-processes
# the freshly loaded raw data (e.g. EXEC dbo.SP).
conn3 = pymssql.connect(server, user, password, "  ")
cursor3 = conn3.cursor()
query = " ETL   .  EXEC dbo.SP"

cursor3.execute(query)

conn3.commit()
conn3.close()
      
      





13.



, .







# Report total wall-clock runtime of the script in whole minutes.
b = time.time()
elapsed = b - a
whole_minutes = elapsed // 60
print('  : {:.0f} ()'.format(whole_minutes))

      
      





. — , .







, ETL, . , - , .







, Amplitude, . s2s , .







20 17:00 . . , .








All Articles