> ## Documentation Index
> Fetch the complete documentation index at: https://docs.kamu.dev/llms.txt
> Use this file to discover all available pages before exploring further.

# Overdue Order Shipments

export const Diagram = ({src, alt}) => {
  return <div style={{
    display: "flex",
    "flex-direction": "column",
    "align-items": "center"
  }}>
    <img src={src} alt={alt} style={{
    background: "#dddddd",
    "margin-bottom": 0
  }} />
    <span>{alt}</span>
  </div>;
};

export const YouTubeList = ({id}) => {
  const src = `https://www.youtube.com/embed/videoseries?list=${id}`;
  return <iframe className="w-full aspect-video rounded-xl" src={src} allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture" allowFullScreen></iframe>;
};

export const YouTube = ({id, width}) => {
  const src = `https://www.youtube.com/embed/${id}`;
  return <iframe className="w-full aspect-video rounded-xl" src={src} allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture" allowFullScreen width={width}></iframe>;
};

export const Schema = ({t, id}) => {
  const anchor = id ? id : t.toLowerCase().replace(/\s+/g, "-");
  const link = `/odf/schemas#${anchor}`;
  return <a class="schema-object" href={link}>{t}</a>;
};

export const Term = ({t, id}) => {
  const anchor = id ? id : t.toLowerCase().replace(/\s+/g, "-");
  const link = `/general/glossary#${anchor}`;
  return <a class="glossary-term" href={link}>{t}</a>;
};

**Topics covered:**

* Stream-to-Stream joins
* Temporal aggregations
* Watermarks

## Summary

In this example we will take a look at an imaginary e-commerce company `acme.com` committed to shipping their products to customers as soon as possible. To maintain their level of service they have decided to create an alerting system using `kamu` that would notify them if any order did not ship within a certain time window, so that they could investigate the root cause of the delay.

## Steps

### Getting Started

