Exchange Rates and Analytics - Using Exchange Rates in a Data Warehouse

Hi! Artemiy here, Analytics Engineer at Wheely.





Today I'd like to talk about converting financial metrics into different currencies. The topic is highly relevant: many companies operate in several countries, build analytics at a global level and prepare reports according to international standards.





Using the Wheely case as an example, I'll show how this problem can be solved with modern approaches.

Stack: Open Exchange Rates, Airflow, Redshift Spectrum, dbt.






legacy- - . . , AED ( ). - BTC, ETH, - .





:





  • , API





  • ,





  • ( )





Matrix of new requirements for working with exchange rates

, . , :





  • API









  • ()





  • legacy-





pivot- . , , . 





pandas . (T ELT) , dbt.





, , – https://openexchangerates.org/





Developer :





  • 10,000 requests per month









  • ,
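A quick back-of-the-envelope check shows whether a 10,000-request quota is enough. This is a minimal sketch. It assumes the limit applies per month, and the number of base currencies and DAG runs per day are illustrative inputs, not Wheely's actual schedule. `to_epoch`, `days_between` and `monthly_requests` are hypothetical helpers. The sketch also counts the one-off backfill loop shown further down, which makes one request per day from 2011-01-01 up to 2021-02-19.

```shell
#!/bin/bash
# Hypothetical request-budget helpers; all inputs are illustrative and
# the 10,000-request limit is assumed to be per month.

# YYYY-MM-DD -> seconds since the epoch at 00:00 UTC
to_epoch() {
  if date --version >/dev/null 2>&1; then
    date -u -d "$1" +%s                                   # GNU date
  else
    date -u -j -f "%Y-%m-%d %H:%M:%S" "$1 00:00:00" +%s   # BSD/macOS date
  fi
}

# Days in [start, end), i.e. requests made by a one-request-per-day loop
days_between() {
  echo $(( ( $(to_epoch "$2") - $(to_epoch "$1") ) / 86400 ))
}

# Scheduled load: base currencies x runs per day x days (default 31)
monthly_requests() {
  echo $(( $1 * $2 * ${3:-31} ))
}

# days_between 2011-01-01 2021-02-19   -> 3702 (range of the backfill loop)
# monthly_requests 5 4                 -> 620  (5 bases, 4 runs a day)
```

With these illustrative inputs, the scheduled load needs 620 requests a month and the backfill 3,702 requests in total. Both fit under 10,000.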





API:





API endpoint /latest.json
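`/latest.json` returns the most recent rates. The DAG below instead calls `/historical/<date>.json` for a specific business date. Both accept the `base` and `symbols` query parameters. This is a small sketch with a hypothetical helper `oxr_url` that builds either URL. Authentication is left to the `Authorization: Token ...` header, as in the scripts in this article.

```shell
#!/bin/bash
# Hypothetical helper: build an Open Exchange Rates request URL.
# $1: "latest" or a date (YYYY-MM-DD), $2: base currency, $3: comma-separated symbols
oxr_url() {
  what=$1 base=$2 symbols=$3
  if [ "$what" = "latest" ]; then
    path="latest.json"
  else
    path="historical/$what.json"
  fi
  printf 'https://openexchangerates.org/api/%s?base=%s&symbols=%s\n' "$path" "$base" "$symbols"
}

# curl -H "Authorization: Token $OXR_TOKEN" "$(oxr_url latest AED AED,EUR,GBP,RUB,USD)"
```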





- :





Airflow

Loading is orchestrated with Airflow. Apache Airflow is a workflow orchestration platform widely used in data engineering.





The pipeline (DAG) consists of:





  • a request to the API





  • saving the response to storage (e.g., S3)





  • a notification in Slack





DAG:





  • (base currency),













The shell script run by the DAG:





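# Timestamp suffix, so every run writes a new object instead of overwriting the previous one
TS=$(date +"%Y-%m-%d-%H-%M-%S-%Z")

# Fetch the rates for $BUSINESS_DT against $BASE_CURRENCY and stream the response straight to S3
curl -H "Authorization: Token $OXR_TOKEN" \
 "https://openexchangerates.org/api/historical/$BUSINESS_DT.json?base=$BASE_CURRENCY&symbols=$SYMBOLS" \
 | aws s3 cp - "s3://$BUCKET/$BUCKET_PATH/$BUSINESS_DT-$BASE_CURRENCY-$TS.json"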
      
      



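As written, the script uploads whatever the API returns. curl exits successfully on HTTP 4xx/5xx unless `-f` is given, so an error body could land in S3 looking like a rates file. A hypothetical guard, `oxr_response_ok`, could check the body before uploading. This sketch assumes OXR error responses contain `"error": true` and successful ones a `"rates"` object. It is a plain text check, not a JSON parser.

```shell
#!/bin/bash
# Hypothetical guard, not part of the original DAG.
# Reads a response body on stdin; succeeds only if it looks like a rates payload.
# Assumption: OXR error bodies contain "error": true, successful ones a "rates" key.
oxr_response_ok() {
  body=$(cat)
  # Reject explicit API errors such as {"error": true, "status": 401, ...}
  if printf '%s' "$body" | grep -Eq '"error"[[:space:]]*:[[:space:]]*true'; then
    return 1
  fi
  # Require a "rates" key somewhere in the body
  printf '%s' "$body" | grep -q '"rates"'
}

# Possible use inside the DAG task (same variables as the script above):
# body=$(curl -sS -H "Authorization: Token $OXR_TOKEN" "https://openexchangerates.org/api/historical/$BUSINESS_DT.json?base=$BASE_CURRENCY&symbols=$SYMBOLS")
# if printf '%s' "$body" | oxr_response_ok; then
#   printf '%s' "$body" | aws s3 cp - "s3://$BUCKET/$BUCKET_PATH/$BUSINESS_DT-$BASE_CURRENCY-$TS.json"
# else
#   echo "unexpected OXR response for $BUSINESS_DT" >&2
#   exit 1
# fi
```

A failed check then stops the task, and Airflow's normal failure alerting can take over.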
S3:





25 , :





, (, , ). .





However, the Developer plan does not include the /time-series.json API endpoint; using it would require an upgrade.





So the history was loaded through the /historical/*.json API endpoint, one day per request:





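#!/bin/bash
# One-off backfill: daily historical rates (base AED) from 2011-01-01 up to, but not including, 2021-02-19
set -eu
mkdir -p ./export

d=2011-01-01
while [ "$d" != 2021-02-19 ]; do
 echo "$d"
 # -sSf: fail on HTTP errors instead of saving the error body as a rates file
 curl -sSf -H "Authorization: Token $TOKEN" "https://openexchangerates.org/api/historical/$d.json?base=AED&symbols=AED,GBP,EUR,RUB,USD" > "./export/$d.json"
 # Next day; BSD/macOS date syntax (GNU date: date -d "$d + 1 day" +%Y-%m-%d)
 d=$(date -j -v +1d -f "%Y-%m-%d" "$d" +%Y-%m-%d)
done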
      
      



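The backfill loop advances the date with BSD/macOS `date` flags (`-j -v +1d -f`), which GNU date on Linux does not accept. A worker running the loop on Linux would therefore fail. Below is a minimal sketch of a hypothetical portable wrapper, `next_day`. It assumes that only GNU date answers `--version`, which holds for coreutils versus BSD/macOS date.

```shell
#!/bin/bash
# Hypothetical helper: print the calendar day after $1 (YYYY-MM-DD), in UTC.
# GNU date (Linux) understands --version; BSD/macOS date does not.
next_day() {
  if date --version >/dev/null 2>&1; then
    date -u -d "$1 + 1 day" +%Y-%m-%d                 # GNU date
  else
    date -u -j -v +1d -f "%Y-%m-%d" "$1" +%Y-%m-%d    # BSD/macOS date
  fi
}

# Drop-in replacement for the last line of the loop:
#   d=$(next_day "$d")
```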
, , :





legacy- X ( -) .





, . - .





Data Lake. , :





  • legacy pivot-





  • PARQUET AWS S3





Export to S3 in PARQUET format:
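-- Reshape the legacy CBRF pivot (one column per currency pair) into one row per
-- (business_dt, base_currency) with one rate column per target currency,
-- and write the result to S3 as PARQUET
CREATE EXTERNAL TABLE spectrum.currencies_cbrf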
STORED AS PARQUET
LOCATION 's3://<BUCKET>/dwh/currencies_cbrf/' AS
WITH base AS (
   SELECT 'EUR' AS base_currency
   UNION ALL
   SELECT 'GBP'
   UNION ALL
   SELECT 'RUB'
   UNION ALL
   SELECT 'USD'
)
SELECT
   "day" AS business_dt
   ,b.base_currency
   ,CASE b.base_currency
       WHEN 'EUR' THEN 1
       WHEN 'GBP' THEN gbp_to_eur
       WHEN 'RUB' THEN rub_to_eur
       WHEN 'USD' THEN usd_to_eur
       ELSE NULL
     END AS eur
   ,CASE b.base_currency
       WHEN 'EUR' THEN eur_to_gbp
       WHEN 'GBP' THEN 1
       WHEN 'RUB' THEN rub_to_gbp
       WHEN 'USD' THEN usd_to_gbp
       ELSE NULL
     END AS gbp
   ,CASE b.base_currency
       WHEN 'EUR' THEN eur_to_rub
       WHEN 'GBP' THEN gbp_to_rub
       WHEN 'RUB' THEN 1
       WHEN 'USD' THEN usd_to_rub
       ELSE NULL
     END AS rub
   ,CASE b.base_currency
       WHEN 'EUR' THEN eur_to_usd
       WHEN 'GBP' THEN gbp_to_usd
       WHEN 'RUB' THEN rub_to_usd
       WHEN 'USD' THEN 1
       ELSE NULL
     END AS usd     
FROM ext.currencies c
   CROSS JOIN base b
;
      
      



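The CASE expressions above rely on a direct quote for every currency pair (`gbp_to_eur`, `eur_to_gbp`, ...), with 1 on the diagonal. When a provider only quotes against a single common base, the missing pairs can be derived by triangulation instead. This is a general technique and not what the legacy table did. `cross_rate` is a hypothetical helper, and the quotes are illustrative numbers, not real market data.

```shell
#!/bin/bash
# Hypothetical helper: cross rate via a common base (e.g. USD).
# If 1 USD = q_from units of FROM and 1 USD = q_to units of TO,
# then 1 FROM = q_to / q_from units of TO. Output is rounded to 4 decimals.
cross_rate() {
  LC_ALL=C awk -v q_from="$1" -v q_to="$2" 'BEGIN { printf "%.4f\n", q_to / q_from }'
}

# Illustrative quotes against USD: EUR 0.80, GBP 0.72
# cross_rate 0.80 0.72   -> 0.9000  (EUR -> GBP)
# cross_rate 0.80 0.80   -> 1.0000  (EUR -> EUR, the "THEN 1" diagonal)
```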
, S3 , - , . .





DWH S3 External Table

Redshift Spectrum is a feature of Amazon Redshift that lets you query data stored in S3.





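The main tool is the EXTERNAL TABLE, which makes files in S3 queryable with SQL. Supported formats include JSON, AVRO, ORC and PARQUET. Redshift Spectrum is comparable to SQL engines such as Amazon Athena and Presto.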





CREATE EXTERNAL TABLE IF NOT EXISTS spectrum.currencies_oxr (
   "timestamp" bigint
   , base varchar(3)
   , rates struct<aed:float8, eur:float8, gbp:float8, rub:float8, usd:float8>
)
ROW format serde 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://<BUCKET>/dwh/currencies/'
;

      
      



Note that the rates field is declared as a struct.


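Each JSON document stores `timestamp` as Unix epoch seconds. The dbt code converts it with `timestamp 'epoch' + "timestamp" * interval '1 second'` and casts to a date. To spot-check raw files in S3 from a terminal, the hypothetical helper `epoch_to_utc_date` sketches the same conversion. It assumes either GNU date (`-d @seconds`) or BSD/macOS date (`-r seconds`).

```shell
#!/bin/bash
# Hypothetical helper: Unix epoch seconds -> UTC calendar date (YYYY-MM-DD),
# the shell counterpart of (timestamp 'epoch' + ts * interval '1 second')::date
epoch_to_utc_date() {
  if date --version >/dev/null 2>&1; then
    date -u -d "@$1" +%Y-%m-%d    # GNU date
  else
    date -u -r "$1" +%Y-%m-%d     # BSD/macOS date
  fi
}

# epoch_to_utc_date 1613692800   -> 2021-02-19
```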



In dbt, EXTERNAL TABLES can be described with the dbt-external-tables package:





version: 2

sources:
   - name: external
     schema: spectrum
     tags: ["spectrum"]
     loader: S3
     description: "External data stored in S3 accessed with Redshift Spectrum"
     tables:
       - name: currencies_oxr
         description: "Currency Exchange Rates fetched from OXR API https://openexchangerates.org"
         freshness:
           error_after: {count: 15, period: hour}
         loaded_at_field: timestamp 'epoch' + "timestamp" * interval '1 second'
         external:
           location: "s3://<BUCKET>/dwh/currencies/"
           row_format: "serde 'org.openx.data.jsonserde.JsonSerDe'"
         columns:
           - name: timestamp
             data_type: bigint
           - name: base
             data_type: varchar(3)
           - name: rates
             data_type: struct<aed:float8, eur:float8, gbp:float8, rub:float8, usd:float8>
      
      



source freshness test . . , .





– 15 – Slack.
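The source definition treats the OXR `timestamp` as the load time, with `error_after: {count: 15, period: hour}`. The sketch below reproduces that arithmetic as a hypothetical shell helper, `freshness_status`. It only illustrates the rule; dbt's own freshness check is what actually evaluates it, and the exact boundary handling there may differ.

```shell
#!/bin/bash
# Hypothetical helper mirroring error_after: {count: 15, period: hour}.
# $1: latest OXR "timestamp" (epoch seconds), $2: current time (epoch seconds),
# $3: limit in hours (default 15). Prints "error" once the data is older than the limit.
freshness_status() {
  loaded_at=$1 now=$2 max_hours=${3:-15}
  if [ $(( now - loaded_at )) -gt $(( max_hours * 3600 )) ]; then
    echo "error"
  else
    echo "pass"
  fi
}

# freshness_status "$latest_ts" "$(date +%s)"
```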





() ( API) currencies:





{{
   config(
       materialized='table',
       dist='all',
       sort=["business_dt", "base_currency"]
   )
}}
 
with cbrf as (
 
 select
 
     business_dt
   , null::timestamp as business_ts
   , base_currency
   , aed
   , eur
   , gbp
   , rub
   , usd
 
 from {{ source('external', 'currencies_cbrf') }}
 where business_dt <= '2021-02-18'
 ),
 
oxr_all as (
 
   select
 
     (timestamp 'epoch' + o."timestamp" * interval '1 second')::date as business_dt
   , (timestamp 'epoch' + o."timestamp" * interval '1 second') as business_ts
   , o.base as base_currency
   , o.rates.aed::decimal(10,4) as aed
   , o.rates.eur::decimal(10,4) as eur
   , o.rates.gbp::decimal(10,4) as gbp
   , o.rates.rub::decimal(10,4) as rub
   , o.rates.usd::decimal(10,4) as usd
   , row_number() over (partition by base_currency, business_dt order by business_ts desc) as rn
 
   from {{ source('external', 'currencies_oxr') }} as o
   where business_dt > '2021-02-18'
 
),
 
oxr as (
 
 select
 
     business_dt
   , business_ts
   , base_currency
   , aed
   , eur
   , gbp
   , rub
   , usd
 
 from oxr_all
 where rn = 1
 ),
 
united as (
 
 select
 
     business_dt
   , business_ts
   , base_currency
   , aed
   , eur
   , gbp
   , rub
   , usd
  from cbrf
 
 union all
 
 select
 
     business_dt
   , business_ts
   , base_currency
   , aed
   , eur
   , gbp
   , rub
   , usd
  from oxr
 
)
 
select
 
   business_dt
 , business_ts
 , base_currency
 , aed
 , eur
 , gbp
 , rub
 , usd
 
from united
      
      


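The `oxr_all`/`oxr` CTEs keep one snapshot per business date and base currency: `row_number()` ordered by `business_ts desc`, then `rn = 1`, so the latest load wins. The DAG writes a new timestamped object on every run, so the same "latest wins" rule can be applied to the S3 object names when inspecting the bucket. This is a sketch with a hypothetical helper, `latest_per_day_and_base`. It assumes names like `<date>-<BASE>-<TS>.json` as produced by the DAG script, and a single time zone in `TS`, so that lexical order equals chronological order.

```shell
#!/bin/bash
# Hypothetical helper: keep the newest object name per (business date, base currency).
# Assumes names like 2021-02-19-AED-2021-02-19-10-00-00-UTC.json (no S3 prefix).
latest_per_day_and_base() {
  LC_ALL=C sort | awk '
    { latest[substr($0, 1, 14)] = $0 }   # key "YYYY-MM-DD-XXX"; newer lines overwrite older
    END { for (k in latest) print latest[k] }
  ' | LC_ALL=C sort
}

# aws s3 ls "s3://$BUCKET/$BUCKET_PATH/" | awk '{print $4}' | latest_per_day_and_base
```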

Redshift   .





, , . API, - JSON S3,   . :





   select
 
       -- price_details
       , r.currency
       , {{ convert_currency('price', 'currency') }}
       , {{ convert_currency('discount', 'currency') }}
       , {{ convert_currency('insurance', 'currency') }}
       , {{ convert_currency('tips', 'currency') }}
       , {{ convert_currency('parking', 'currency') }}
       , {{ convert_currency('toll_road', 'currency') }}
 
   from {{ ref('requests') }} r
       left join {{ ref('stg_currencies') }} currencies on r.completed_dt_utc = currencies.business_dt
           and r.currency = currencies.base_currency

      
      



, :





-- currency conversion macro
-- Multiplies the amount by the rate columns (aed, eur, gbp, rub, usd) of the joined
-- stg_currencies row, whose base_currency is the amount's own currency.
-- currency_code_column is not used in the body; the currency is matched in the join.
{% macro convert_currency(convert_column, currency_code_column) -%}
 
     ( {{ convert_column }} * aed )::decimal(18,4) as {{ convert_column }}_aed
   , ( {{ convert_column }} * eur )::decimal(18,4) as {{ convert_column }}_eur
   , ( {{ convert_column }} * gbp )::decimal(18,4) as {{ convert_column }}_gbp
   , ( {{ convert_column }} * rub )::decimal(18,4) as {{ convert_column }}_rub
   , ( {{ convert_column }} * usd )::decimal(18,4) as {{ convert_column }}_usd
 
{%- endmacro %}

      
      



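Each column produced by the macro is the amount multiplied by the rate from the amount's own currency (the joined `base_currency`) into the target currency, cast to `decimal(18,4)`. As a quick sanity check outside the warehouse, the hypothetical helper `convert_amount` approximates this arithmetic in the shell. The rate is an illustrative value, not real market data. awk uses binary floating point, so in edge cases the last digit may differ from Redshift's decimal arithmetic.

```shell
#!/bin/bash
# Hypothetical helper: amount * rate rounded to 4 decimals,
# like ( price * eur )::decimal(18,4) in the macro.
# $1: amount in the order currency, $2: rate from that currency into the target currency
convert_amount() {
  LC_ALL=C awk -v amount="$1" -v rate="$2" 'BEGIN { printf "%.4f\n", amount * rate }'
}

# convert_amount 100 0.2723   -> 27.2300  (illustrative rate)
```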
-

– . . , .





Data Engineer OTUS, .





, . – :





  • Data Architecture





  • Data Lake





  • Data Warehouse





  • NoSQL / NewSQL





  • MLOps





.





- Technology Enthusiast.





.







