Open Data Fabric
Event
As described in the Nature of Data section, the system operates only on data expressed as past events, observations, or propositions believed to be true at a certain time. For simplicity, we will use the term “event” throughout to refer to such data tuples.
Data
Data is a set of events stored in the system. Since events are immutable, data can only grow over time. Conceptually, it’s best to think of data as a full log of a potentially infinite event stream. Data never appears in the system alone, as we would not be able to tell where it came from and whether it can be trusted. Data always appears as part of a dataset.
Schema describes the shape of the data by associating names and data types with the columns that the data is composed of. Schema can change over time, and its changes are tracked in the metadata chain.
Offset
Offset is a monotonically increasing sequential numeric identifier that is assigned to every record and represents its position relative to the beginning of the dataset. Offsets are used to uniquely identify any record in the dataset. The offset of the first record in a dataset is 0.
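As a toy illustration of how offsets behave, here is a minimal Python sketch; the `Dataset` class and `append` method are hypothetical helpers, not part of the spec:

```python
# Hypothetical sketch: assigning offsets to incoming event records.
class Dataset:
    def __init__(self):
        self.records = []

    def append(self, events):
        """Append a slice of events, assigning monotonic offsets starting at 0."""
        start = len(self.records)
        for i, event in enumerate(events):
            self.records.append({"offset": start + i, "event": event})
        return (start, len(self.records) - 1)  # [start; end] offset interval

ds = Dataset()
interval = ds.append(["a", "b", "c"])
print(interval)          # (0, 2) - the first record has offset 0
print(ds.append(["d"]))  # (3, 3) - offsets keep growing and are never reused
```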
Operation Type
Since past events are immutable, if some event is deemed incorrect later on, it can only be rectified by issuing an explicit retraction or correction. Retractions and corrections are also represented as events in the same stream of data and are differentiated by a special “operation type” field.
Data Slice
Data arrives into the system as arbitrarily large sets of events. We refer to them as “slices”. More formally, a slice is a:
- Continuous part of a dataset’s data
- That has the same schema
- Defined by its [start; end] offset interval
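A slice can be pictured as a small descriptor combining these three properties. The following Python sketch uses illustrative field names, not the spec's actual schema:

```python
# Hypothetical data slice descriptor; field names are illustrative only.
from dataclasses import dataclass

@dataclass(frozen=True)
class DataSlice:
    schema: tuple        # column (name, type) pairs shared by all records
    offset_start: int    # inclusive [start; end] offset interval
    offset_end: int

    def num_records(self) -> int:
        return self.offset_end - self.offset_start + 1

s = DataSlice(schema=(("event_time", "timestamp"), ("value", "int")),
              offset_start=100, offset_end=149)
print(s.num_records())  # 50
```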
Metadata
Refers to information about a dataset stored in its metadata chain.
Metadata Chain
Metadata Chain captures all essential information about the dataset, including:
- Where the data comes from (see Data Ingestion)
- How data was processed (see Query)
- Its schema
- Log of all modifications made to the data, including information used to verify the integrity of data
- Current watermark
The metadata chain maintains a head reference that indicates the current last block in the chain. Using multiple references, the metadata chain can be organized into a directed acyclic graph that can form branches, allowing, for example, some subset of events to be staged for review or an automated QA process before they are accepted into the main chain.
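The hash-linked structure of such a chain can be sketched as follows. SHA-256 over canonical JSON is an assumption made for illustration; the spec defines its own block layout and hashing scheme:

```python
# Minimal sketch of a hash-linked metadata chain; not the spec's real format.
import hashlib
import json

def block_hash(block: dict) -> str:
    return hashlib.sha256(json.dumps(block, sort_keys=True).encode()).hexdigest()

def append_block(chain: list, payload: dict) -> str:
    prev = block_hash(chain[-1]) if chain else None
    block = {"prev_block_hash": prev, **payload}
    chain.append(block)
    return block_hash(block)  # the new head reference

chain = []
append_block(chain, {"event": "Seed"})
head = append_block(chain, {"event": "AddData", "offsets": [0, 99]})

# Integrity check: each block must reference the hash of its predecessor.
for i in range(1, len(chain)):
    assert chain[i]["prev_block_hash"] == block_hash(chain[i - 1])
```

Because every block embeds the hash of the previous one, tampering with any past block invalidates all hashes that follow it, which is what makes the head reference sufficient to pin the entire history.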
In addition to core events like adding data, running a query, and changes of schema, the Metadata Chain is designed to be extended to carry other kinds of information, such as:
- Extra meaning and structure of knowledge that data represents (glossary, semantics, ontology)
- Relevant policies, terms, rules, compliance, and regulations (governance)
- License, privacy and security concerns (stewardship)
- Information that aids discovery
- Collaboration information
Dataset
Dataset is the main unit of data exchange in the system. It’s simply a combination of the metadata chain, data slices, and checkpoints. Depending on where the data comes from, datasets can be of the following kinds:
Root Dataset
Root datasets are the points of entry of external data into the system. They are usually owned by the organization that has full authority and responsibility over that data, i.e. a trusted source. A root dataset definition includes:
- Where to fetch the data from - e.g. source URL, a protocol to use, cache control
- How to prepare the binary data - e.g. decompression, file filtering, format conversions
- How to interpret the data - e.g. data format, schema to apply, error handling
- How to combine data ingested in the past with the new data - e.g. append as a log, or diff as a snapshot of the current state
Derivative Dataset
Derivative datasets are created by transforming/combining one or multiple existing datasets. They are defined by the combination of:
- Input datasets
- A query to apply to those inputs
- An engine used to execute the query
Query
Queries define how input data is combined, modified, and re-shaped to produce new data. Queries are used in two contexts:
- When defining new derivative datasets
- When analyzing and extracting data from an existing dataset (locally or from a repository)
To guarantee reproducibility, queries must be:
- Deterministic
- Pure
- Stream/Batch agnostic
Engine
Engine is an interface shared by all specific implementations of a query dialect. Engine implementations are responsible for applying defined queries to input data and returning the result. For example, some engines allow you to query data using a series of streaming SQL statements. Engines run in sandboxed environments and are not permitted to use any external resources, in order to guarantee the reproducibility of all operations. As engines are in full control of all data transformations, they are also responsible for answering provenance queries.
Checkpoint
Checkpoints are used by the engines to store computation state between different invocations of a query. They are fully engine-specific and opaque to the system. They are, however, an essential durable part of a dataset, as they are necessary to be able to pause and resume streaming queries, and are essential in implementing “exactly-once” processing semantics.
Coordinator
Coordinator is an application that implements the common management logic. Core responsibilities:
- Handles all metadata chain operations
- Splits the transformation work into batches based on the dataset’s evolution timeline
- Collects relevant data slices of the input datasets
- Delegates data processing to the engines, making all transformations look to them like conventional stream processing
- Commits the resulting data slices and new metadata blocks
Ingestion
Ingestion is the process by which external data gets into the system. Typical ingestion steps that describe how data is obtained and read (e.g. fetching data from some URL on the web, decompressing it, and reading it as CSV) are a part of the root dataset definition.
Merge Strategy
By design, the system only stores data in the append-only event log format to preserve the entire history. Unfortunately, a lot of data in the world is not stored or exposed this way. Some organizations may expose their data in the form of periodic database dumps, while others choose to provide it as a log of changes between the current and the previous export. When ingesting data from external sources, the user can choose between different Merge Strategies that define how to combine the newly ingested data with the existing one. For example, when dealing with daily database dumps, a user can choose the merge strategy that performs change data capture, transforming dumps into a set of events that signify record creation, update, or deletion.
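A snapshot-based merge strategy performing change data capture can be sketched roughly as below. The operation labels `append`/`correct`/`retract` are illustrative, not the spec's actual encoding:

```python
# Toy change-data-capture: diff today's dump against yesterday's to emit
# creation, correction, and retraction events. Labels are illustrative.
def snapshot_diff(prev: dict, curr: dict) -> list:
    events = []
    for key, value in curr.items():
        if key not in prev:
            events.append(("append", key, value))     # new record
        elif prev[key] != value:
            events.append(("correct", key, value))    # changed record
    for key in prev:
        if key not in curr:
            events.append(("retract", key, prev[key]))  # deleted record
    return events

yesterday = {"user1": "alice", "user2": "bob"}
today = {"user1": "alice", "user2": "bobby", "user3": "carol"}
print(sorted(snapshot_diff(yesterday, today)))
# [('append', 'user3', 'carol'), ('correct', 'user2', 'bobby')]
```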
Hash
Cryptographic hash functions are used by the system in these three scenarios:
- Computing a logical hash sum of a data slice.
- Computing a physical hash sum of a data slice file.
- Computing a hash sum of a metadata block.
Hashes enable the following verification scenarios:
- If the metadata chain of a certain dataset is reliably known (e.g. available from many independent peers), a peer can then download the data from any untrusted source and use the hash function to validate the authenticity of every data slice that composes it.
- The trustworthiness of any derivative dataset can be established by reviewing the transformations it claims to be performing on data (contained in the metadata chain), re-applying those transformations in a trusted environment, and then comparing the hash sums of the result slices.
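The untrusted-download scenario can be illustrated with a short sketch; SHA-256 is assumed here for illustration only:

```python
# Sketch: validating data downloaded from an untrusted peer against
# slice hashes obtained from a trusted metadata chain.
import hashlib

def sha256(data: bytes) -> str:
    return hashlib.sha256(data).hexdigest()

# Hashes a peer would obtain from a trusted copy of the metadata chain.
trusted_hashes = [sha256(b"slice-0"), sha256(b"slice-1")]

# An honest download matches the trusted hashes slice by slice.
honest = [b"slice-0", b"slice-1"]
print(all(sha256(s) == h for s, h in zip(honest, trusted_hashes)))    # True

# A tampered download is detected without trusting the source at all.
tampered = [b"slice-0", b"slice-1-tampered"]
print(all(sha256(s) == h for s, h in zip(tampered, trusted_hashes)))  # False
```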
Provenance
Data provenance describes the origins and the history of data, and adds value to data by explaining how it was obtained. Metadata alone can already significantly narrow down the search space when you want to explain how a certain piece of data came to be, as it keeps track of all the inputs and queries used to create a dataset. But the goal of the provenance system is to make this type of inquiry effortless. We differentiate the following kinds of provenance:
- Why-provenance - tells us which input data elements were inspected to decide that an element should be present in the output at all - i.e. defines a sufficient set of elements needed to produce the output
- How-provenance - tells us the process by which the elements of why-provenance caused the output to appear
- Where-provenance - narrows down why-provenance to input data elements that were copied or transformed to determine the output element value
Provenance can be derived in several ways:
- By statically analyzing the queries
- By inverting transformations
- By repeating the computations and logging the data used at every step
- By propagating provenance data through all computations
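The last approach, propagating provenance through computations, can be pictured with a toy example in which each output element carries the set of input offsets it was derived from. All names here are purely illustrative:

```python
# Toy sketch of propagating why-provenance through a computation:
# each output element remembers which input rows produced it.
def filter_with_provenance(records):
    """Keep positive values; track the input offsets behind each output."""
    out = []
    for offset, value in records:
        if value > 0:
            out.append({"value": value, "why": {offset}})
    return out

inputs = [(0, -5), (1, 3), (2, 7)]
result = filter_with_provenance(inputs)
print([r["why"] for r in result])  # [{1}, {2}]
```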
Verifiability
In the scope of this specification, verifiability of data means the ability to establish:
- The ultimate source(s) of data:
  - Which root datasets the data is coming from
  - Who these datasets belong to (ownership)
  - And which actor has added the specific records (accountability)
- The transformations performed to create this data:
  - The graph of upstream datasets leading to the one being verified
  - Authorship of those datasets (accountability)
- And finally, that the data in fact corresponds to performing declared transformations on the source data.
Consider two root datasets A, B and a derivative dataset C = f(A, B):
- The data in A (and similarly in B) is verifiable if the metadata chain of A is valid:
  - Metadata block hashes are valid, forming a valid chain
  - Blocks point to data slices and checkpoints with valid hashes
- The data in C is verifiable if:
  - The metadata chain of C is valid
  - Data in C corresponds to applying f(A, B) according to all transformation steps declared in the metadata chain
Verifying that f(A, B) = C can be achieved by several means:
- Reproducibility - by applying the same transformations and comparing the results
- Verifiable computing - different types of proofs that can attest to the validity of results without redoing the computations:
- Trusted Execution Environments (TEEs)
- Non-interactive Proofs (including “zero-knowledge”).
Verifiability allows data consumers to audit what they use:
- One can understand whether data is coming from reputable sources - sources they can trust (attribution)
- One can review all derivative transformations applied to the data by intermediate actors (auditability)
Trust in data sources can then be established:
- First, in the form of authority - organizations putting their name behind the data they publish
- Secondly, in the form of reputation - trusting the sources or pipelines used by large parts of the community
- Thirdly, in the form of cross-validation - e.g. performing outlier detection on data from several similar publishers to establish common truth
Time
The system applies the idea of bitemporal data modelling to the event streams. It differentiates two kinds of time:
- System time - tells us when some event was observed by the system
- Event time - tells us when some event occurred from the perspective of the outside world
System Time
System time gives us a reference point for when something has occurred from the perspective of the system. Projecting the data onto the system time axis answers the question: “what did the system know at the time T?”, meaning that such projections effectively freeze data in time, providing a natural way to achieve reproducibility. For all intents and purposes, system time is treated as an ordered, monotonically non-decreasing value that lets us establish a before-after relationship between certain events. Note, however, that the before-after relationship is only meaningful for data within one dataset and its upstream dependencies. System time cannot be used to establish an exact before-after relationship between the events of independent datasets.
Event Time
Event time tells us when something happened from the outside world’s perspective. This time, therefore, is usually the most useful one for querying and joining data. There are no restrictions on event time in the system - there can be many event times associated with any record, and unlike system time, event times don’t have to be monotonic. This allows the system to support many kinds and varieties of event time use, like:
- Post-dated events and predictions - with event time set into the future
- Back-dated events and corrections - with event time set into the past
Watermark
A watermark defines the point in event time for which, with a high probability, we’ve already observed all preceding events. When performing time-based windowed operations, aggregations, or joins, it is important to know when a certain time window can be considered closed. A watermark tells the system: “You most likely will not get an event with time less than T anymore”.
In the real world, events can still arrive even after their time interval has been closed by the watermark. Such events are considered “late”, and it’s up to the individual queries to decide how to handle them. They can be simply ignored, emitted into a side output, or still considered by emitting a “correction” event into the output.
Watermarks in the system are defined per every metadata block. By default, the system will assign the watermark to the maximum event time observed in the data slice. You can and should override this behavior if you expect events to arrive out of order to some degree, e.g. by offsetting the watermark to 1 day prior to the last seen event.
Watermarks can also be set manually or semi-automatically based on the system time. This is valuable for slow-moving datasets where it’s normal not to see any events in days or even months. Setting the watermark explicitly allows all computations based on such a stream to proceed, knowing that there were no events for that time period, whereas otherwise the output would be stalled on the assumption that the dataset was not updated for a while and old data can still arrive.
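The default assignment with an out-of-order allowance might be sketched like this, using the 1-day offset from the example above; the function names are hypothetical:

```python
# Sketch of watermark assignment with a fixed out-of-order tolerance.
from datetime import datetime, timedelta

def compute_watermark(event_times, tolerance=timedelta(days=1)):
    """Watermark = max observed event time minus an out-of-order allowance."""
    return max(event_times) - tolerance

times = [datetime(2024, 1, 10), datetime(2024, 1, 12), datetime(2024, 1, 11)]
wm = compute_watermark(times)
print(wm)  # 2024-01-11 00:00:00 - events before this are assumed complete
```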
Retractions and Corrections
Errors in source data are inevitable and require a mechanism for correcting them. Unlike databases, where one could issue DELETE or UPDATE queries, ODF’s core data model is an immutable append-only stream, and thus requires a different mechanism.
Retractions and corrections are explicit events that can appear in datasets to signify that some previous event was published in error, or to correct some of its fields. They are differentiated from regular events by the special operation type field.
Retractions and corrections can also naturally occur in datasets in cases when a stream processing operation encounters late data (data arriving past the current watermark). In such cases, a streaming transformation may publish corrections or retractions for previously produced result records that were influenced by the late events.
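One possible way a transformation could react to late data is sketched below as a daily-count aggregation that, instead of mutating past output, emits a retraction followed by a correction. The aggregation and operation labels are illustrative, not the spec's encoding:

```python
# Toy daily-count aggregation handling late data via retract/correct events.
def handle_event(state, output, day, watermark_day):
    if day > watermark_day:
        state[day] = state.get(day, 0) + 1          # window still open
    else:                                           # late event: window closed
        old = state.get(day, 0)
        state[day] = old + 1
        output.append(("retract", day, old))        # undo the stale result
        output.append(("correct", day, old + 1))    # publish corrected count

state, output = {"2024-01-01": 5}, []
handle_event(state, output, "2024-01-01", watermark_day="2024-01-02")
print(output)  # [('retract', '2024-01-01', 5), ('correct', '2024-01-01', 6)]
```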
The retractions and corrections model is fundamental to making data processing maximally autonomous.
Repository
Repositories let participants of the system exchange datasets with one another. A repository definition includes:
- Location where the repository can be reached (URL)
- Protocols that it supports
- Credentials needed to access it
- Any necessary protocol-specific configuration
Repositories may additionally provide:
- Push data API for publishers
- Subscription API for consumers
- Query API for making use of repository’s compute resources and reducing the amount of transferred data
Projection
In relational algebra, a projection is an operation that removes one or many dimensions from a data tuple. In the context of our system, the most common projections are temporal projections involving the system time and event time dimensions. Depending on the time axis, we arrive at the two most important types of projections in bitemporal data modelling:
- AS AT, or “As we knew at that time”. This projection collapses the system time dimension and shows us what the state of the data was at that time to the best knowledge of the system.
- AS OF, or “As we should’ve known at the time”. This projection collapses the event time dimension and shows us what should’ve happened at that time if we had known about all potential corrections and compensating events that were added since then.
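The two projections can be demonstrated on a toy bitemporal log of (system time, event time, key, value) records; this is a simplified model, not the spec's algorithm:

```python
# Toy bitemporal projections over a log of (system_time, event_time, key, value).
def as_at(log, t):
    """What did the system know at system time t? (collapses system time)"""
    return {k: v for st, et, k, v in log if st <= t}

def as_of(log, t):
    """What should we have known at event time t, given everything learned
    since? (collapses event time, keeping the latest knowledge)"""
    return {k: v for st, et, k, v in log if et <= t}

log = [
    (1, 1, "temp", 20),  # observed at system time 1
    (3, 2, "temp", 25),  # back-dated correction recorded at system time 3
]
print(as_at(log, t=2))  # {'temp': 20} - correction not yet known at that time
print(as_of(log, t=2))  # {'temp': 25} - correction applied retroactively
```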
Manifest
When objects are saved on disk or transmitted over the network, they are typically wrapped in an extra manifest layer.
Kamu
Dataset Snapshot
A with schema.
Workspace
A workspace, in the context of Kamu CLI, is a directory that contains a hidden .kamu sub-directory where kamu stores local copies of datasets and configuration, including connection parameters to remote ODF repositories.
Node
Kamu Node is a set of Kubernetes applications that can be installed in a distributed environment to operate data pipelines, verify computations done by other parties, and execute queries on co-located data. It is a horizontally scalable back-end implementation of the ODF protocol. Operating a node is the primary way to contribute compute resources to the ODF network.
Oracle
Blockchain oracles provide information from outside-world sources to on-chain smart contracts. The need for oracles arises because, in typical blockchain implementations, nodes cannot call any external APIs and are therefore limited to operating only on the information that is stored on chain. Working in combination with ODF smart contracts, Kamu Node implements an ODF oracle provider component that allows requesting data from the ODF network and supplying it to the smart contracts through a blockchain transaction. Using the ODF network oracle, smart contracts gain the ability to:
- access outside-world data through ODF nodes
- execute very complex and resource-intensive data queries off-chain, for significant cost savings
- get results along with the provenance and reproducibility information.