Topics covered:
- Stream-to-Stream joins
- Temporal aggregations
- Watermarks
Summary
In this example we will take a look at an imaginary e-commerce company, acme.com, committed to shipping its products to customers as soon as possible. To maintain their level of service they have decided to create an alerting system using kamu that notifies them if any order does not ship within a certain time window, so that they can investigate the root cause of the delay.
Steps
Getting Started
To follow this example, check out the kamu-cli repository and navigate into the examples/overdue_shipments sub-directory.
Create a temporary kamu workspace in that folder using:
Root Datasets
We will be using two root datasets:
- com.acme.orders - contains a log of orders as they are being placed in the system. To make it more interesting we will assume that each order can request a different quantity of “the product”, which can be shipped to the customer in one or several shipments.
- com.acme.shipments - contains a log of shipments as they are being dispatched to the customers. Every shipment links to the original order and specifies the quantity of “the product” that has been shipped.
Both datasets read their data from files in the data/ sub-directory. Once comfortable with this example you can try adding more data in orders-xxx.csv and shipments-xxx.csv to see how fileGlob data sources work and test the queries with any edge cases you can come up with.
Let’s add the root datasets and ingest the data:
You can inspect the ingested data using the tail command or the SQL shell:
Joining Shipments with Orders
The goal of the com.acme.shipments.overdue dataset that we will be creating is to detect orders that have not been fulfilled at all or were fulfilled only partially within a specified time interval - let’s say 2 DAYS. We want the alert to go off as soon as such a condition occurs - there’s no use investigating the root cause of a late order when a few weeks have already passed.
We can define our goal as: an order placed in the system is considered “late” if during the interval of 2 DAYS after the order has been placed the total quantity of product shipped doesn’t equal the quantity that has been ordered.
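To make this definition concrete, here is a minimal plain-Python sketch of the "late" predicate. It is only a model of the rule stated above, not the query kamu runs; the function name, the tuple layout, and the sample data are assumptions made for illustration. The window is treated as half-open: a shipment exactly two days after the order counts as too late.

```python
from datetime import datetime, timedelta

# The 2-day fulfilment window from the definition above.
FULFILMENT_WINDOW = timedelta(days=2)


def is_late(order_time, ordered_quantity, shipments):
    """Return True if the order was not fully shipped within the window.

    `shipments` is an iterable of (event_time, quantity) pairs for this order
    (a hypothetical layout chosen for this sketch).
    """
    deadline = order_time + FULFILMENT_WINDOW
    shipped_in_time = sum(
        quantity
        for event_time, quantity in shipments
        if order_time <= event_time < deadline
    )
    return shipped_in_time != ordered_quantity


# Hypothetical data: 10 units ordered on January 1st.
placed = datetime(2020, 1, 1)
partial = [(datetime(2020, 1, 2), 6), (datetime(2020, 1, 4), 4)]
on_time = [(datetime(2020, 1, 2), 6), (datetime(2020, 1, 2, 12), 4)]

print(is_late(placed, 10, partial))  # True: only 6 units shipped in time
print(is_late(placed, 10, on_time))  # False: all 10 units shipped in time
```

An order with no shipments at all is also late under this rule, which is why the join in the next step has to keep unmatched orders.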
First we need to get these two quantities somehow in one dataset, so let’s join the orders and shipments data using order_id as the key:
The first join condition is an equality on order_id - this is pretty standard and something you would do in regular SQL with non-temporal data. With only this condition the query would already work, but since kamu doesn’t know how far apart in time the orders and shipments can be, it would have to assume that order 1 can match a shipment that happens a century or even a billion years from now, and so would have to keep track of all the orders forever.
To prevent that we add another condition that restricts the shipment.event_time to a time interval [order.event_time, order.event_time + 2 DAYS). This tells kamu that all orders that are older than two days can be forgotten since they will never match this condition.
Also note that we are especially interested in orders that had no shipments at all, so we are using LEFT JOIN that will produce a record even if nothing on the right side has matched our criteria. In this case all s.* fields will be NULL, so we are using COALESCE to replace NULL values with a zero when computing shipped_quantity.
We also need to CAST s.event_time to TIMESTAMP. This is a peculiarity of Flink SQL, which treats the event time of a stream as a special type. Every stream is allowed only one event time, so when two streams are joined we need to cast one of the times to a plain TIMESTAMP type.
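The join semantics described above can be modelled in a few lines of plain Python. This is a batch-style sketch for illustration only, not the Flink SQL query itself: the dictionary keys and output column names other than order_id, event_time, and shipped_quantity are assumptions, and a real streaming engine evaluates this incrementally rather than over complete lists.

```python
from datetime import datetime, timedelta

# Shipments are only matched within 2 days of the order's event time.
JOIN_WINDOW = timedelta(days=2)


def join_order_shipments(orders, shipments):
    """Model of a LEFT JOIN on order_id with a time-bounded condition."""
    rows = []
    for o in orders:
        lower = o["event_time"]
        upper = o["event_time"] + JOIN_WINDOW
        matches = [
            s for s in shipments
            if s["order_id"] == o["order_id"]
            and lower <= s["event_time"] < upper
        ]
        # LEFT JOIN: keep the order even when nothing on the right matched.
        for s in matches or [None]:
            rows.append({
                "order_time": o["event_time"],
                "order_id": o["order_id"],
                "order_quantity": o["quantity"],
                "shipped_time": s["event_time"] if s else None,
                # Equivalent of COALESCE(s.quantity, 0).
                "shipped_quantity": s["quantity"] if s else 0,
            })
    return rows


# Hypothetical sample data.
orders = [
    {"order_id": 1, "event_time": datetime(2020, 1, 1), "quantity": 10},
    {"order_id": 2, "event_time": datetime(2020, 1, 1), "quantity": 5},
]
shipments = [
    {"order_id": 1, "event_time": datetime(2020, 1, 2), "quantity": 6},
    {"order_id": 1, "event_time": datetime(2020, 1, 4), "quantity": 4},
]

for row in join_order_shipments(orders, shipments):
    print(row["order_id"], row["shipped_quantity"])
```

In this sample the January 4th shipment falls outside the window and is dropped, while order 2 still yields a row with a shipped quantity of zero.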
Computing total quantity of shipped product
The query above will produce a new event for every shipment. Since one order can have multiple shipment events and we’re only interested in the total quantity shipped, we need to aggregate them.
In the join query above (order_shipments) we emit results using order.event_time as the event time, so all records for the same order will have the same event time. Even when event times match exactly, Flink requires us to use some form of windowing in aggregations, so we use a tumbling window with a 1-day interval. As long as our interval is less than the time we chose for considering the shipment “late”, we will not delay the emission of the results.
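As a rough model of this aggregation, the sketch below groups joined rows by a 1-day tumbling window and order_id, then sums the shipped quantity. It is plain Python written for illustration, not the Flink query; the row layout and the shipped_quantity_total name are assumptions. Because every row of an order carries the same order time, all of them land in the same window.

```python
from datetime import datetime, timedelta

WINDOW_SIZE = timedelta(days=1)
EPOCH = datetime(1970, 1, 1)


def window_start(ts):
    """Start of the 1-day tumbling window that contains `ts`."""
    return ts - (ts - EPOCH) % WINDOW_SIZE


def total_shipped(rows):
    """Sum shipped_quantity per (window, order_id)."""
    totals = {}
    for r in rows:
        key = (window_start(r["order_time"]), r["order_id"])
        agg = totals.setdefault(key, {
            "order_time": r["order_time"],
            "order_id": r["order_id"],
            "order_quantity": r["order_quantity"],
            "shipped_quantity_total": 0,
        })
        agg["shipped_quantity_total"] += r["shipped_quantity"]
    return list(totals.values())


# Hypothetical joined rows: order 1 shipped in two batches, order 2 not at all.
t = datetime(2020, 1, 1, 10, 0)
joined = [
    {"order_time": t, "order_id": 1, "order_quantity": 10, "shipped_quantity": 6},
    {"order_time": t, "order_id": 1, "order_quantity": 10, "shipped_quantity": 4},
    {"order_time": t, "order_id": 2, "order_quantity": 5, "shipped_quantity": 0},
]

for agg in total_shipped(joined):
    print(agg["order_id"], agg["order_quantity"], agg["shipped_quantity_total"])
```

Comparing order_quantity with shipped_quantity_total in the aggregated rows is then all that is needed to flag an order as not fully fulfilled.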
Detecting late shipments
The final step is very straightforward - we want to emit only records for orders that were not fully fulfilled:
Reminder about Watermarks
If you look at the resulting data you will notice that we didn’t get the alert for order_id == 4, which shipped in two batches, the last of which was late by one day. This is because the watermark of the orders dataset did not advance far enough to cover that period. Try setting the watermark manually and then running the processing again:
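To see why the alert is held back, the following plain-Python sketch models the role of the watermark. It is a simplification under stated assumptions: a single watermark value stands in for the engine's notion of event-time progress, and the dates and quantities for order 4 are hypothetical, not taken from the example data. An order's result is treated as final only once the watermark has passed the end of its 2-day window.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(days=2)


def alerts_ready(orders, shipments, watermark):
    """Return ids of late orders whose 2-day window is closed by the watermark.

    orders:    {order_id: (order_time, quantity)}
    shipments: list of (order_id, event_time, quantity)
    """
    alerts = []
    for order_id, (order_time, quantity) in sorted(orders.items()):
        deadline = order_time + WINDOW
        if watermark < deadline:
            # The window may still receive shipments, so nothing is emitted.
            continue
        shipped = sum(
            q for oid, ts, q in shipments
            if oid == order_id and order_time <= ts < deadline
        )
        if shipped != quantity:
            alerts.append(order_id)
    return alerts


# Hypothetical order 4: second batch ships one day after the deadline.
orders = {4: (datetime(2020, 1, 5), 4)}
shipments = [
    (4, datetime(2020, 1, 6), 2),
    (4, datetime(2020, 1, 8), 2),
]

print(alerts_ready(orders, shipments, watermark=datetime(2020, 1, 6)))   # []
print(alerts_ready(orders, shipments, watermark=datetime(2020, 1, 10)))  # [4]
```

With the watermark stuck before the deadline the order stays undecided; once it advances past the deadline the alert is produced.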