- Temporal table joins
Summary
Currency conversion is a great example of a problem that cannot be solved without considering the time dimension. Because exchange rates vary significantly from day to day, using an average rate for a certain period is a gross oversimplification that can give very inaccurate results. This example shows how kamu and streaming SQL can be used to join your overseas trading account data with an exchange rate stream to bring your calculations to a common currency denominator.
Steps
Getting Started
To follow this example, check out the kamu-cli repository and navigate into the examples/currency_conversion sub-directory.
Create a temporary kamu workspace in that folder using:
Root Datasets
We will be using two root datasets:
- ca.bankofcanada.exchange-rates.daily - contains daily exchange rates between USD and CAD.
- my.trading.transactions - contains a transaction ledger from a fake trading account, similar to what you’d get in a data export from your trading platform.
my.trading.transactions sources its data from a file located in the data/ sub-directory, while the exchange rates are obtained from an external source: the Bank of Canada historical exchange rates dataset.
Let’s add the root datasets and ingest data:
To review the data, use the tail command or the SQL shell:
Note that after a dataset is added from a yaml file, kamu creates an internal representation of it in the .kamu directory, so to change the dataset you will need to re-add it after editing the yaml file.
Converting transaction values into CAD
If you look at the my.trading.transactions dataset, it contains the trading activity of a US account, so all prices and settlement amounts there are listed in US Dollars (USD). But let’s imagine that you live in Canada and that your other accounts and credit cards are all in Canadian Dollars (CAD). It would be a lot easier to monitor your financials if they were all in one currency.
If you analyze your financials on a quarterly or yearly basis, you might have dealt with this problem by looking up an average exchange rate for the time period and using it in your calculations. This is of course an oversimplification that may give misleading results - your transactions are most likely not distributed uniformly throughout the period, and if you live in a country with a volatile economy or trade crypto, the exchange rate may fluctuate significantly.
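To make the size of this error concrete, here is a small sketch in Python. The dates, amounts, and rates are made-up illustrative values, not data from the example datasets. It compares converting the period total at an average rate with converting each transaction at the rate for its own date.

```python
# Hypothetical data for illustration only: (date, amount in USD).
transactions = [
    ("2020-01-02", 1000.0),
    ("2020-01-03", 1000.0),
    ("2020-03-20", 8000.0),
]

# Hypothetical daily USD -> CAD rates for the same dates.
daily_rate = {
    "2020-01-02": 1.30,
    "2020-01-03": 1.30,
    "2020-03-20": 1.44,
}

# Shortcut: convert the period total using one average rate.
average_rate = sum(daily_rate.values()) / len(daily_rate)
total_usd = sum(amount for _, amount in transactions)
approx_cad = total_usd * average_rate

# Accurate: convert every transaction at the rate for its own date.
exact_cad = sum(amount * daily_rate[date] for date, amount in transactions)

# How far off the shortcut is, in CAD.
error_cad = exact_cad - approx_cad

print(f"average-rate total: {approx_cad:.2f} CAD")
print(f"per-day total:      {exact_cad:.2f} CAD")
print(f"difference:         {error_cad:.2f} CAD")
```

Because most of the traded volume in this made-up ledger falls on the day with the highest rate, the average-rate shortcut understates the total by several hundred CAD.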
Ideally we would like to take every single transaction and convert the price using the exchange rate for the corresponding date. Thankfully, kamu makes this process very easy with temporal table joins.
Temporal table joins (see this great explanation in Apache Flink’s blog) take one of the streams and represent it as a three-dimensional table. When joining to such a table you need to pass in a time argument to tell the join which version of the table to use: the one that would have existed at that point in time.
Let’s have a look at the my.trading.transactions.cad dataset:
The excessive use of backticks is currently caused by the SQL parser used by Apache Flink, which is overly sensitive to reserved words - this should improve in future versions.
In the temporalTables section we instruct the Flink engine to use the ca.bankofcanada.exchange-rates.daily event stream to create a temporal table of the same name.
Here’s what this three-dimensional table would look like when sliced at different points in time:
It keeps the last observed value of every column for each value of the primaryKey (currency_base in our case).
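The idea of slicing a temporal table can be sketched in a few lines of Python. This is only a conceptual illustration, not how the Flink engine implements it. The rate values, the rate column name, and the extra EUR key are hypothetical; only currency_base as the primary key comes from the example.

```python
from datetime import date

# Hypothetical event stream: (event_time, currency_base, rate).
# The EUR rows are invented purely to show grouping by primary key.
events = [
    (date(2020, 1, 2), "USD", 1.30),
    (date(2020, 1, 3), "USD", 1.31),
    (date(2020, 1, 3), "EUR", 1.45),
    (date(2020, 1, 6), "USD", 1.29),
]


def snapshot_at(events, as_of):
    """Return the table as it would look at `as_of`.

    Only events with event_time <= as_of are considered, and for each
    primary key (currency_base) the most recent one wins.
    """
    table = {}
    for event_time, currency_base, rate in sorted(events):
        if event_time > as_of:
            break  # events are sorted, so everything after is in the future
        table[currency_base] = {"event_time": event_time, "rate": rate}
    return table


# Slice the same stream at three different points in time.
for as_of in (date(2020, 1, 1), date(2020, 1, 4), date(2020, 1, 6)):
    print(as_of, snapshot_at(events, as_of))
```

Each slice is an ordinary two-dimensional table with one row per key; the third dimension is the time at which you take the slice.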
The LEFT JOIN ca.bankofcanada.exchange-rates.daily FOR SYSTEM_TIME AS OF tr.event_time part can be interpreted as us taking every transaction event from my.trading.transactions, indexing the temporal table ca.bankofcanada.exchange-rates.daily at this event’s event_time and then joining the same event with the resulting (now ordinary two-dimensional) table.
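As a rough mental model of this join, the following Python sketch looks up, for each transaction, the latest rate whose event time is not later than the transaction's event time. It is an illustration under stated assumptions rather than the query used by the dataset: the settlement_usd, rate, and settlement_cad field names and all values are hypothetical, and only a single currency key is modeled for brevity.

```python
from bisect import bisect_right
from datetime import date

# Hypothetical USD -> CAD rate events, sorted by event_time.
rates = [
    (date(2020, 1, 2), 1.30),
    (date(2020, 1, 3), 1.31),
    (date(2020, 1, 6), 1.29),
]

# Hypothetical transaction events.
transactions = [
    {"event_time": date(2020, 1, 1), "settlement_usd": 100.0},
    {"event_time": date(2020, 1, 4), "settlement_usd": 200.0},
    {"event_time": date(2020, 1, 6), "settlement_usd": 300.0},
]


def temporal_left_join(transactions, rates):
    """Join each transaction with the rate in effect at its event_time."""
    rate_times = [event_time for event_time, _ in rates]
    joined = []
    for tr in transactions:
        # Index of the first rate strictly after the transaction time;
        # the entry just before it is the version "as of" that time.
        idx = bisect_right(rate_times, tr["event_time"])
        rate = rates[idx - 1][1] if idx > 0 else None
        # LEFT JOIN semantics: keep the transaction even without a match.
        cad = tr["settlement_usd"] * rate if rate is not None else None
        joined.append({**tr, "rate": rate, "settlement_cad": cad})
    return joined


result = temporal_left_join(transactions, rates)
for row in result:
    print(row)
```

The first transaction predates every known rate, so it is kept with empty rate fields, which mirrors what a LEFT JOIN does when there is no matching row.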
Note that the FOR SYSTEM_TIME AS OF syntax is a relic of the SQL:2011 standard and should not be confused with Kamu’s system_time column. The actual join is performed in event time. You can learn more about temporal event time joins in the Flink documentation.
Every time you run kamu pull in the future, the latest transaction data will be ingested along with the latest exchange rates, producing new transactions with converted prices. This is the “write once - run forever” philosophy of kamu that combines the best propagation times with the accuracy of solving temporal problems without taking any shortcuts.
Once you master this example, make sure to check out the Stock Market Trading example, which introduces another really important mechanism related to temporal joins.