Leaderboard
Topics covered:
- Top-N aggregation
- Retractions / corrections
- Push ingest
Summary
Suppose we have a game where players compete in matches against one another and each player is assigned an overall score for every match. In this example we will see:
- How to create a dataset that represents a leaderboard of top-scoring players in the game using the “Top-N” class of queries
- How such a dataset tracks the entire history of changes
- And how “Top-N” queries give rise to a mechanism called retractions and corrections
Steps
Getting Started
To follow this example, check out the kamu-cli repository and navigate into the examples/leaderboard sub-directory.
Create a temporary kamu workspace in that folder using:
kamu init
You can either follow the example steps below or fast-track through it by running:
./init.sh
Scores Dataset
We start with the player-scores dataset, which will ingest the scores of all players who played a match. It will have a very simple schema:
match_time TIMESTAMP
match_id BIGINT
player_id STRING
score BIGINT
Our game will send score data directly to the node hosting this dataset, so we will use the AddPushSource metadata event to describe the format in which the data will arrive.
We’ll use NdJson (newline-delimited JSON):
kind: DatasetSnapshot
version: 1
content:
  name: player-scores
  kind: Root
  metadata:
    - kind: AddPushSource
      sourceName: default
      read:
        kind: NdJson
        schema:
          - "match_time TIMESTAMP"
          - "match_id BIGINT"
          - "player_id STRING"
          - "score BIGINT"
      merge:
        kind: Ledger
        primaryKey:
          - match_id
          - player_id
    - kind: SetVocab
      eventTimeColumn: match_time
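The merge section tells kamu how to combine newly pushed records with what was ingested before. As we understand the ODF Ledger strategy, it treats the data as an append-only ledger and only deduplicates records by the primaryKey columns. The sketch below is a minimal, hypothetical illustration of that idea (ledger_merge and PRIMARY_KEY are our own names, match_time is omitted for brevity, and duplicates within a single batch are assumed to be dropped too); it is not kamu's implementation.

```python
# Hypothetical sketch of Ledger-style deduplication by primary key.
# Not kamu's implementation: records whose (match_id, player_id) key was
# already seen are skipped, everything else is appended unchanged.

PRIMARY_KEY = ("match_id", "player_id")


def ledger_merge(existing, incoming, key=PRIMARY_KEY):
    """Return only the incoming records whose primary key is not yet present."""
    seen = {tuple(r[k] for k in key) for r in existing}
    appended = []
    for record in incoming:
        pk = tuple(record[k] for k in key)
        if pk not in seen:
            seen.add(pk)  # assumption: duplicates inside one batch are dropped too
            appended.append(record)
    return appended


existing = [
    {"match_id": 1, "player_id": "Alice", "score": 100},
    {"match_id": 1, "player_id": "Bob", "score": 80},
]
incoming = [
    {"match_id": 1, "player_id": "Bob", "score": 80},      # already ingested, skipped
    {"match_id": 2, "player_id": "Charlie", "score": 90},  # new key, appended
]

new_records = ledger_merge(existing, incoming)
print(new_records)
```

Because the key is (match_id, player_id), re-sending a whole match file should not double-count scores, while a new match always produces new rows.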
Create the dataset now:
kamu add player-scores.yaml
To get data in, we will ingest it from a file, but note that it could just as well arrive via any other “push” protocol like REST, MQTT, Kafka…
Let’s add the scores from the first match:
kamu ingest player-scores ./data/1.ndjson
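NdJson means newline-delimited JSON: every line of the input is one standalone JSON object whose fields match the declared schema. The real contents of ./data/1.ndjson are not reproduced here, so the sketch below builds equivalent records for the first match from the kamu tail output that follows, with a made-up match_time value (we assume an RFC 3339 timestamp string is an acceptable encoding for the TIMESTAMP column).

```python
import json

# Hypothetical records for the first match. The scores mirror the
# `kamu tail` output; the match_time value is invented for illustration.
records = [
    {"match_time": "2000-01-01T00:00:00Z", "match_id": 1, "player_id": "Alice", "score": 100},
    {"match_time": "2000-01-01T00:00:00Z", "match_id": 1, "player_id": "Bob", "score": 80},
]

# NdJson: one JSON object per line, each line newline-terminated.
ndjson = "".join(json.dumps(r) + "\n" for r in records)
print(ndjson, end="")

# Reading it back means parsing every non-empty line independently.
parsed = [json.loads(line) for line in ndjson.splitlines() if line.strip()]
```

Since each line is parsed on its own, a producer can append records one at a time, which suits a game server pushing scores as matches end.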
You can see the new data via:
kamu tail player-scores
┌────────┬────┬─────┬──────────┬───────────┬───────┐
│ offset │ op │ ... │ match_id │ player_id │ score │
├────────┼────┼─────┼──────────┼───────────┼───────┤
│ 0 │ +A │ ... │ 1 │ Alice │ 100 │
│ 1 │ +A │ ... │ 1 │ Bob │ 80 │
└────────┴────┴─────┴──────────┴───────────┴───────┘
Leaderboard Dataset
Having all the historical scores, we could already build a leaderboard using a batch query as simple as:
kamu sql
select
    *
from (
    select
        row_number() over (order by score desc) as place,
        match_time,
        match_id,
        player_id,
        score
    from "player-scores"
) where place <= 2;
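To make the batch semantics explicit, here is a plain-Python sketch of the same Top-N logic (top_n is a hypothetical helper, not part of kamu): sort all score rows by score descending, number them like row_number(), and keep places up to n. Just like the SQL, it ranks individual score rows rather than players, so one player could hold both places.

```python
# Plain-Python mirror of the batch Top-N query (a sketch, not how kamu
# executes SQL): rank rows by score descending and keep the first n.

def top_n(rows, n=2):
    ranked = sorted(rows, key=lambda r: r["score"], reverse=True)
    # enumerate(..., start=1) plays the role of row_number()
    return [{"place": place, **r} for place, r in enumerate(ranked[:n], start=1)]


player_scores = [
    {"match_id": 1, "player_id": "Alice", "score": 100},
    {"match_id": 1, "player_id": "Bob", "score": 80},
]

result = top_n(player_scores)
for row in result:
    print(row)
```

Python's sort is stable, so tied scores keep their input order; SQL's row_number() gives no such guarantee for ties unless you add a tie-breaking column to the order by.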
But there are many benefits to expressing it as a streaming pipeline, which we will discuss shortly.
For now, let’s create a streaming leaderboard dataset. Luckily, the query is almost identical to the batch variant:
kind: DatasetSnapshot
version: 1
content:
  name: leaderboard
  kind: Derivative
  metadata:
    - kind: SetTransform
      inputs:
        - datasetRef: player-scores
          alias: player_scores
      transform:
        kind: Sql
        engine: risingwave
        queries:
          - alias: leaderboard
            # Note we are using an explicit `create materialized view` statement below
            # because RW does not currently support Top-N queries directly on sinks.
            #
            # Note `partition by 1` is currently required by the RW engine
            # See: https://docs.risingwave.com/docs/current/window-functions/#syntax
            query: |
              create materialized view leaderboard as
              select
                *
              from (
                select
                  row_number() over (partition by 1 order by score desc) as place,
                  match_time,
                  match_id,
                  player_id,
                  score
                from player_scores
              )
              where place <= 2
          - query: |
              select * from leaderboard
    - kind: SetVocab
      eventTimeColumn: match_time
Create this dataset:
kamu add leaderboard.yaml
Let’s “pull” the leaderboard dataset to process all the data that is already in player-scores and check the results:
kamu pull leaderboard
kamu tail leaderboard
┌────────┬────┬─────┬───────┬──────────┬───────────┬───────┐
│ offset │ op │ ... │ place │ match_id │ player_id │ score │
├────────┼────┼─────┼───────┼──────────┼───────────┼───────┤
│ 0 │ +A │ ... │ 1 │ 1 │ Alice │ 100 │
│ 1 │ +A │ ... │ 2 │ 1 │ Bob │ 80 │
└────────┴────┴─────┴───────┴──────────┴───────────┴───────┘
Introducing Retractions
Suppose we played another match with scores like these:
Alice: 70
Charlie: 90
Let’s see what happens to the leaderboard dataset:
kamu ingest player-scores ./data/2.ndjson
kamu pull leaderboard
kamu tail leaderboard
Pay attention to the op column:
┌────────┬────┬─────┬───────┬──────────┬───────────┬───────┐
│ offset │ op │ ... │ place │ match_id │ player_id │ score │
├────────┼────┼─────┼───────┼──────────┼───────────┼───────┤
│ 0 │ +A │ ... │ 1 │ 1 │ Alice │ 100 │
│ 1 │ +A │ ... │ 2 │ 1 │ Bob │ 80 │
│ 2 │ -R │ ... │ 2 │ 1 │ Bob │ 80 │
│ 3 │ +A │ ... │ 2 │ 2 │ Charlie │ 90 │
└────────┴────┴─────┴───────┴──────────┴───────────┴───────┘
Now this is very interesting:
- We still see the history of Alice and Bob being ranked 1 and 2 initially
- Then we see a “retraction” (-R) record that says “Bob is no longer ranked 2nd”
- And an “append” (+A) record that says “Charlie is now ranked 2nd”
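One way to think about these records is as the difference between the previous Top-N result and the recomputed one: rows that left the result are retracted, rows that entered it are appended. The sketch below models exactly that with hypothetical helpers (top_n, changelog). RisingWave is a streaming engine that maintains Top-N state incrementally rather than by recomputing and diffing, so treat this as a mental model only. For the first two matches it yields the same two records as offsets 2 and 3 above.

```python
# Mental-model sketch (not RisingWave's algorithm): derive -R / +A records
# by diffing the previous Top-N result against the recomputed one.

def top_n(rows, n=2):
    # Input rows are (match_id, player_id, score);
    # output rows are (place, match_id, player_id, score).
    ranked = sorted(rows, key=lambda r: r[2], reverse=True)
    return [(place, *r) for place, r in enumerate(ranked[:n], start=1)]


def changelog(old, new):
    """Retract rows that disappeared, then append rows that are new."""
    retractions = [("-R", *row) for row in old if row not in new]
    appends = [("+A", *row) for row in new if row not in old]
    return retractions + appends


match_1 = [(1, "Alice", 100), (1, "Bob", 80)]
match_2 = [(2, "Alice", 70), (2, "Charlie", 90)]

before = top_n(match_1)
after = top_n(match_1 + match_2)
for record in changelog(before, after):
    print(record)
```

In this model a row whose place changes no longer equals its old tuple, so it is retracted and re-appended even if the player and score stay the same.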
Now let’s play a third match with these scores:
Bob: 60
Charlie: 110
Ingest and pull as usual:
kamu ingest player-scores ./data/3.ndjson
kamu pull leaderboard
kamu tail leaderboard
As Charlie displaced Alice from 1st place, we see (starting from offset 4) how both records are retracted and replaced with new values:
┌────────┬────┬─────┬───────┬──────────┬───────────┬───────┐
│ offset │ op │ ... │ place │ match_id │ player_id │ score │
├────────┼────┼─────┼───────┼──────────┼───────────┼───────┤
│ 0 │ +A │ ... │ 1 │ 1 │ Alice │ 100 │
│ 1 │ +A │ ... │ 2 │ 1 │ Bob │ 80 │
│ 2 │ -R │ ... │ 2 │ 1 │ Bob │ 80 │
│ 3 │ +A │ ... │ 2 │ 2 │ Charlie │ 90 │
│ 4 │ -R │ ... │ 1 │ 1 │ Alice │ 100 │
│ 5 │ -R │ ... │ 2 │ 2 │ Charlie │ 90 │
│ 6 │ +A │ ... │ 1 │ 3 │ Charlie │ 110 │
│ 7 │ +A │ ... │ 2 │ 1 │ Alice │ 100 │
└────────┴────┴─────┴───────┴──────────┴───────────┴───────┘
You are witnessing the retractions and corrections mechanism of ODF datasets!
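A downstream consumer can recover the leaderboard at any point in its history by replaying the changelog: +A adds a row, and -R removes a previously added identical row. The sketch below (materialize is a hypothetical helper) replays the eight records from the table above. It handles only the two operation types that appear in this example and rejects anything else.

```python
# Hypothetical consumer sketch: replay leaderboard changelog records
# (op, place, match_id, player_id, score) into an in-memory state.
from collections import Counter

changelog = [
    ("+A", 1, 1, "Alice", 100),
    ("+A", 2, 1, "Bob", 80),
    ("-R", 2, 1, "Bob", 80),
    ("+A", 2, 2, "Charlie", 90),
    ("-R", 1, 1, "Alice", 100),
    ("-R", 2, 2, "Charlie", 90),
    ("+A", 1, 3, "Charlie", 110),
    ("+A", 2, 1, "Alice", 100),
]


def materialize(records):
    state = Counter()  # row -> number of live copies
    for op, *row in records:
        row = tuple(row)
        if op == "+A":
            state[row] += 1
        elif op == "-R":
            if state[row] == 0:
                raise ValueError(f"retraction of unknown row: {row}")
            state[row] -= 1
        else:
            raise ValueError(f"unsupported op in this sketch: {op}")
    # Keep live rows only; sorting tuples orders them by place first.
    return sorted(row for row, count in state.items() for _ in range(count))


for row in materialize(changelog):
    print(row)
```

Replaying only a prefix, e.g. materialize(changelog[:4]), returns the leaderboard as it stood after the second match, which is why keeping the full changelog preserves the complete history.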
So, Why Not Batch?
What do we gain from having a changelog stream with retractions? Why not just run a batch query every once in a while to get the current leaderboard?
The answer is: automation and infinite composability.
As your game evolves, you might want to start rewarding players in the top spots with prizes. You may also want to reset the leaderboard each month to make people prove themselves again. All this logic can be expressed as additional steps in the streaming data pipeline.
If you write services that batch-query data at specific points in time and distribute rewards, imagine what happens:
- if a player is caught cheating and has to be retroactively stripped of their match scores (input retraction)
- if the score ingestion pipeline goes down for a day and players who did in fact get top scores end up unaccounted for during reward distribution (backfill)
- if your game is very high-stakes and you have to prove during an audit how the rewards were issued (provenance)
These situations, which we tend to call “abnormal”, are actually very frequent, and in the batch world they require highly error-prone manual intervention.
In the world of ODF streaming, late data, backfills, and input retractions are just normal occurrences. ODF pipelines know how to react to these events and can automatically reconcile their state without human intervention, while maintaining verifiable provenance.
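To make the input-retraction case concrete, the hypothetical sketch below assumes Charlie is caught cheating and all of his scores are removed from the input. Recomputing the Top-N and diffing it against the previously published leaderboard yields the correction records a pipeline would need to emit downstream. It uses a simple recompute-and-diff model with our own helpers (top_n, diff) as a simplification, not kamu's actual mechanism.

```python
# Hypothetical input-retraction scenario: drop a cheater's scores and
# compute which leaderboard rows must be retracted and appended.

def top_n(scores, n=2):
    # scores: (match_id, player_id, score) -> (place, match_id, player_id, score)
    ranked = sorted(scores, key=lambda r: r[2], reverse=True)
    return [(place, *r) for place, r in enumerate(ranked[:n], start=1)]


def diff(old, new):
    retractions = [("-R", *r) for r in old if r not in new]
    appends = [("+A", *r) for r in new if r not in old]
    return retractions + appends


# (match_id, player_id, score) for the three matches in this example
scores = [
    (1, "Alice", 100), (1, "Bob", 80),
    (2, "Alice", 70), (2, "Charlie", 90),
    (3, "Bob", 60), (3, "Charlie", 110),
]

published = top_n(scores)
cleaned = [s for s in scores if s[1] != "Charlie"]  # retract the cheater's inputs
corrections = diff(published, top_n(cleaned))
for record in corrections:
    print(record)
```

A batch job that already paid out rewards would have to discover and undo this by hand; in the streaming model the corrections are simply more records flowing to every downstream step.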
If you haven’t already, make sure to check out the Stock Market Trading example, which introduces watermarks, another really important mechanism related to streams.