Purpose

Open Data Fabric by design stores all data as append-only event logs, always preserving the entire history. Unfortunately, a lot of data in the world is not stored or exposed this way. Some organizations may expose their data in the form of periodic database dumps, while others choose to provide it as a log of changes between the current and the previous export.

When ingesting data from external sources, Root Datasets can choose between different merge strategies that define how to combine the newly ingested data with the existing data.

Types

Ledger

This strategy should be used for data sources containing append-only event streams. New data exports can have new rows added, but once a record has made it into one export it should never change or disappear. A user-specified primary key is used to identify which events have already been seen, so that they are not duplicated.
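
To build intuition for how this works, here is a minimal Python sketch of the deduplication step. It is purely illustrative, not the actual implementation, and it assumes for simplicity that the deduplication key is the primary key combined with the event time column; the function and variable names are hypothetical:

def ledger_merge(existing_rows, new_export, primary_key, event_time_column):
    # Append only those rows of the new export that were not seen before.
    # Simplification: a row counts as "already seen" if its primary key
    # plus event time matches a previously ingested row.
    def dedup_key(row):
        return tuple(row[c] for c in primary_key) + (row[event_time_column],)

    seen = {dedup_key(row) for row in existing_rows}
    return [row for row in new_export if dedup_key(row) not in seen]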

For more details see Schema Reference

Example

Imagine a data publisher that exposes a dataset with the historical populations of major cities around the world, for example:

Year,Country,City,Population
2019,CA,Vancouver,2581000
2019,US,Seattle,3433000

This dataset is temporal (it has a year column), which is a good sign that we're dealing with ledger data.

Note that having a time column doesn't always mean that the dataset is a ledger: to be a true ledger, all previously published records must be immutable and never change.

So we use the following root dataset manifest to ingest it:

kind: DatasetSnapshot
version: 1
content:
  name: cities-population
  kind: root
  metadata:
    - kind: setPollingSource
      ...
      merge:
        kind: ledger
        primaryKey:
          - country
          - city
    - kind: setVocab
      eventTimeColumn: year

Notice that we specify the ledger merge strategy with a composite primary key (country, city), and also set eventTimeColumn so that year is used as the source of event times.

The resulting dataset when ingested will look like this:

system_time  event_time  country  city       population
s1           2019        CA       Vancouver  2581000
s1           2019        US       Seattle    3433000

Now let’s say after a census in Vancouver our source data changes to:

Year,Country,City,Population
2019,CA,Vancouver,2581000
2019,US,Seattle,3433000
2020,CA,Vancouver,2606000

So far this is still valid ledger data: history is preserved and changes are append-only.

Pulling the dataset will now result in the following history:

system_time  event_time  country  city       population
s1           2019        CA       Vancouver  2581000
s1           2019        US       Seattle    3433000
s2           2020        CA       Vancouver  2606000

Note that the old events from 2019 were recognized as ones we have already seen and were not added again.
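
In terms of the illustrative ledger_merge sketch from earlier (again, hypothetical code, not the actual ingest logic), this second pull corresponds to:

existing = [
    {"year": 2019, "country": "CA", "city": "Vancouver", "population": 2581000},
    {"year": 2019, "country": "US", "city": "Seattle", "population": 3433000},
]
new_export = existing + [
    {"year": 2020, "country": "CA", "city": "Vancouver", "population": 2606000},
]

# Only the new 2020 Vancouver row survives deduplication and gets appended.
print(ledger_merge(existing, new_export, ["country", "city"], "year"))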

Snapshot

This strategy can be used for data exports that are taken periodically and contain only the latest state snapshot of the observed entity or system. Over time such exports can have new rows added, and old rows either removed or modified.

It's important to understand that publishers who use such formats routinely lose information. When a record in the database is updated, or one DB dump is replaced with another, we not only lose the history of previous values, but also the context of why those changes happened. This is a really bad default!

The snapshot strategy transforms such data sources into a history of changes by performing change data capture (CDC). It relies on a user-specified primary key to correlate rows between the previous and the new snapshot.

A new event is added into the output stream whenever:

  • A row with a certain primary key appears for the first time
  • A row with a certain key disappears from the snapshot
  • The row data associated with a certain key has changed

Each event will have an additional column, observed in the examples below, that signifies the kind of observation that was encountered.
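
A minimal Python sketch of this change data capture step, assuming hypothetical names (snapshot_diff, and the 'I'/'U'/'D' observation codes) that are illustrative rather than the actual implementation:

def snapshot_diff(previous_state, new_snapshot, primary_key):
    # Compare two snapshots, correlating rows by the primary key,
    # and emit one change event per observed difference.
    def key(row):
        return tuple(row[c] for c in primary_key)

    old = {key(r): r for r in previous_state}
    new = {key(r): r for r in new_snapshot}

    events = []
    for k, row in new.items():
        if k not in old:
            events.append(dict(row, observed="I"))  # appeared for the first time
        elif row != old[k]:
            events.append(dict(row, observed="U"))  # row data has changed
    for k, row in old.items():
        if k not in new:
            events.append(dict(row, observed="D"))  # disappeared from the snapshot
    return events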

The snapshot strategy also requires special treatment with regards to the event time. Since snapshot-style data exports represent the state of some system at a certain time, it is important to know what that time was. This time is usually captured in some form of metadata (e.g. in the name of the snapshot file, in the URL, or in the HTTP caching headers), and it should be possible to extract this time and propagate it into a data column.
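
For instance, when the data is fetched over HTTP, the Last-Modified caching header can serve as that time. A small Python illustration of extracting it, assuming such a header is available (this is not the actual ingest code, and the header value is hypothetical):

from email.utils import parsedate_to_datetime

# Hypothetical value of the Last-Modified header returned by the source.
last_modified = "Fri, 03 Jan 2020 10:00:00 GMT"

# Parse it into a timezone-aware datetime that could be propagated
# into the event time column of the newly observed records.
event_time = parsedate_to_datetime(last_modified)
print(event_time)  # 2020-01-03 10:00:00+00:00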

For more details see Schema Reference

Example

Imagine a data publisher that exposes a dataset with the current populations of major cities around the world, for example:

Country,City,Population
CA,Vancouver,2581000
US,Seattle,3433000

This dataset is non-temporal (it doesn't have any date/time columns), which is a clear sign that we're dealing with snapshot data.

So we use the following root dataset manifest to ingest it:

kind: DatasetSnapshot
version: 1
content:
  name: cities-population
  kind: root
  metadata:
    - kind: setPollingSource
      fetch:
        kind: url
        url: https://...
        eventTime:
          kind: fromMetadata
      ...
      merge:
        kind: snapshot
        primaryKey:
          - country
          - city

Notice that we specify the snapshot merge strategy with a composite primary key (country, city). We also specify eventTime of kind fromMetadata, instructing the ingest to use the time from the caching headers as the event time of new records.

The resulting dataset when ingested will look like this:

system_time  event_time  observed  country  city       population
s1           e1          I         CA       Vancouver  2581000
s1           e1          I         US       Seattle    3433000

Notice that since this is the first time we have ingested the data, both records have observed == 'I', i.e. "insert".

Now let’s say after a census in Vancouver our source data changes to:

Country,City,Population
CA,Vancouver,2606000
US,Seattle,3433000

Pulling the dataset will now result in the following history:

system_time  event_time  observed  country  city       population
s1           e1          I         CA       Vancouver  2581000
s1           e1          I         US       Seattle    3433000
s2           e2          U         CA       Vancouver  2606000
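
In terms of the illustrative snapshot_diff sketch from earlier (hypothetical code, not the actual ingest logic), this second pull corresponds to:

previous_state = [
    {"country": "CA", "city": "Vancouver", "population": 2581000},
    {"country": "US", "city": "Seattle", "population": 3433000},
]
new_snapshot = [
    {"country": "CA", "city": "Vancouver", "population": 2606000},
    {"country": "US", "city": "Seattle", "population": 3433000},
]

# Only the changed Vancouver row produces an event, marked with observed == 'U'.
print(snapshot_diff(previous_state, new_snapshot, ["country", "city"]))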

Append

Under this strategy, the new data will be appended to the Dataset in its original form without any modifications.

This strategy is rarely used for real datasets and is mostly present for testing purposes.