Open Data Fabric
As described in the Nature of Data section, the system operates only on data expressed as past events, observations, or propositions believed to be true at a certain time. For simplicity, we will use the term “event” throughout to refer to such data tuples.
Data is a set of Events stored in the system. Since events are immutable, data can only grow over time. Conceptually, it’s best to think of data as a full log of a potentially infinite event stream.
Data never appears in the system alone as we would not be able to tell where it came from and whether it can be trusted. Data always appears as part of a Dataset.
Offset is a monotonically increasing sequential numeric identifier that is assigned to every record and represents its position relative to the beginning of the dataset. Offsets are used to uniquely identify any record in the dataset. Offset of the first record in a dataset is
Since past Events are immutable, if some event is later deemed incorrect, it can only be rectified by issuing an explicit retraction or correction. Retractions and corrections are also represented as Events in the same stream of Data and are differentiated by a special “operation type” field.
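The append-only model described above can be sketched in Python. This is only an illustration under stated assumptions, not the ODF record format: the `Op` names, the `Record` and `EventLog` types, the zero-based offsets, and the sample payloads are all hypothetical.

```python
from dataclasses import dataclass
from enum import Enum


class Op(Enum):
    # Hypothetical operation types; the actual encoding is defined by the spec.
    APPEND = "append"
    RETRACT = "retract"
    CORRECT = "correct"


@dataclass(frozen=True)  # frozen: fields cannot be reassigned after creation
class Record:
    offset: int
    op: Op
    payload: dict


class EventLog:
    """Append-only log: records are only ever added, never changed or removed."""

    def __init__(self) -> None:
        self._records: list[Record] = []

    def append(self, op: Op, payload: dict) -> Record:
        # Assumption: offsets are sequential and start at 0.
        record = Record(offset=len(self._records), op=op, payload=dict(payload))
        self._records.append(record)
        return record

    def records(self) -> tuple[Record, ...]:
        return tuple(self._records)


log = EventLog()
log.append(Op.APPEND, {"sku_id": "A", "price": 10})
# The price above was published in error; fix it with a new event,
# not by updating the earlier record in place.
log.append(Op.CORRECT, {"sku_id": "A", "price": 12})
```

The log exposes no update or delete method, so the only way to change what consumers see is to append another event.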
Data arrives into the system as arbitrarily large sets of events. We refer to them as “slices”.
More formally, a slice is a:
Metadata Chain captures all essential information about the Dataset, including:
- Where the data comes from (see Data Ingestion)
- How data was processed (see Query)
- Its Schema
- Log of all modifications made to the data, including information used to verify the integrity of data
- Current Watermark
Just like Data, the metadata chain also has a temporal nature. It consists of individual Metadata Blocks that refer to the previous block in the chain, forming a singly-linked list. Every block carries one of the Metadata Events that describe how data evolved over time.
All Metadata Blocks are immutable, and the chain changes only by appending new blocks. With blocks, data, and checkpoints named after and referenced by the hash of their content, a dataset forms a type of content-addressable system: given a reference to the last Metadata Block, one can traverse the entire chain to discover all the components of the dataset.
Metadata Chain also supports Block References that assign a certain symbolic name to a block hash, effectively acting as a named pointer. At a minimum, all datasets have a head reference that indicates the current last block in the Metadata Chain. Using multiple references, the metadata chain can be organized into a directed acyclic graph that can form branches. This makes it possible, for example, to stage some subset of events for review or an automated QA process before they are accepted into the main chain.
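A content-addressed chain with a named head reference can be sketched as follows. This is a minimal illustration, not the ODF block format: JSON encoding and SHA-256 are stand-ins for the real serialization and hash function, and the `MetadataChain` class and the event kinds `set_schema` and `add_data` are hypothetical names.

```python
import hashlib
import json
from typing import Iterator, Optional


def block_hash(block: dict) -> str:
    # Assumption: canonical JSON + SHA-256 stand in for the real encoding and hash.
    encoded = json.dumps(block, sort_keys=True).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()


class MetadataChain:
    def __init__(self) -> None:
        self.blocks: dict[str, dict] = {}  # content-addressed block storage
        self.refs: dict[str, str] = {}     # symbolic name -> block hash

    def append(self, event: dict, ref: str = "head") -> str:
        # Each new block points at the block the reference currently names.
        block = {"prev": self.refs.get(ref), "event": event}
        h = block_hash(block)
        self.blocks[h] = block
        self.refs[ref] = h  # move the named pointer to the new last block
        return h

    def iter_blocks(self, ref: str = "head") -> Iterator[dict]:
        # Walk the singly-linked list from the referenced block back to the first.
        h: Optional[str] = self.refs.get(ref)
        while h is not None:
            block = self.blocks[h]
            yield block
            h = block["prev"]


chain = MetadataChain()
first = chain.append({"kind": "set_schema", "columns": ["sku_id", "price"]})
second = chain.append({"kind": "add_data", "slice_hash": "<hash of the slice>"})
kinds = [b["event"]["kind"] for b in chain.iter_blocks()]
```

Because each block embeds the hash of its predecessor, altering any earlier block would change every hash after it, so holding the head hash is enough to detect tampering anywhere in the chain.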
In addition to core events like adding data, running a query, and changing the schema, the Metadata Chain is designed to be extended to carry other kinds of information like:
- Extra meaning and structure of knowledge that data represents (glossary, semantics, ontology)
- Relevant policies, terms, rules, compliance, and regulations (governance)
- License, privacy and security concerns (stewardship)
- Information that aids discovery
- Collaboration information
These extensions are out of scope of this document.
Dataset is the main unit of data exchange in the system. It’s simply a combination of:
Depending on where the data comes from, datasets can be of these kinds:
Root datasets are the points of entry of external data into the system. They are usually owned by the organization that has full authority and responsibility over that data, i.e. a trusted source.
Root dataset definition includes:
- Where to fetch the data from - e.g. source URL, a protocol to use, cache control
- How to prepare the binary data - e.g. decompression, file filtering, format conversions
- How to interpret the data - e.g. data format, schema to apply, error handling
- How to combine data ingested in the past with the new data - e.g. append as log or diff as a snapshot of the current state
All this information is stored in the Metadata Chain and can change over time as the dataset evolves.
Derivative datasets are created by transforming/combining one or multiple existing datasets.
They are defined by the combination of:
This information is stored in the Metadata Chain and can change over time as the dataset evolves.
Queries define how input data is combined, modified, and re-shaped to produce new data.
Queries are used in two contexts:
- When defining new Derivative Datasets
- When analyzing and extracting data from an existing Dataset (locally or from a repository)
The system is agnostic to the exact language used to define the query and the set of supported dialects can be extended by implementing a new Engine.
All queries, however, must have the following properties:
- Stream/Batch agnostic
In other words, they should be guaranteed to always produce the same result for the same input data, without side effects.
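Example windowed aggregation query in streaming SQL:
SELECT
  -- Emit one row per month and SKU, stamped with the window's rowtime
  TUMBLE_ROWTIME(event_time, INTERVAL '1' MONTH) as event_time,
  sku_id,
  min(price) as min_monthly_price,
  max(price) as max_monthly_price,
  avg(price) as avg_monthly_price
FROM sku_prices -- hypothetical input dataset name
GROUP BY TUMBLE(event_time, INTERVAL '1' MONTH), sku_id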
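To make the determinism requirement concrete, here is a rough Python analogue of the monthly aggregation above, written as a pure function. It is a sketch only: the function name `monthly_price_stats`, the record layout, and the sample values are assumptions, and it works on a finite batch rather than an unbounded stream.

```python
from collections import defaultdict
from datetime import datetime
from statistics import mean


def monthly_price_stats(events):
    """Pure function: the output depends only on the input events."""
    windows = defaultdict(list)
    for e in events:
        t = e["event_time"]
        # A one-month tumbling window is identified by (year, month).
        windows[(t.year, t.month, e["sku_id"])].append(e["price"])
    # Sorting the window keys makes the output order independent of input order.
    return [
        {"year": y, "month": m, "sku_id": sku,
         "min": min(prices), "max": max(prices), "avg": mean(prices)}
        for (y, m, sku), prices in sorted(windows.items())
    ]


events = [
    {"event_time": datetime(2024, 1, 5), "sku_id": "A", "price": 10},
    {"event_time": datetime(2024, 1, 20), "sku_id": "A", "price": 20},
    {"event_time": datetime(2024, 2, 1), "sku_id": "A", "price": 30},
]
stats = monthly_price_stats(events)
```

The function reads no clock, random source, or external state, so running it again on the same events always yields the same rows.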
Engine is an interface shared by all specific implementations of a Query dialect. Engine implementations are responsible for applying defined queries to input data and returning the result. For example, some engines allow you to query data using a series of streaming SQL statements.
Engines run in sandboxed environments and are not permitted to use any external resources, which guarantees the reproducibility of all operations.
As Engines are in full control of all data transformations, they are also responsible for answering the Provenance queries.
Checkpoints are used by the Engines to store the computation state between the different invocations of a Query. They are fully engine-specific and opaque to the system. They are however an essential durable part of a Dataset as they are necessary to be able to pause and resume the streaming queries, and are essential in implementing “exactly-once” processing semantics.
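The role of a checkpoint can be illustrated with a toy running-average query. This is a sketch under assumptions: `run_query` is a hypothetical function, and the JSON-encoded state is just one possible opaque format that an engine might choose.

```python
import json


def run_query(batch, checkpoint=None):
    """Process one batch of prices, resuming from an optional checkpoint."""
    state = json.loads(checkpoint) if checkpoint else {"count": 0, "total": 0}
    for price in batch:
        state["count"] += 1
        state["total"] += price
    output = {"avg_price": state["total"] / state["count"]} if state["count"] else None
    # The caller stores the returned checkpoint without interpreting it.
    return output, json.dumps(state)


# Two invocations with a checkpoint in between...
_, cp = run_query([10, 20])
resumed, _ = run_query([30], checkpoint=cp)
# ...produce the same result as processing everything in one go.
one_shot, _ = run_query([10, 20, 30])
```

Without the checkpoint, the second invocation would have to re-read all earlier input to produce the same answer.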
Coordinator is an application that implements the common Dataset management logic.
- Handles all Metadata Chain operations
- Splits the transformation work into batches based on the dataset’s evolution timeline
- Collects relevant data slices of the input datasets
- Delegates data processing to the Engines, making all transformations look to them as conventional stream processing
- Commits the resulting data slices and new metadata blocks
Ingestion is the process by which external data gets into the system. Typical ingestion steps that describe how data is obtained and read (e.g. fetching data from some URL on the web, decompressing it, and reading it as CSV) are a part of the Root Dataset definition.
By design, the system only stores data in the append-only event log format to preserve the entire history. Unfortunately, a lot of data in the world is not stored or exposed this way. Some organizations may expose their data in the form of periodic database dumps, while others choose to provide it as a log of changes between the current and the previous export.
For example, when dealing with the daily database dumps, a user can choose the merge strategy that performs change data capture, transforming dumps into a set of events that signify record creation, update, or deletion.
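A snapshot-style merge of this kind can be sketched as a diff between two consecutive dumps. The function name `diff_snapshots`, the `op` labels, and the sample rows are assumptions for illustration; an actual merge strategy may encode changes differently.

```python
def diff_snapshots(previous, current, key):
    """Turn two consecutive database dumps into a list of change events."""
    old = {row[key]: row for row in previous}
    new = {row[key]: row for row in current}
    events = []
    # Keys only in the new dump: records that were created.
    for k in sorted(new.keys() - old.keys()):
        events.append({"op": "create", "record": new[k]})
    # Keys in both dumps with different content: records that were updated.
    for k in sorted(old.keys() & new.keys()):
        if old[k] != new[k]:
            events.append({"op": "update", "record": new[k]})
    # Keys only in the old dump: records that were deleted.
    for k in sorted(old.keys() - new.keys()):
        events.append({"op": "delete", "record": old[k]})
    return events


monday = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]
tuesday = [{"id": 2, "name": "B"}, {"id": 3, "name": "c"}]
changes = diff_snapshots(monday, tuesday, key="id")
```

Identical dumps produce no events, so re-ingesting an unchanged export adds nothing to the log.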
Cryptographic hash functions are used by the system in these three scenarios:
- Computing a logical hash sum of a Data Slice.
- Computing a physical hash sum of a Data Slice.
- Computing a hash sum of a MetadataBlock.
Whenever new events are appended to the Data the Metadata Chain will also be extended with a block containing a hash sum of the new data slice. The hash sum provides a very quick and reliable way to later validate that the data matches the one that has been written earlier.
The new MetadataBlock will also be cryptographically signed to guarantee its integrity - this excludes any malicious or accidental alterations to the block.
- If the Metadata Chain of a certain dataset is reliably known (e.g. available from many independent peers) a peer can then download the Data from any untrusted source and use the hash function to validate the authenticity of every data slice that composes it.
- The trustworthiness of any Dataset can be established by reviewing the transformations it claims to be performing on data (contained in the Metadata Chain), re-applying those transformations in a trusted environment, and then comparing the hash sums of the result slices.
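The first scenario, validating data obtained from an untrusted source against a trusted Metadata Chain, might look roughly like this. SHA-256 over raw bytes is a stand-in for the real hash function, and `slice_hash` and `validate_slices` are hypothetical helpers.

```python
import hashlib


def slice_hash(data: bytes) -> str:
    # Assumption: SHA-256 over raw bytes stands in for the physical hash.
    return hashlib.sha256(data).hexdigest()


def validate_slices(expected_hashes, downloaded_slices):
    """Return indices of slices whose content does not match the trusted hashes."""
    if len(expected_hashes) != len(downloaded_slices):
        raise ValueError("slice count does not match the metadata chain")
    return [
        i
        for i, (expected, data) in enumerate(zip(expected_hashes, downloaded_slices))
        if slice_hash(data) != expected
    ]


# Hashes as they would be read from a trusted copy of the Metadata Chain.
trusted = [slice_hash(b"slice-0"), slice_hash(b"slice-1")]
ok = validate_slices(trusted, [b"slice-0", b"slice-1"])
tampered = validate_slices(trusted, [b"slice-0", b"slice-X"])
```

The same comparison applies to the second scenario: re-run the declared transformations in a trusted environment and check the resulting slices against the hashes recorded in the chain.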
Data provenance describes the origins and the history of data and adds value to data by explaining how it was obtained.
Metadata Chain alone can already significantly narrow down the search space when you want to explain how a certain piece of data came to be, as it keeps track of all the inputs and queries used to create a dataset. But the goal of the provenance system is to make this type of inquiry effortless.
We differentiate the following kinds of provenance:
- Why-provenance - tells us which input data elements were inspected to decide that an element should be present in the output at all - i.e. defines a sufficient set of elements needed to produce the output.
- How-provenance - tells us the process by which the elements of why-provenance caused the output to appear
- Where-provenance - narrows down why-provenance to input data elements that were copied or transformed to determine the output element value.
Since the Engines are responsible for all data transformations, it’s also the Engine’s responsibility to answer provenance queries.
There are many different ways to implement provenance:
- By statically analyzing the queries
- By inverting transformations
- By repeating the computations and logging the data used at every step
- By propagating provenance data through all computations
Depending on the language used by an Engine one approach may work better in one situation than the other, so we avoid prescribing the exact method to use but instead standardize the language used for provenance-related queries and responses.
In the scope of this specification, verifiability of data means the ability to establish:
- The ultimate source(s) of data:
- Which Root Datasets the data is coming from
- Who these datasets belong to (ownership)
- And which actor has added the specific records (accountability)
- The transformations performed to create this data:
- The graph of Derivative Datasets upstream to the one being verified
- Authorship of those datasets (accountability)
- And finally, that the data in fact corresponds to performing declared transformations on the source data.
In other words, having root datasets A, B and a derivative dataset C = f(A, B):
- The data in A (and similarly in B) is verifiable if:
- The data in C is verifiable if:
The last step of ensuring that f(A, B) = C can be achieved by several means:
- Reproducibility - by applying the same transformations and comparing the results
- Verifiable computing - different types of proofs that can attest to the validity of results without redoing the computations.
Examples of verifiable computing can include:
Verifiability should not be confused with trustworthiness or reality of data. Verifying a dataset doesn’t prove that the data in it is either truthful or more “real” than other data. The value of verifiability comes from establishing the provenance of data so that:
- One could understand whether data is coming from reputable sources - sources they can trust (attribution)
- One could review all derivative transformations applied to the data by intermediate actors (auditability).
Verifiability provides the foundation upon which trust in data can be built:
- First in the form of authority - organizations putting their name behind the data they publish
- Secondly in the form of reputation - trusting the sources or pipelines used by large parts of the community
- Thirdly in the form of cross-validation - e.g. performing outlier detection on data from several similar publishers to establish common truth.
The system applies the idea of bitemporal data modelling to the event streams. It differentiates two kinds of time:
- System time - tells us when some event was observed by the system
- Event time - tells when some event occurred from the perspective of the outside world
Every record in the system has exactly one system time associated with it upon ingestion but can have zero to many event times.
System time gives us a reference point for when something has occurred from the perspective of the system.
Projecting the data onto the system time axis answers the question: “what did the system know at the time T?”, meaning that such projections effectively freeze data in time, providing the natural way to achieve reproducibility.
For all intents and purposes, system time is treated as an ordered, monotonically non-decreasing value that lets us establish a before-after relationship between certain events. Note, however, that the before-after relationship is only meaningful for data within one Dataset and its upstream dependencies. System time cannot be used to establish an exact before-after relationship between the events of independent datasets.
Event time tells us when something happened from the outside world’s perspective. This time, therefore, is usually the most useful one for querying and joining data.
There are no restrictions on the event time in the system - there can be many event times associated with any record, and unlike system time, event times don’t have to be monotonic. This allows the system to support many kinds and varieties of event time use, like:
- Post-dated events and predictions - with event time set into the future
- Back-dated events and corrections - with event time set into the past
Depending on the type of transformation, the requirements may be stricter, e.g. joining datasets based on event time may require it to be quasi-monotonically increasing to know when the join operation can be considered complete (see Watermarks).
A watermark defines the point in Event Time for which with a high probability we’ve already observed all preceding events.
When performing time-based windowed operations, aggregations, or joins, it is important to know when a certain time window can be considered closed. Watermark tells the system “You most likely will not get an event with time less than
In the Root Dataset, events can still arrive even after their time interval has already been closed by the watermark. Such events are considered “late” and it’s up to the individual Queries to decide how to handle them. They can be simply ignored, emitted into a side output, or still considered by emitting a “correction” event into the output.
Watermarks in the system are defined for every Metadata Block. By default, the Root Dataset will assign the watermark to the maximum observed Event Time in the Data Slice. You can and should override this behavior if you expect events to arrive out-of-order to some degree, e.g. offsetting the watermark by 1 day prior to the last seen event.
Watermarks can also be set based on the System Time manually or semi-automatically. This is valuable for slow-moving Datasets where it’s normal not to see any events for days or even months. Setting the watermark explicitly allows all computations based on such a stream to proceed, knowing that there were no events for that time period. Otherwise the output would stall, because the system would have to assume that the Dataset simply was not updated for a while and old data can still arrive.
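The default rule and the out-of-order adjustment described above can be sketched as follows. The helper names `compute_watermark` and `is_late` and the `lag` parameter are hypothetical, and the sketch treats an event as late when its event time is strictly below the watermark.

```python
from datetime import datetime, timedelta


def compute_watermark(event_times, lag=timedelta(0)):
    """Watermark = maximum observed event time minus an allowed out-of-order lag."""
    return max(event_times) - lag


def is_late(event_time, watermark):
    # Assumption: an event is late if it falls strictly before the watermark.
    return event_time < watermark


observed = [datetime(2024, 1, 10), datetime(2024, 1, 12)]
default_wm = compute_watermark(observed)                       # max event time
lagged_wm = compute_watermark(observed, lag=timedelta(days=1))  # 1 day earlier
```

With the one-day lag, an event stamped January 11 that arrives afterwards is still on time, whereas under the default watermark it would be classified as late.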
Retractions and Corrections
Errors in source data are inevitable and require a mechanism for correcting them. Unlike databases, where one could issue UPDATE queries, ODF’s core data model is an immutable append-only stream, and thus requires a different mechanism.
Retractions and corrections are explicit events that can appear in Root datasets to signify that some previous event was published in error, or to correct some of its fields. They are differentiated from regular events by the special Operation Type field.
Retractions and corrections can also naturally occur in Derivative datasets in cases when a stream processing operation encounters late data (data arriving past the current Watermark). In such cases streaming transformation may publish corrections or retractions for previously produced result records that were influenced by the late events.
The retractions and corrections model is fundamental to making data processing maximally autonomous.
Repositories let participants of the system exchange Datasets with one another.
Repository definition includes:
- Location where the repository can be reached (URL)
- Protocols that it supports
- Credentials needed to access it
- Any necessary protocol-specific configuration
In the most basic form, a Repository can simply be a location where the dataset files are hosted over one of the supported file or object-based data transfer protocols. The owner of a dataset will have push privileges to this location, while other participants can pull data from it.
An advanced repository can support more functionality like:
- Push data API for publishers
- Subscription API for consumers
- Query API for making use of repository’s compute resources and reducing the amount of transferred data
In relational algebra, a projection is an operation that removes one or many dimensions from a data tuple. In the context of our system the most common projections are temporal projections involving the System Time and Event Time dimensions.
Depending on the time axis, we arrive at the two most important types of projections in bitemporal data modelling:
- AS AT or “As we knew at that time”. This projection collapses the system time dimension and shows us what the state of the data was at that time to the best knowledge of the system.
- AS OF or “As we should’ve known at the time”. This projection collapses the event time dimension and shows us what should’ve happened at that time if we knew about all potential corrections and compensating events that were added since then.
Understanding the difference between these projections is essential when working with time series data and for achieving the reproducibility of results.
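The two projections can be approximated with simple filters over records that carry both timestamps. This is a deliberately simplified sketch: the record layout and the function names `as_at` and `as_of` are assumptions, and a fuller implementation would also apply retraction and correction events when building the projected state.

```python
from datetime import date


def as_at(records, system_time):
    """What the system knew at `system_time`: ignore anything recorded later."""
    return [r for r in records if r["system_time"] <= system_time]


def as_of(records, event_time):
    """What had happened by `event_time`, using everything known today."""
    return [r for r in records if r["event_time"] <= event_time]


records = [
    # Observed shortly after it happened.
    {"system_time": date(2024, 1, 5), "event_time": date(2024, 1, 4), "value": 10},
    # Back-dated event: happened in January but was only recorded in February.
    {"system_time": date(2024, 2, 1), "event_time": date(2024, 1, 3), "value": 7},
]

known_in_january = as_at(records, date(2024, 1, 31))
happened_by_january = as_of(records, date(2024, 1, 31))
```

The AS AT result stays the same no matter how many records are appended later, which is what makes it suitable for reproducing past results; the AS OF result grows as back-dated events arrive.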
A workspace in the context of Kamu CLI is a directory that contains a hidden .kamu sub-directory where kamu stores local copies of datasets and configuration, including connection parameters to remote ODF repositories.
Kamu Node is a set of Kubernetes applications that can be installed in a distributed environment to operate data pipelines, verify computations done by other parties, and execute queries on co-located data. It is a horizontally scalable back-end implementation of the ODF protocol. Operating a node is the primary way to contribute compute resources to the ODF network.