Currency Conversion
Topics covered:
- Temporal table joins
Summary
Currency conversion is a great example of a problem that cannot be solved without considering the time dimension. As exchange rates vary significantly from day to day, using an average rate for a certain period is a gross oversimplification that can give very inaccurate results. This example shows how kamu and streaming SQL can be used to join your overseas trading account data with an exchange rate stream to bring your calculations to a common currency denominator.
Steps
Getting Started
To follow this example, check out the kamu-cli repository and navigate into the examples/currency_conversion sub-directory.
Create a temporary kamu workspace in that folder using:
kamu init
You can either follow the example steps below or fast-track through it by running:
kamu add --recursive .
kamu pull --all
Root Datasets
We will be using two root datasets:
- ca.bankofcanada.exchange-rates.daily - contains daily exchange rates between USD and CAD.
- my.trading.transactions - contains a transaction ledger from a fake trading account, similar to what you’d get in a data export from your trading platform.
The my.trading.transactions dataset sources its data from a file located in the data/ sub-directory, while the exchange rates are obtained from an external source - the Bank of Canada’s historical exchange rates dataset.
Let’s add the root datasets and ingest data:
kamu add ca.bankofcanada.exchange-rates.daily.yaml my.trading.transactions.yaml
kamu pull --all
You can verify the result using the tail command:
kamu tail ca.bankofcanada.exchange-rates.daily
Or explore the dataset via SQL shell:
kamu sql
select * from "ca.bankofcanada.exchange-rates.daily" limit 5;
+-------------+------------+---------------+-----------------+--------------+
| system_time | date | currency_base | currency_target | rate |
+-------------+------------+---------------+-----------------+--------------+
| ... | 1999-01-04 | USD | CAD | 1.5271863602 |
| ... | 1999-01-05 | USD | CAD | 1.5237489398 |
| ... | 1999-01-06 | USD | CAD | 1.5082176616 |
| ... | 1999-01-07 | USD | CAD | 1.5132393398 |
| ... | 1999-01-08 | USD | CAD | 1.5132515653 |
+-------------+------------+---------------+-----------------+--------------+
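You can explore the my.trading.transactions dataset in the same way. As the transform later in this example will show, it contains columns such as event_time, symbol, quantity, price, settlement, and currency:
select * from "my.trading.transactions" limit 5;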
Remember that after adding a dataset from a yaml file, kamu creates an internal representation of it in the workspace’s .kamu directory, so to make changes to a dataset you will need to re-add it after modifying the yaml file:
kamu delete my.trading.transactions
# <Make changes to the yaml file>
kamu add my.trading.transactions.yaml
# Alternatively
kamu add --replace my.trading.transactions.yaml
Converting transaction values into CAD
If you look at the my.trading.transactions dataset, you’ll notice it contains the trading activity of a US account, so all prices and settlement amounts are listed in US Dollars (USD). But let’s imagine that you live in Canada and all of your other accounts and credit cards are in Canadian Dollars (CAD). It would be a lot easier to monitor your finances if they were all in one currency.
If you analyze your finances on a quarterly or yearly basis, you might have dealt with this problem by looking up an average exchange rate for the period and using it in your calculations. This is, of course, an oversimplification that may give misleading results: your transactions are most likely not distributed uniformly throughout the period, and if you live in a country with a volatile economy or trade crypto, the exchange rate may fluctuate significantly.
Ideally, we would like to take every single transaction and convert it using the exchange rate for the corresponding date. Thankfully, kamu makes this process very easy using temporal table joins.
Temporal table joins (see this great explanation in Apache Flink’s blog) take one of the streams and represent it as a three-dimensional table. When joining to such a table, you pass in a time argument to tell the join to consider the version of the table that existed at that point in time.
Let’s have a look at the my.trading.transactions.cad dataset:
kind: DatasetSnapshot
version: 1
content:
  name: my.trading.transactions.cad
  kind: Derivative
  metadata:
    - kind: SetTransform
      inputs:
        - datasetRef: ca.bankofcanada.exchange-rates.daily
        - datasetRef: my.trading.transactions
      transform:
        kind: Sql
        engine: flink
        temporalTables:
          - name: ca.bankofcanada.exchange-rates.daily
            primaryKey:
              - currency_base
        query: |
          SELECT
            tr.`event_time`,
            tr.`symbol`,
            tr.`quantity`,
            tr.`price` as `price_usd`,
            tr.`price` * exc.`rate` as `price_cad`,
            tr.`settlement` as `settlement_usd`,
            tr.`settlement` * exc.`rate` as `settlement_cad`
          FROM `my.trading.transactions` as tr
          LEFT JOIN `ca.bankofcanada.exchange-rates.daily` FOR SYSTEM_TIME AS OF tr.`event_time` as exc
          ON tr.`currency` = exc.`currency_base` AND exc.`currency_target` = 'CAD'
Using the temporalTables section, we instruct the Flink engine to use the ca.bankofcanada.exchange-rates.daily event stream to create a temporal table of the same name.
Here’s how this three-dimensional table would look when sliced at different points in time:
ca.bankofcanada.exchange-rates.daily:
@ 1999-01-04
+---------------+-----------------+--------------+
| currency_base | currency_target | rate |
+---------------+-----------------+--------------+
| USD | CAD | 1.5271863602 |
+---------------+-----------------+--------------+
@ 1999-01-05
+---------------+-----------------+--------------+
| currency_base | currency_target | rate |
+---------------+-----------------+--------------+
| USD | CAD | 1.5237489398 |
+---------------+-----------------+--------------+
@ 1999-01-06
+---------------+-----------------+--------------+
| currency_base | currency_target | rate |
+---------------+-----------------+--------------+
| USD | CAD | 1.5082176616 |
+---------------+-----------------+--------------+
The temporal table essentially remembers the last observed value of every column, grouped by the provided primaryKey (currency_base in our case).
The LEFT JOIN ca.bankofcanada.exchange-rates.daily FOR SYSTEM_TIME AS OF tr.event_time part can be interpreted as taking every transaction event from my.trading.transactions, indexing the temporal table ca.bankofcanada.exchange-rates.daily at that event’s event_time, and then joining the event with the resulting (now ordinary, two-dimensional) table.
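If it helps to think of this in familiar batch terms, here is a rough, conceptually equivalent query in generic SQL (a sketch only - it uses hypothetical flat tables transactions and rates rather than the actual streams, and it is not how the Flink engine executes the join):
-- Batch-SQL approximation of the temporal join: for every transaction,
-- look up the latest rate observed at or before its event_time.
SELECT
  tr.event_time,
  tr.price AS price_usd,
  tr.price * (
    SELECT r.rate
    FROM rates AS r
    WHERE r.currency_base = tr.currency
      AND r.currency_target = 'CAD'
      AND r.event_time <= tr.event_time  -- "AS OF" the transaction time
    ORDER BY r.event_time DESC           -- most recent observation wins
    LIMIT 1
  ) AS price_cad
FROM transactions AS tr;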
The SYSTEM_TIME AS OF syntax is a relic of the SQL:2011 standard and should not be confused with kamu’s system_time column - the actual join is performed in the event time space. You can learn more about temporal event-time joins in the Flink documentation.
With the theory out of the way, it’s time to give this a try:
kamu add my.trading.transactions.cad.yaml
kamu pull my.trading.transactions.cad
The results should be:
kamu tail my.trading.transactions.cad
+-------------+------------+--------+----------+-----------+---------------------+
| system_time | event_time | symbol | quantity | price_usd | price_cad |
+-------------+------------+--------+----------+-----------+---------------------+
| ... | 2016-01-04 | SPY | 1 | 201.0200 | 279.85643605283600 |
| ... | 2016-02-01 | SPY | 1 | 193.6500 | 271.38399945165000 |
| ... | 2016-03-01 | SPY | 1 | 198.1100 | 267.84574042498800 |
| ... | 2016-04-01 | SPY | 1 | 206.9200 | 269.58244227661600 |
| ... | 2016-05-02 | SPY | 1 | 207.9700 | 260.35607413671100 |
+-------------+------------+--------+----------+-----------+---------------------+
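You can spot-check the math yourself in the SQL shell by re-joining the result back to the rates. The query below is a sketch - it assumes the rates dataset exposes the date column shown earlier, that the shell’s SQL dialect accepts this quoting and CAST, and that every transaction’s event_time falls on a date present in the rates data:
-- Recompute the CAD price from the inputs and compare it to the derived value
select
  t.event_time,
  t.price_usd,
  r.rate,
  t.price_usd * r.rate as expected_price_cad,
  t.price_cad
from "my.trading.transactions.cad" as t
join "ca.bankofcanada.exchange-rates.daily" as r
  on r.date = cast(t.event_time as date)
  and r.currency_base = 'USD'
  and r.currency_target = 'CAD'
limit 5;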
The best thing about this is that you may never have to touch this SQL query again. Each time you run kamu pull in the future, the latest transaction data will be ingested along with the latest exchange rates, producing new transactions with converted prices. This is the “write once - run forever” philosophy of kamu, which combines the best propagation times with the accuracy of solving temporal problems without taking any shortcuts.
Once you’ve mastered this example, make sure to check out the Stock Market Trading example, which introduces another important mechanism related to temporal joins: the watermark.