Motivation

Errors in source data are inevitable and require a mechanism for correcting them post factum. Unlike databases, where one could issue DELETE or UPDATE queries, our core data model is an immutable append-only stream, so a different mechanism is needed for issuing retractions and corrections for past events.

When stream processing operations encounter late data (arriving past the current watermark), or retractions and corrections in their input, they may also need to issue corrections or retractions for previously produced results that were influenced by these events.

We consider a correction/retraction model that works across all stages of a data pipeline essential to making processing maximally autonomous. In turn, only by making data processing autonomous can collaborative data pipelines be sustainable at global scale.

Scenario

Consider a video game that writes events to a match_scores dataset as players complete matches:

| match_time | match_id | player_name | score |
| ---------- | -------- | ----------- | ----- |
| t1         | 1        | Alice       | 100   |
| t1         | 1        | Bob         | 80    |
| t2         | 2        | Alice       | 70    |
| t2         | 2        | Charlie     | 90    |
| t3         | 3        | Bob         | 60    |
| t3         | 3        | Charlie     | 110   |

Let’s say we want to create a dataset that continuously tracks the top two (for simplicity) highest-scoring players: a leader board.

This is a classic case of a Top-N query (see also Window Top-N), which can be written in Apache Flink SQL as:

```sql
select
  *
from (
  select
    ROW_NUMBER() OVER (ORDER BY score DESC) AS place,
    match_time,
    player_name,
    score
  from match_scores
) where place <= 2
```

Representing changes in streams

There are several ways to represent how this query behaves when applied to the above input stream.

Retract Stream

The most generic way is a “retract stream” that only uses append +A and retract -R operations (note the new op column):

| op | place | match_time | player_name | score |
| -- | ----- | ---------- | ----------- | ----- |
| +A | 1     | t1         | Alice       | 100   |
| +A | 2     | t1         | Bob         | 80    |
| -R | 2     | t1         | Bob         | 80    |
| +A | 2     | t2         | Charlie     | 90    |
| -R | 1     | t1         | Alice       | 100   |
| -R | 2     | t2         | Charlie     | 90    |
| +A | 1     | t3         | Charlie     | 110   |
| +A | 2     | t3         | Alice       | 100   |

In this model the updated state of the leader board is compared with its previous state after each new event; records that changed are first retracted and then replaced with appends.
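To make this concrete, here is a minimal Python sketch (illustrative only, not how any particular engine implements it) of an operator that diffs the new leader board against the previous one and emits retract-stream events; the `Row` type and function name are hypothetical:

```python
from dataclasses import dataclass

@dataclass(frozen=True)  # frozen makes rows hashable, so they can go in sets
class Row:
    place: int
    match_time: str
    player_name: str
    score: int

def diff_to_retract_stream(old_board, new_board):
    """Yield ('-R', row) and ('+A', row) events turning old_board into new_board."""
    old, new = set(old_board), set(new_board)
    # Retract rows that are no longer part of the leader board...
    for row in old_board:
        if row not in new:
            yield ("-R", row)
    # ...then append the rows that replaced them.
    for row in new_board:
        if row not in old:
            yield ("+A", row)

# Leader board states before and after the t2 match from the example:
board_t1 = [Row(1, "t1", "Alice", 100), Row(2, "t1", "Bob", 80)]
board_t2 = [Row(1, "t1", "Alice", 100), Row(2, "t2", "Charlie", 90)]

for op, row in diff_to_retract_stream(board_t1, board_t2):
    print(op, row)
# -R Row(place=2, match_time='t1', player_name='Bob', score=80)
# +A Row(place=2, match_time='t2', player_name='Charlie', score=90)
```

This reproduces the third and fourth events of the retract stream above.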

Upsert Stream

Using the knowledge that the place column serves as a unique key of the resulting state, we could also represent the above as an “upsert stream” using only upsert +A and retract -R operations:

| op | place | match_time | player_name | score |
| -- | ----- | ---------- | ----------- | ----- |
| +A | 1     | t1         | Alice       | 100   |
| +A | 2     | t1         | Bob         | 80    |
| +A | 2     | t2         | Charlie     | 90    |
| +A | 1     | t3         | Charlie     | 110   |
| +A | 2     | t3         | Alice       | 100   |

This additional knowledge allows us to significantly compact the stream.

Although the retract operation does not appear in our example, it is needed for completeness. Imagine that right after t1 the source stream retracted the results of the first match: this would require us to empty the leader board with retractions too.
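The compaction itself can be sketched in a few lines of Python (a hedged illustration under the simplifying assumption that we process one bounded batch and can defer retractions to its end; the function name is made up):

```python
def compact_to_upserts(events, key):
    """Compact a retract stream into an upsert stream.

    events: iterable of (op, record) pairs with op in {'+A', '-R'};
    key:    function extracting the unique key (here: the place column).
    """
    pending = {}  # key -> retracted record awaiting a possible re-append
    for op, rec in events:
        k = key(rec)
        if op == "-R":
            pending[k] = rec
        else:  # '+A': re-appending a retracted key collapses into one upsert
            pending.pop(k, None)
            yield ("+A", rec)
    # Keys that were never re-appended remain genuine retractions.
    for rec in pending.values():
        yield ("-R", rec)
```

Feeding the eight-event retract stream above through this function with `key=lambda r: r["place"]` yields exactly the five upserts in the table.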

Changelog Stream (single event)

Some systems produce “changelog streams” containing append +A, retract -R, and correct +C operations, with corrections carrying both the new and the old values of the record being changed:

| op | place | match_time | player_name | score | old_match_time | old_player_name | old_score |
| -- | ----- | ---------- | ----------- | ----- | -------------- | --------------- | --------- |
| +A | 1     | t1         | Alice       | 100   |                |                 |           |
| +A | 2     | t1         | Bob         | 80    |                |                 |           |
| +C | 2     | t2         | Charlie     | 90    | t1             | Bob             | 80        |
| +C | 1     | t3         | Charlie     | 110   | t1             | Alice           | 100       |
| +C | 2     | t1         | Alice       | 100   | t2             | Charlie         | 90        |

This format is also used by CDC systems like Debezium and as an internal data representation in Arroyo.

This format is the most “informative” one, as it differentiates retractions from corrections and provides access to both the new and the previous state within a single event. The drawback is its impact on the schema, which essentially has to carry two records (old and new) in one.
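For downstream systems that only understand appends and retracts, this format can be “lowered” by expanding each correction into a retract/append pair. A small Python sketch (the nested old/new representation is illustrative, similar in spirit to Debezium's before/after fields, not a normative schema):

```python
def changelog_to_retract_stream(events):
    """Lower single-event changelog events into plain retract-stream events."""
    for ev in events:
        if ev["op"] == "+A":
            yield ("+A", ev["new"])
        elif ev["op"] == "-R":
            yield ("-R", ev["new"])  # values of the record being retracted
        elif ev["op"] == "+C":  # a correction carries both old and new values
            yield ("-R", ev["old"])
            yield ("+A", ev["new"])

correction = {
    "op": "+C",
    "new": {"place": 2, "match_time": "t2", "player_name": "Charlie", "score": 90},
    "old": {"place": 2, "match_time": "t1", "player_name": "Bob", "score": 80},
}
print(list(changelog_to_retract_stream([correction])))
# [('-R', {...Bob's row...}), ('+A', {...Charlie's row...})]
```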

Changelog Stream (two events)

Apache Flink’s “changelog stream” variant uses append +A, retract -R, correct-from -C, and correct-to +C operations. Here, “correct-from” events carry the previous values of the record about to be updated and “correct-to” events carry the new values, with the restriction that these two events must always appear side by side and in this order.

| op | place | match_time | player_name | score |
| -- | ----- | ---------- | ----------- | ----- |
| +A | 1     | t1         | Alice       | 100   |
| +A | 2     | t1         | Bob         | 80    |
| -C | 2     | t1         | Bob         | 80    |
| +C | 2     | t2         | Charlie     | 90    |
| -C | 1     | t1         | Alice       | 100   |
| +C | 1     | t3         | Charlie     | 110   |
| -C | 2     | t2         | Charlie     | 90    |
| +C | 2     | t1         | Alice       | 100   |

By splitting the update operation into two events, this format avoids extending the schema with additional columns.

Because correct-from and correct-to events are guaranteed to appear side by side, this format can easily be converted into the single-event changelog form upon reading.
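A minimal sketch of such a reader in Python (the pairing logic only; event and field names are assumptions for illustration):

```python
def pair_corrections(events):
    """Convert a two-event changelog into single-event form.

    events: iterable of (op, record) with op in {'+A', '-R', '-C', '+C'}.
    """
    it = iter(events)
    for op, rec in it:
        if op == "-C":
            nxt = next(it, None)  # the matching correct-to must follow
            assert nxt is not None and nxt[0] == "+C", "-C must be followed by +C"
            yield {"op": "+C", "new": nxt[1], "old": rec}
        else:
            yield {"op": op, "new": rec, "old": None}

two_event_stream = [
    ("-C", {"place": 2, "match_time": "t1", "player_name": "Bob", "score": 80}),
    ("+C", {"place": 2, "match_time": "t2", "player_name": "Charlie", "score": 90}),
]
print(list(pair_corrections(two_event_stream)))
# [{'op': '+C', 'new': {...Charlie's row...}, 'old': {...Bob's row...}}]
```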

ODF Model

ODF uses the two-event changelog stream as its base model.

Just like in the examples above, a special op column carries the operation type of each record.

Retractions and corrections are supported at the root dataset level via merge strategies.
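To give an intuition of how a merge strategy can produce such events (a hedged sketch, not ODF's actual implementation): a snapshot-style strategy that periodically re-ingests a full copy of the source data can diff it against the previous snapshot by primary key, emitting appends for new keys, retractions for removed keys, and correct-from/correct-to pairs for changed rows:

```python
def snapshot_diff(prev, curr, pk):
    """Diff two snapshots (lists of row dicts) keyed by the pk column."""
    prev_by_key = {row[pk]: row for row in prev}
    curr_by_key = {row[pk]: row for row in curr}
    for key, row in curr_by_key.items():
        old = prev_by_key.get(key)
        if old is None:
            yield ("+A", row)   # a key we have not seen before
        elif old != row:
            yield ("-C", old)   # correct-from: the previous values
            yield ("+C", row)   # correct-to: the new values
    for key, row in prev_by_key.items():
        if key not in curr_by_key:
            yield ("-R", row)   # a key that disappeared from the source
```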

We are currently working to integrate support for them into all processing engines, so that retractions and corrections propagate seamlessly across derivative datasets.