To follow this example checkout `kamu-cli` repository and navigate into [examples/overdue\_shipments](https://github.com/kamu-data/kamu-cli/tree/master/examples/overdue_shipments) sub-directory.

Create a temporary kamu workspace in that folder using:

```sh theme={null}
kamu init
```

You can either follow the example steps below or fast-track through it by running:

```sh theme={null}
# Add all dataset manifests found in the current directory
kamu add -recursive .
kamu pull --all
```

### Root Datasets

We will be using two root datasets:

* `com.acme.orders` - contains a log of orders as they are being placed in the system. To make it more interesting we will assume that each order can request different quantity of "the product" that can be shipped to the customer in one or several shipments.
* `com.acme.shipments` - contains a log of shipments as they are being dispatched to the customers. Every shipment links to the original order and specifies the quantity of "the product" that has been shipped.

Both datasets are sourcing their data from files located in `data/` sub-directory. Once comfortable with this example you can try adding more data in `orders-xxx.csv` and `shipments-xxx.csv` to see how `fileGlob` data sources work and test the queries with any edge cases you can come up with.

Let's add the root datasets and ingest the data:

```sh theme={null}
kamu add com.acme.orders.yaml com.acme.shipments.yaml
kamu pull --all
```

You can verify the result using `tail` command or the SQL shell:

```bash theme={null}
kamu sql
```

```sql theme={null}
0: kamu> select * from `com.acme.orders`;
+-------------+------------+-----------+-----------+
| system_time | event_time | order_id  | quantity  |
+-------------+------------+-----------+-----------+
|     ...     | 2020-01-01 | 1         | 10        |
|     ...     | 2020-01-01 | 2         | 120       |
|     ...     | 2020-01-05 | 3         | 9         |
|     ...     | 2020-01-10 | 4         | 110       |
+-------------+------------+-----------+-----------+
4 rows selected (0.11 seconds)

0: kamu> select * from `com.acme.shipments`;
+-------------+------------+--------------+-----------+-----------+
| system_time | event_time | shipment_id  | order_id  | quantity  |
+-------------+------------+--------------+-----------+-----------+
|     ...     | 2020-01-01 | 1            | 1         | 4         |
|     ...     | 2020-01-02 | 3            | 2         | 120       |
|     ...     | 2020-01-02 | 2            | 1         | 6         |
|     ...     | 2020-01-08 | 4            | 3         | 9         |
|     ...     | 2020-01-11 | 5            | 4         | 50        |
|     ...     | 2020-01-13 | 6            | 4         | 60        |
+-------------+------------+--------------+-----------+-----------+
```

### Joining Shipments with Orders

The goal of the `com.acme.shipments.overdue` dataset that we will be creating is to detect orders that have not been fulfilled at all or were fulfilled only partially within a specified time interval - let's say `2 DAYS`. We want the alert to go off as soon such condition occurs - there's no use investigating the root cause of the late order when few weeks have already passed since then.

We can define our goal as: an order placed in the system is considered "late" if during the interval of `2 DAYS` after the order has been placed the total quantity of product shipped doesn't equal the quantity that has been ordered.

First we need to get these two quantities somehow in one dataset, so let's join the orders and shipments data using `order_id` as the key:

```sql theme={null}
-- Using Flink SQL dialect
SELECT
  o.event_time as order_time,
  o.order_id,
  o.quantity as order_quantity,
  CAST(s.event_time as TIMESTAMP) as shipped_time,
  COALESCE(s.quantity, 0) as shipped_quantity
FROM `com.acme.orders` as o
LEFT JOIN `com.acme.shipments` as s
ON
  o.order_id = s.order_id
  AND s.event_time BETWEEN o.event_time AND o.event_time + INTERVAL '2' DAY
```

This is a Stream-to-Stream join which uses two join conditions.

Firstly, we match orders and shipments on `order_id` - this is pretty standard and something you would do in a regular SQL with non-temporal data. If we only have this condition the query would already work, but since `kamu` doesn't know how far apart in time the orders and shipments can be it will expect that order `1` can match a shipment that happens a century or even a billion years from now, so it will have to keep track of all the orders forever.

To prevent that we add another condition that restricts the `shipment.event_time` to a time interval `[order.event_time, order.event_time + 2 DAYS)`. This tells `kamu` that all orders that are older than two days can be forgotten since they will never match this condition.

Also note that we are especially interested in orders that had no shipments at all, so we are using `LEFT JOIN` that will produce a record even if nothing on the right side has matched our criteria. In this case all `s.*` fields will be `NULL`, so we are using `COALESCE` to replace `NULL` values with a zero when computing `shipped_quantity`.

Also note that we need to `CAST` the `s.event_time` to `TIMESTAMP`. This is a specific of Flink SQL which treats event time of stream as a special type. Every stream is allowed to have only one event time so when two streams are joined we need to cast one of the times to a plain `TIMESTAMP` type.

### Computing total quantity of shipped product

The query above will produce a new event for every shipment. Since one order can have multiple shipment events and we're only interested in total quantity shipped - we need to aggregate them.

```sql theme={null}
SELECT
  TUMBLE_START(order_time, INTERVAL '1' DAY) as order_time,
  order_id,
  count(*) as num_shipments,
  min(shipped_time) as first_shipment,
  max(shipped_time) as last_shipment,
  max(order_quantity) as order_quantity,
  sum(shipped_quantity) as shipped_quantity_total
FROM order_shipments
GROUP BY TUMBLE(order_time, INTERVAL '1' DAY), order_id
```

Since in our previous query (which we aliased as `order_shipments`) we emit results using `order.event_time` as the event time - all records for the same order will have the same event time. Even when event times match exactly Flink requires us to use some form of windowing in aggregations so we use a 1-day interval and a tumbling window. As long as our interval is less than the time we chose for considering the shipment "late" - we will not delay the emission of the results.

### Detecting late shipments

The final step is very straightforward - we want to emit only records for orders that were not fully fulfilled:

```sql theme={null}
SELECT *
FROM order_shipments_agg
WHERE order_quantity <> shipped_quantity_total
```

Putting all of this together, our final dataset will look like this:

```yaml theme={null}
version: 1
kind: DatasetSnapshot
content:
  name: com.acme.shipments.overdue
  kind: Derivative
  metadata:
    - kind: SetTransform
      inputs:
        - datasetRef: com.acme.orders
        - datasetRef: com.acme.shipments
      transform:
        kind: Sql
        engine: flink
        queries:
          # This is a Stream-to-Stream join that matches orders and shipments on order_id, 
          # restricting the shipment.event_time to a time interval [order.event_time, order.event_time + 2 DAYS).
          # Note that the cast of s.event_time is necessary to tell Flink to no longer treat it as `rowtime`.
          - alias: order_shipments
            query: |
              SELECT
                o.event_time as order_time,
                o.order_id,
                o.quantity as order_quantity,
                CAST(s.event_time as TIMESTAMP(3)) as shipped_time,
                COALESCE(s.quantity, 0) as shipped_quantity
              FROM `com.acme.orders` as o
              LEFT JOIN `com.acme.shipments` as s
              ON
                o.order_id = s.order_id
                AND s.event_time BETWEEN o.event_time AND o.event_time + INTERVAL '2' DAY
          # This is a windowed aggregation that computes the total quantity of shipped product
          - alias: order_shipments_agg
            query: |
              SELECT
                TUMBLE_START(order_time, INTERVAL '1' DAY) as order_time,
                order_id,
                count(*) as num_shipments,
                min(shipped_time) as first_shipment,
                max(shipped_time) as last_shipment,
                max(order_quantity) as order_quantity,
                sum(shipped_quantity) as shipped_quantity_total
              FROM order_shipments
              GROUP BY TUMBLE(order_time, INTERVAL '1' DAY), order_id
          # Return only records for orders that were not fully fulfilled
          - query: |
              SELECT *
              FROM order_shipments_agg
              WHERE order_quantity <> shipped_quantity_total
    - kind: SetVocab
      eventTimeColumn: order_time
```

Add this dataset and let's compute the result:

```sh theme={null}
kamu add com.acme.shipments.overdue.yaml
kamu pull com.acme.shipments.overdue
```

### Reminder about Watermarks

If you look at the resulting data you will notice that we didn't get the alert for `order_id == 4` which shipped in two batches, last of which was late by one day. This is because the watermark of the `orders` dataset did not advance far enough to cover that period. Try setting the watermark manually and then running the processing again:

```sh theme={null}
kamu pull com.acme.orders --set-watermark 2020-01-13T00:00:00Z
kamu pull --all
```

More detailed explanation of watermarks is given in the [Stock Market Trading example](/examples/stock-trading).
