Hallo! In Kontakt Artemiy - Analytics Engineer von Wheely.
Heute möchte ich über die Umrechnung von Finanzindikatoren in verschiedene Währungen sprechen. Die Frage ist sehr relevant, da eine große Anzahl von Unternehmen über multinationale Präsenzzonen verfügt, Analysen auf globaler Ebene erstellt und Berichte gemäß internationalen Standards erstellt.
Lassen Sie mich am Beispiel des Wheely-Falls zeigen, wie dieses Problem mit modernen Ansätzen gelöst wird:
: Open Exchange Rates, Airflow, Redshift Spectrum, dbt.
legacy- - . . , AED ( ). - BTC, ETH, - .
:
, API
,
( )
, . , :
API
()
legacy-
pivot- . , , .
pandas . (T ELT) , dbt.
, , – https://openexchangerates.org/
Developer :
10.000 ( )
,
API:
API endpoint /latest.json
- :
Airflow
Airflow. Apache Airflow – - , data engineering .
(DAG):
API
(, S3)
Slack
DAG:
(base currency),
DAG shell-:
TS=`date +"%Y-%m-%d-%H-%M-%S-%Z"`
curl -H "Authorization: Token $OXR_TOKEN" \
"https://openexchangerates.org/api/historical/$BUSINESS_DT.json?base=$BASE_CURRENCY&symbols=$SYMBOLS" \
| aws s3 cp - s3://$BUCKET/$BUCKET_PATH/$BUSINESS_DT-$BASE_CURRENCY-$TS.json
S3:
25 , :
, (, , ). .
, Developer API endpoint /time-series.json, upgrade .
/historical/*.json API :
#!/bin/bash
d=2011-01-01
while [ "$d" != 2021-02-19 ]; do
echo $d
curl -H "Authorization: Token $TOKEN" "https://openexchangerates.org/api/historical/$d.json?base=AED&symbols=AED,GBP,EUR,RUB,USD" > ./export/$d.json
d=$(date -j -v +1d -f "%Y-%m-%d" $d +%Y-%m-%d)
done
, , :
legacy- X ( -) .
, . - .
Data Lake. , :
legacy pivot-
PARQUET AWS S3
S3 PARQUET
CREATE EXTERNAL TABLE spectrum.currencies_cbrf
STORED AS PARQUET
LOCATION 's3://<BUCKET>/dwh/currencies_cbrf/' AS
WITH base AS (
SELECT 'EUR' AS base_currency
UNION ALL
SELECT 'GBP'
UNION ALL
SELECT 'RUB'
UNION ALL
SELECT 'USD'
)
SELECT
"day" AS business_dt
,b.base_currency
,CASE b.base_currency
WHEN 'EUR' THEN 1
WHEN 'GBP' THEN gbp_to_eur
WHEN 'RUB' THEN rub_to_eur
WHEN 'USD' THEN usd_to_eur
ELSE NULL
END AS eur
,CASE b.base_currency
WHEN 'EUR' THEN eur_to_gbp
WHEN 'GBP' THEN 1
WHEN 'RUB' THEN rub_to_gbp
WHEN 'USD' THEN usd_to_gbp
ELSE NULL
END AS gbp
,CASE b.base_currency
WHEN 'EUR' THEN eur_to_rub
WHEN 'GBP' THEN gbp_to_rub
WHEN 'RUB' THEN 1
WHEN 'USD' THEN usd_to_rub
ELSE NULL
END AS rub
,CASE b.base_currency
WHEN 'EUR' THEN eur_to_usd
WHEN 'GBP' THEN gbp_to_usd
WHEN 'RUB' THEN rub_to_usd
WHEN 'USD' THEN 1
ELSE NULL
END AS usd
FROM ext.currencies c
CROSS JOIN base b
;
, S3 , - , . .
DWH S3 External Table
– Amazon Redshift , .
– EXTERNAL TABLE, SQL- , S3. JSON, AVRO, ORC, PARQUET . Redshift Spectrum SQL- Amazon Athena, Presto.
CREATE EXTERNAL TABLE IF NOT EXISTS spectrum.currencies_oxr (
"timestamp" bigint
, base varchar(3)
, rates struct<aed:float8, eur:float8, gbp:float8, rub:float8, usd:float8>
)
ROW format serde 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://<BUCKET>/dwh/currencies/'
;
rates struct.
dbt. dbt-external-tables EXTERNAL TABLES :
- name: external
schema: spectrum
tags: ["spectrum"]
loader: S3
description: "External data stored in S3 accessed vith Redshift Spectrum"
tables:
- name: currencies_oxr
description: "Currency Exchange Rates fetched from OXR API https://openexchangerates.org"
freshness:
error_after: {count: 15, period: hour}
loaded_at_field: timestamp 'epoch' + "timestamp" * interval '1 second'
external:
location: "s3://<BUCKET>/dwh/currencies/"
row_format: "serde 'org.openx.data.jsonserde.JsonSerDe'"
columns:
- name: timestamp
data_type: bigint
- name: base
data_type: varchar(3)
- name: rates
data_type: struct<aed:float8, eur:float8, gbp:float8, rub:float8, usd:float8>
– source freshness test . . , .
– 15 – Slack.
() ( API) currencies:
{{
config(
materialized='table',
dist='all',
sort=["business_dt", "base_currency"]
)
}}
with cbrf as (
select
business_dt
, null as business_ts
, base_currency
, aed
, eur
, gbp
, rub
, usd
from {{ source('external', 'currencies_cbrf') }}
where business_dt <= '2021-02-18'
),
oxr_all as (
select
(timestamp 'epoch' + o."timestamp" * interval '1 second')::date as business_dt
, (timestamp 'epoch' + o."timestamp" * interval '1 second') as business_ts
, o.base as base_currency
, o.rates.aed::decimal(10,4) as aed
, o.rates.eur::decimal(10,4) as eur
, o.rates.gbp::decimal(10,4) as gbp
, o.rates.rub::decimal(10,4) as rub
, o.rates.usd::decimal(10,4) as usd
, row_number() over (partition by base_currency, business_dt order by business_ts desc) as rn
from {{ source('external', 'currencies_oxr') }} as o
where business_dt > '2021-02-18'
),
oxr as (
select
business_dt
, business_ts
, base_currency
, aed
, eur
, gbp
, rub
, usd
from {{ ref('stg_currencies_oxr_all') }}
where rn = 1
),
united as (
select
business_dt
, business_ts
, base_currency
, aed
, eur
, gbp
, rub
, usd
from cbrf
union all
select
business_dt
, business_ts
, base_currency
, aed
, eur
, gbp
, rub
, usd
from oxr
)
select
business_dt
, business_ts
, base_currency
, aed
, eur
, gbp
, rub
, usd
from united
Redshift .
, , . API, - JSON S3, . :
select
-- price_details
, r.currency
, {{ convert_currency('price', 'currency') }}
, {{ convert_currency('discount', 'currency') }}
, {{ convert_currency('insurance', 'currency') }}
, {{ convert_currency('tips', 'currency') }}
, {{ convert_currency('parking', 'currency') }}
, {{ convert_currency('toll_road', 'currency') }}
from {{ ref('requests') }} r
left join {{ ref('stg_currencies') }} currencies on r.completed_dt_utc = currencies.business_dt
and r.currency = currencies.base_currency
, :
-- currency conversion macro
{% macro convert_currency(convert_column, currency_code_column) -%}
( {{ convert_column }} * aed )::decimal(18,4) as {{ convert_column }}_aed
, ( {{ convert_column }} * eur )::decimal(18,4) as {{ convert_column }}_eur
, ( {{ convert_column }} * gbp )::decimal(18,4) as {{ convert_column }}_gbp
, ( {{ convert_column }} * rub )::decimal(18,4) as {{ convert_column }}_rub
, ( {{ convert_column }} * usd )::decimal(18,4) as {{ convert_column }}_usd
{%- endmacro %}
-
– . . , .
Data Engineer OTUS, .
, . – :
Data Architecture
Data Lake
Data Warehouse
NoSQL / NewSQL
MLOps
.