Version: 0.18.0 (this is a rendered version of the original spec)
Open Data Fabric is an open protocol specification for decentralized exchange and transformation of semi-structured data that aims to holistically address many shortcomings of the modern data management systems and workflows.
Develop a method of semi-structured data exchange that would:
- Enable worldwide collaboration around data cleaning, enrichment, and transformation.
- Create an environment of verifiable trust between participants without the need for a central authority.
- Enable a high degree of data reuse, making quality data more readily available.
- Reduce the time it takes data to propagate from publishers to consumers.
- Introduce a feedback loop between the consumers and publishers of data that would facilitate improvements around data availability, recency, and design.
- Provide foundation for implementing data anonymization techniques to simplify and speed up the exchange of vital but sensitive data.
- Complete historical account - when data is used to gain insight and to drive decision making discarding or modifying data is akin to rewriting history. The history of all data observed by the system must be preserved.
- Reproducibility - the ability to reproduce the results is a cornerstone of the scientific method without which the process and findings of one party cannot be verified by others. Therefore we require that all the transformations performed within the system must be fully reproducible, and it must be possible to get a reference to data that is frozen in time and never changes to achieve reproducibility of any process that uses it.
- Verifiability - any party that plans to use some data must be able to verify its validity.
- Provenance - regardless of how many transformation stages the data went through, it should be possible to trace any individual data cell back to its ultimate source and understand which data contributed to its existence and its value.
- Transparency - the method itself should be developed in the open, easy to understand, and reason about.
- Timeliness - the method should aim to minimize the propagation time of data through the transformation pipeline. The frequency with which data is presented to consumers should be optimized for their experience and usability, not dictated by limitations of the data pipeline.
- Transactionality - once data is in the system all operations should have ACID properties and exactly-once semantics.
- Decentralization - there must be no entity in the system with any kind of special privileges over data, or abilities to circumvent any of the above requirements.
The primary focus of this specification is the mission-critical data such as:
- Business performance data (sales, CRM)
- Governmental data
- Financial data
- Healthcare data
- Scientific data
- As well as any other data used for decision making
This specification does not target very high volume sources like IoT and sensor data where infinite retention of the entire history may be cost-prohibitive. We do, however, hope to shift the mindset of the data science community towards thinking about such cases as design compromises. Routine data loss should no longer be our default mode of operations.
Nature of Data
In modern data science “data” is a very broad term that includes many shapes and forms of quantitative and qualitative records. We will not attempt to provide a taxonomy here but will claim that the large portion of data published today carries in it the limitations and design compromises of systems that generated it.
Snapshot data - many OLTP systems are designed to only maintain the current state of the information. The data sourced from OLTP systems is usually in the form of periodic database dumps (snapshots) or change data capture logs at best.
Such data has many limitations:
- It’s often accompanied by data loss (e.g. all data changes between the two snapshots are lost).
- Data is anemic and doesn’t carry the business context of the changes.
Aggregated data is the form of data in which information from several measurements is combined and expressed in a collective or summary form. This form of data is very common in governmental and healthcare sources, where aggregations often hide important deprivations and inequalities between gender, ethnic, and other groups.
Although such styles of data have become the norm for many major data publishers today, we believe that such treatment is inadequate and at odds with many of the requirements we put towards the system. Data is our modern-day history book and should be treated as such.
To achieve the desired qualities we choose the following narrow definition of data:
Data is a set of events, observations, or propositions believed to be true at a certain time.
- Data is a set of event records / relational tuples
- Events are immutable
- Event set can only grow
- Every event set can be viewed as a potentially infinite event stream
Such representation poses many additional challenges when working with data, but, as we will show further - the benefits by far outweigh the added complexity, and that complexity can in most cases be addressed by better tooling.
Nature of Transformations
The state-of-the-art approach to transforming data today is to version the source data (using a checksum or a stable reference) and version the code that transforms it (using a version control system). The result of transformations is then uploaded to some shared storage and made available to others. There are many tools that improve the reproducibility of such workflows, but all of them treat data as a mere binary blob, deprived of any of its intrinsic qualities.
This leads to many problems:
- Data is routinely copied
- Modified versions of data essentially create new datasets that cannot be easily traced to their origin
- The history of transformations (provenance) cannot be easily established
- It is practically impossible to verify that no accidental or malicious alterations were made
As a result, our workflows create large quantities of datasets that are disjoint from their sources, that contribute to the overall “noise” when searching for data but cannot be meaningfully reused. Any data scientist that cares about the validity of their results therefore mush begin with a clean slate and work with the primary authoritative source of data.
Modern data science is stuck in a loop where all forward progress is constantly lost because no mechanism exists for making incremental improvements to data.
Components of Trust
We believe that the most important factor in enabling reuse and collaboration on data is trust.
While the implicit trust may be enough within an enterprise, trust on the global scale cannot be blind - it is only possible through transparency, reproducibility, and verifiability of the results. Those qualities cannot be easily layered on top of the existing tools, so we designed the system from the ground up with those qualities at its core.
In our design we achieve trust with following solution criteria:
- When using any data you must be able to trace its source to a trusted authority that originally published it
- All transformations that were applied to it must be known and can be audited
- The results of those transformations can always be reproduced by following the same transformation steps
- Transformations are expressed using Queries written in any of the supported languages.
- Queries combine and transform one or many input data streams into an output stream.
- Queries are executed by the Engines - a software that knows how to interpret and execute a certain query dialect.
- The results of queries must be deterministic - always produce the same results for the same set of inputs.
- Engines run in a “sandbox” environment, where they don’t have access to a network or any other external resources.
- Engines are strictly versioned as their implementation contributes to the total determinism of the operation.
A transformation executed with the same query, on the same inputs, and using the same version of an engine is guaranteed to be reproducible.
This is indeed a very hard departure from the ways of working with data that we are used to. It prohibits using anything external to the system - there can be no API calls, no manual editing of data, no use of 3rd party libraries that are not part of the engine.
The benefits we get in exchange for this strictness are, however, immense:
- Complete transparency on where data comes from.
- Ability to audit all transformations.
- Ability to easily reproduce and verify all results.
All this combined allows us to build verifiable trust between parties.
Derivative Data Transience
Since past data in the input datasets is immutable and all transformations are required to be deterministic - derivative datasets are functionally dependent on their inputs. This means that for any given transformation graph it is possible to fully reconstruct data of all derivative datasets from the data of root datasets by simply re-applying all transformations.
Data in derivative datasets can be considered a form of caching and does not require durable storage.
Stream Processing Model
As we model all data as potentially infinite event streams (see Nature of Data), it’s quite natural to use the stream processing techniques for all data transformations.
When working with temporal events, stream processing has the following benefits:
- Lets users define a query once and potentially run it forever. This nicely addresses the timeliness requirement, allowing us to minimize the latency with which data propagates through the system.
- Streaming queries are expressive and are closer to “which question is being asked” as opposed to “how to compute the result”. They are usually much more concise than equivalent batch queries.
- They can be expressed in a way that is agnostic of how and how often the new data arrives. Whether the data is ingested once a month in Gigabyte batches, in micro-batches every hour, or as a true near real-time stream - processing logic can stay the same, produce the same results, and guarantee the best propagation times possible.
- Modern stream processing techniques make complex problems like handling late and out-of-order data much simpler, more explicit, and significantly less error-prone than equivalent batch operations.
Essential Role of Metadata
Metadata is the cornerstone of our design. It contains every aspect of where the data came from, how it was transformed, and everything that ever influenced how data looks like throughout its entire lifetime. In a way it’s like a digital passport of data using which we can audit and verify its validity (see Data Sharing).
The metadata is always updated whenever data gets updated and stays fully consistent with it. The two are cryptographically linked, making spoofing it near impossible.
Metadata also has an immutable append-only nature. While the rest of the document describes it as a standalone technology, its format is designed to make it easy to integrate with existing ledger-based systems such as blockchain.
Evolution of Data
As the nature of businesses change, as new requirements arrive, as defects are detected and fixed - it’s not a matter of if but when the time comes to make changes to the data. Calling data a potentially infinite stream would not make much sense without providing a way to improve and evolve it over time. Having to create a new dataset every time you need to change the schema or update the transformation would jeopardize the timeliness requirement, as the entire chain of transformations would have to be rebuilt from scratch after every such change.
Our design goals are, therefore:
- To seamlessly support all backward-compatible data schema changes
- To support the evolution of transformations over time
- To provide a way to correct past mistakes in data
While for simplicity we previously considered derivative datasets to be only defined by one transformation query, in fact it can comprise of multiple queries that were active during different periods of time. The metadata keeps track of which parts of inputs were processed by which query and which parts of output were produced as the result, ensuring reproducibility. This idea will be further explored when we discuss the Metadata Chain, but for now, get used to the idea that almost anything that defines a dataset can change over time.
The guiding factors in designing how data is shared were:
- Minimal storage requirements - while a certain degree of replication is necessary to keep data highly available, we don’t want to duplicate data more than it’s necessary.
- Decentralization - there should be no central authority that decides what the “right data” is.
- Verifiable trust - even when data is hosted by a single party any tampering or alterations must still be impossible.
- Minimize the movement of data - when a trusted party keeps data in an environment that also provides compute resources, we would like to be able to process and query data without the need to always download it locally.
The way we designed Metadata plays a crucial role in data sharing.
Metadata is usually several orders of magnitude smaller than the data it describes, so it’s ideally suited to be widely shared. The exact way it’s shared is out of scope of this document, but in most simple cases it can be published openly by the provider of data and cryptographically signed to ensure authenticity.
When the metadata of a certain dataset is reliably known, a peer can then download the data from any untrusted source and use the metadata validate the authenticity of every data slice that composes it (see Hash Function) - this means that we need to only ensure the validity of metadata, while the data can be stored almost anywhere with minimal replication factor, as long as one compromised party does not result in the complete loss of data.
Metadata also provides us with a way to establish the trustworthiness of any dataset by reviewing the transformations declared in it, re-applying those transformations in a trusted environment, and comparing the results to the original data. In a distributed system, having peers cross-validate each others’ published data can guarantee trusted results and allow them to promptly identify and exclude malicious peers from the network.
To achieve a perfect reproducibility the system needs to satisfy very strong transactional properties:
- All coordination operations including things like ingesting, downloading, and sharing data, creating and updating datasets must have ACID properties.
- All data transformations must have exactly-once semantics.
Concepts and Components
As described in the Nature of Data section, the system operates only on data expressed as past events, observations, or propositions believed to be true at a certain time. For simplicity, we will use the term “event” throughout to refer to such data tuples.
Data is a set of Events stored in the system. Since events are immutable data can only grow over time. Conceptually it’s best to think of data as a full log of a potentially infinite event stream.
Data never appears in the system alone as we would not be able to tell where it came from and whether it can be trusted. Data always appears as part of a Dataset.
Schema describes the shape of the data by associating names and data types to columns that data is composed of. Schema can change over time and its changes are tracked in the Metadata Chain.
registration_time TIMESTAMP(3), registration_id UUID, email STRING, first_name STRING, last_name STRING, date_of_birth DATE,
Data arrives into the system as the arbitrary large sets of events. We refer to them as “slices”.
More formally, a slice is a:
- Continuous part of Data
- That has the same Schema
- Defined by its
[start; end]System Time interval
Metadata chain captures all essential information about the Dataset, including:
- Where the data comes from (see Data Ingestion)
- How data was processed (see Query)
- Its Schema
- Log of all modifications made to the data, including information used to verify the integrity of data
- Current Watermark
Just like Data, the metadata chain also has a historical nature. It consists of individual Metadata Blocks that are linked together, forming a full timeline of how data was evolving. Much Events, all metadata blocks are immutable.
Metadata can be extended to carry other kinds of information like:
- Extra meaning and structure of knowledge that data represents (glossary, semantics, ontology)
- Relevant policies, terms, rules, compliance, and regulations (governance)
- License, privacy and security concerns (stewardship)
- Information that aids discovery
- Collaboration information
These extensions are out of scope of this document.
Dataset is the main unit of data exchange in the system. It’s simply a combination of:
Depending on where the data comes from datasets can be of these kinds:
Root datasets are the points of entry of external data into the system. They are usually owned by the organization that has full authority and responsibility over that data, i.e. a trusted source.
Root dataset definition includes:
- Where to fetch the data from - e.g. source URL, a protocol to use, cache control
- How to prepare the binary data - e.g. decompression, file filtering, format conversions
- How to interpret the data - e.g. data format, schema to apply, error handling
- How to combine data ingested in the past with the new data - e.g. append as log or diff as a snapshot of the current state
All this information is stored in the Metadata Chain and can change over time as the dataset evolves.
Derivative datasets are created by transforming/combining one or multiple existing datasets.
They are defined by the combination of:
This information is stored in the Metadata Chain and can change over time as the dataset evolves.
Queries define how input data is combined, modified, and re-shaped to produce new data.
Queries are used in two contexts:
- When defining new Derivative Datasets
- When analyzing and extracting data from an existing Dataset (locally or from a repository)
The system is agnostic to the exact language used to define the query and the set of supported dialects can be extended by implementing a new Engine.
All queries, however, must have the following properties:
- Stream/Batch agnostic
In other words, they should be guaranteed to always produce the same result for the same input data, without side effects.
Example windowed aggregation query in streaming SQL:
SELECT TUMBLE_ROWTIME(event_time, INTERVAL '1' MONTH) as event_time, sku_id, min(price) as min_monthly_price, max(price) as max_monthly_price, avg(price) as avg_monthly_price FROM sku_prices GROUP BY TUMBLE(event_time, INTERVAL '1' MONTH), sku_id
Engine is an interface shared by all specific implementations of a Query dialect. Engine implementations are responsible for applying defined queries to input data and returning the result. For example, some engines allows you to query data using a series of streaming SQL statements.
Engines run in a sandboxed environments and are not permitted to use any external resources to guarantee the reproducibility of all operations.
As Engines are in the full control of all data transformations, they are also responsible for answering the Provenance queries.
Checkpoints are used by the Engines to store the computation state between the different invocations of a Query. They are fully engine-specific and opaque to the system. They are however an essential durable part of a Dataset as they are necessary to be able to pause and resume the streaming queries, and are essential in implementing “exactly-once” processing semantics.
Coordinator is an application that implements the common Dataset management logic.
- Handles all Metadata Chain operations
- Splits the transformation work into batches based on the dataset’s evolution timeline
- Collects relevant data slices of the input datasets
- Delegates data processing to the Engines, making all transformations look to them as conventional stream processing
- Commits the resulting data slices and new metadata blocks
Ingestion is the process by which external data gets into the system. Typical ingestion steps that describe how data is obtained and read (e.g. fetching data from some URL on the web, decompressing it, and reading it as CSV) are a part of the Root Dataset definition.
By design, the system only stores data in the append-only event log format to preserve the entire history. Unfortunately, a lot of data in the world is not stored or exposed this way. Some organizations may expose their data in the form of periodic database dumps, while some choose to provide it as a log of changes between current and the previous export.
When ingesting data from external sources, the Root Datasets can choose between different Merge Strategies that define how to combine the newly-ingested data with the existing one.
For example, when dealing with the daily database dumps, a user can choose the merge strategy that performs change data capture, transforming dumps into a set of events that signify record creation, update, or deletion.
Cryptographic hash functions are used by the system in these two scenarios:
- For computing a checksum of a Data Slice.
- For computing a checksum of a MetadataBlock.
Whenever new events are appended to the Data the Metadata Chain will also be extended with a block containing a checksum of the new data slice. The checksum provides a very quick and reliable way to later validate that the data matches the one that has been written earlier.
The new MetadataBlock will also be cryptographically signed to guarantee its integrity - this excludes any malicious or accidental alterations to the block.
- If the Metadata Chain of a certain dataset is reliably known (e.g. available from many independent peers) a peer can then download the Data from any untrusted source and use the hash function to validate the authenticity of every data slice that composes it.
- The trustworthiness of any Dataset can be established by reviewing the transformations it claims to be performing on data (contained in the Metadata Chain), re-applying those transformations in a trusted environment, and then comparing the checksums of the result slices.
Data provenance describes the origins and the history of data and adds value to data by explaining how it was obtained.
Metadata Chain alone can already significantly narrow down the search space when you want to explain how a certain piece of data came to be, as it keeps track of all the inputs and queries used to create a dataset. But the goal of the provenance system is to make this type of inquiries effortless.
We differentiate the following kinds of provenance:
- Why-provenance - tells us which input data elements were inspected to decide that an element should be present in the output at all - i.e. defines a sufficient set of elements needed to produce the output.
- How-provenance - tells us the process by which the elements of why-provenance caused the output to appear
- Where-provenance - narrows down why-provenance to input data elements that were copied or transformed to determine the output element value.
Since the Engines are responsible for all data transformations, it’s also the Engine’s responsibility to answer provenance queries.
There are many different ways to implement provenance:
- By statically analyzing the queries
- By inversing transformations
- By repeating the computations and logging the data used at every step
- By propagating provenance data through all computations
Depending on the language used by an Engine one approach may work better in one situation than the other, so we avoid prescribing the exact method to use but instead standardize the language used for provenance-related queries and responses.
The system applies the idea of bitemporal data modelling to the event streams. It differentiates two kinds of time:
- System time - tells us when some event was observed by the system
- Event time - tells when some event occurred from the perspective of the outside world
Every record in the system has exactly one system time associated with it upon the ingestion but can have zero to many event times.
System time gives us a reference point for when something has occurred from the perspective of the system.
Projecting the data onto the system time axis answers the question: “what did the system know at the time T?”, meaning that such projections effectively freeze data in time, providing the natural way to achieve reproducibility.
For all intents and purposes system time is treated as ordered monotonically non-decreasing value that lets us establish a before-after relationship between certain events. Note, however, that before-after relationship is only meaningful for data within one Dataset and its upstream dependencies. System time cannot be used to establish an exact before-after relationship between the events of the independent datasets.
Event time tells us when something happened from the outside world’s perspective. This time, therefore, is usually the most useful one for querying and joining data.
There are no restrictions on the event time in the system - there can be many event times associated with any record, and unlike system time, event times don’t have to be monotonic. This allows the system to support many kinds and varieties of event time use, like:
- Post-dated events and predictions - with event time set into the future
- Back-dated events and corrections - with event time set into the past
Depending on the type of transformations these restrictions may be more strict, e.g. joining datasets based on event time may require it to be quasi-monotonic increasing to know when the join operation can be considered complete (see Watermarks).
A watermark defines the point in Event Time for which with a high probability we’ve already observed all preceding events.
When performing time-based windowed operation, aggregations, or joins it is important to know when a certain time window can be considered closed. Watermark tells the system “You most likely will not get event with time less than
T any more”.
In the Root Dataset events can still arrive even after their time interval has been already been closed by the watermark. Such events are considered “late” and it’s up to the individual Queries to decide how to handle them. They can be simply ignored, emitted into a side output, or still considered by emitting the “correction” event into the output.
Watermarks in the system are defined per every Metadata Block. By default the Root Dataset will assign the watermark to the maximum observed Event Time in the Data Slice. You can and should override this behavior if you expect events to arrive out-of-order to some degree, e.g. offsetting the watermark by
1 day prior to last seen event.
Watermarks can also be set based on the System Time manually or semi-automatically. This is valuable for the slow moving Datasets where it’s normal not to see any events in days or even months. Setting the watermark explicitly allows all computations based on such stream to proceed, knowing that there were no events for that time period, where otherwise the output would be stalled assuming the Dataset was not updated for a while and old data can still arrive.
Repositories let participants of the system exchange Datasets with one another.
Repository definition includes:
- Location where the repository can be reached (URL)
- Protocols that it supports
- Credentials needed to access it
- Any necessary protocol-specific configuration
In the most basic form, a Repository can simply be a location where the dataset files are hosted over one of the supported file or object-based data transfer protocols. The owner of a dataset will have push privileges to this location, while other participants can pull data from it.
An advanced repository can support more functionality like:
- Push data API for publishers
- Subscription API for consumers
- Query API for making use of repository’s compute resources and reducing the amount of transferred data
In relational algebra, a projection is an operation that removes one or many dimensions from a data tuple. In the context of our system the most common projections are temporal projections involving the System Time and Event Time dimensions.
Depending on the time axis, we arrive at two most important types of projections in bitemporal data modelling:
- AS AT or “As we knew at that time”. This projection collapses the system time dimension and shows us what the state of the data was at that time to the best knowledge of the system.
- AS OF or “As we should’ve known at the time”. This projection collapses the event time dimension and shows us what should’ve happened at that time if we knew about all potential corrections and compensating events that were added since then.
Understanding the difference between these projections is essential when working with time series data and for achieving the reproducibility of results.
Identity formats described below are used to unambiguously refer to a certain dataset. Depending on the context we differentiate the following formats of dataset identity:
- Local Format - used to refer to a dataset within a local workspace
- Remote Format - used to refer to a dataset located in a known Repository provider.
- Remote Multi-Tenant Format - used to refer to a dataset located in a known Repository and belonging to a particular tenant of that provider.
As you will see in the examples below, we recommend (but not require) using the reverse domain name notation for Dataset names - this style has proven to be easy to work with, carries an additional value in identifying the authoritative source of the information or the entity that maintains the Derivative Dataset, and it largely excludes the possibility of name collisions when working with similarly named datasets from different providers.
// Local Format - DatasetID is highlighted ca.vancouver.opendata.property.parcel-polygons org.geonames.cities com.naturalearthdata.admin0.countries.10m // Remote Format - Repository prefix is highlighted statcan.gc.ca/ca.gc.statcan.census.2016.population nyc.gov/us.cityofnewyork.public-safety.ems-incident-dispatch // Remote Multi-tenant Format - Username infix is highlighted opendata.ca/statcan/ca.gc.statcan.census.2016.population data.gov/ny-newyork/us.cityofnewyork.public-safety.ems-incident-dispatch
Full PEG grammar:
DatasetRef = (Repository "/")? (Username "/")? DatasetID DatasetID = Hostname Repository = Hostname Username = Subdomain Hostname = Subdomain ("." Subdomain)* Subdomain = [a-zA-Z0-9]+ ("-" [a-zA-Z0-9]+)*
We differentiate two data formats by their purpose:
- In-memory format - used when passing data around between the subsystems (e.g. when Coordinator communicates with the Engine).
- On-disk format - used for data at rest and by query engines when querying data from the entire dataset.
For our in-memory format we chose Apache Arrow, as it is:
- Purposefully designed for minimal overhead interoperability between data processing systems
- Minimizes copying of data
- Hardware efficient
- Supports streaming
For our on-disk format we choose Apache Parquet, as it is:
- Fast to decode into Apache Arrow
- Space and IO efficient thanks to the built-in compression
- Efficient for querying thanks to the columnar structure and built-in basic indexing
Data on disk is stored in the multiple “part” files. Once a part file is written it is immutable for the entire lifetime of the dataset.
We chose SQL-like DDL syntax for defining data Schemas, as it:
- Operates with logical types, abstracting the physical data layout from the user
- Widely familiar
- Has high interoperability with modern data processing systems
registration_time TIMESTAMP(3), registration_id UUID, email STRING, first_name STRING, last_name STRING, date_of_birth DATE,
|DDL Type||Parquet Type|
- Standardize DDL for nested data structures (nested data support highly varies between vendors)
- Investigate support for lists/arrays
- Investigate support for schema-less data (JSON/BSON)
Common Data Schema
All data in the system is guaranteed to have the following columns:
||System Time denotes when an event first appeared in the dataset. This will be an ingestion time for events in the Root Dataset or transformation time in the Derivative Dataset|
||Event Time denotes when to our best knowledge an event has ocurred in the real world. This time is used for most time-based windowed computations, aggregations, and joins|
The requirements we put towards the metadata format are:
- Fast read performance
- Complex type support (particularly nested structures and unions)
- Allows for forward and backward compatibility
- Allows for a controlled way of making breaking changes
- Possibility to read and write in human-friendly easily editable format
- Schema-based validation for user-specified data
We use the following combination of formats to satisfy these requirements:
- YAML is used for human-friendly input and output
- This format is concise and very readable and widely supported
- Only a JSON-compatible subset of
YAMLwill be used
- JSON Schema may be used for validating user-specified
- Widely supported and expressive grammar for validating
- Generates readable and easy to understand errors
- Widely supported and expressive grammar for validating
- FlatBuffers format is used for binary representation
- Highly efficient and cross-platform serialization format
JSON Schemas and
FlatBuffers Schemas for all metadata objects are provided as part of this specification (see Metadata Reference).
The recommended layout of the dataset on disk is:
This layout must be used when sharing datasets via file or object-based Repositories (e.g. local FS, S3, IPFS, DAT…).
When a Dataset is imported locally, the exact layout is left entirely up to the Coordinator implementation, as we expect all interactions with the Dataset to go through it.
This section provides the details on the contract between an Engine and the Coordinator.
Engine executes in a very restricted environment (a “sandbox”) that prevents it from accessing any external resources other than those explicitly provided by the Coordinator. Isolating the Engine from making network calls and accessing any random files ensures both that the Data being processed cannot be leaked, and that non-deterministic behavior is harder to introduce accidentally.
Our isolation technology of choice is OCI containers - a lightweight mechanism that provides good performance and strong isolation guarantees. We rely on OCI-compatible images as the distribution mechanism for the engines.
TODO: Add exact
dockercommands to reproduce the sandboxed environment
This section describes the RPC mechanism used in communications between the Coordinator and the Engine.
The requirements we put towards the RPC mechanism are:
- Use common network protocols
- Wide support in many programming languages
- Support streaming responses (e.g. for streaming back the operation status and logs)
Our RPC technology of choice is gRPC because:
- It’s cross-platform, mature, and available in many languages
- Natively supports FlatBuffers which is our metadata format of choice, letting us reuse object schemas
- Supports streaming
- For our narrow use case, its convenience is more important than interoperability, as we don’t expect to have many server and client implementations
Engine implementations must expose the
gRPC API on port
Note: When starting an Engine container, the Coordinator will consider it fully initialized when it can establish a TCP connection to the port
The exchange of raw data happens out-of-band of the
gRPC API. Input and output Data Slices are exchanged between the Coordinator and the Engine using the memory-mapped files (
mmap) containing data in the common in-memory format. This avoids any intermediate IO and minimizes the copying of data for the best performance.
- Needs more details on accessing input data and writing output
Engine implementation should support the following operations:
- Validate query - Validates the user-specified query for basic syntax and schema correctness.
- Execute query - Performs the next iteration of the transformation.
- Migrate query - Updates the transformation state from one query to another.
- Derive Provenance - Explains the origin of some data produced in the past.
This operation may be used by the Coordinator when creating a new Derivative Dataset or when changing the Query of an existing one to validate the user-specified query for basic syntax and schema correctness.
- Schemas of the input Datasets
- (optional) Previous Query in case of an existing Dataset modification
- Schema of the result
- (alternatively) Validation errors
This operation is used by the Coordinator to perform the next iteration of the transformation.
The Coordinator is designed to isolate the Engines from the complexity of managing the Metadata Chain and make processing look as close as possible to a regular stream processing.
- Transaction ID
- Input Data Slices
- Input Watermakrs
- Previous Checkpoint
- Output Data Slice
- Output Watermark
- New Checkpoint
- Operation progress, status, and logs
This operation must be idempotent (see Implementing Exactly-Once Semantics).
When an operation is committed by the Coordinator, the Engine will not see the same input data again. If due to the nature of the query (e.g. windowing or watermarking configuration) Engine cannot fully process and discard the input data - it should use the Checkpoint directory to buffer it.
Implementing Exactly-Once Semantics
Because the Engine passes back the resulting data in memory (see Data Exchange) it is possible that the Coordinator will crash before fully committing it. This may result in an inconsistent state where the Engine has already updated the persistent Checkpoint data on disk, but the result was lost.
To support the “exactly-once” semantics the Coordinator includes the
Transaction ID parameter that uniquely identifies the processing step being performed. The Engine should maintain its Checkpoint data in a way that permits re-running the last transaction multiple times while producing the same result (i.e. recovery by rolling forward).
Only the last processing step can be retried like this using the same exact input parameters. Upon seeing a different
Transaction ID the Engine is free to clean up whatever Checkpoint state it used to implement the idempotence.
This operation is used by the Coordinator when data processing hits the point where one transformation Query is being replaced by another one. It gives the Engine all information needed to handle the transition as gracefully as possible, e.g. by reconciling the existing Checkpoints with the new Query, or, at the very least, finalizing the processing of the old Query and clearing the Checkpoints before the new one starts to execute.
- Transaction ID
- Previous Query
- Next Query
- Previous Checkpoint
- New Checkpoint
- (optional) Data that has been produced when finalizing the old Query
This operations is used to trace back a set of events in the output Dataset to input events that contributed to their values or their existence (see Provenance).
TODO: The design of this operation is in progress.
As described previously, to guarantee the reproducibility and verifiability of the results a transformation must be associated with an exact version of an Engine that is used to perform it. We want to exclude any possibility that the code changes in the Engine will break this guarantee.
Whenever the Coordinator uses an Engine to execute a query it must specify the full digest of the OCI container image in the resulting Metadata Block. Engine maintainers are therefore responsible for treating the images as immutable and ensure old versions are never lost.
The main functions of the Coordinator are:
- Maintain the invariants and the semantics of the system
- Guarantee the validity and integrity of the Metadata
- Provide means for ingesting external data into the system
- Interface with the Engines to implement data transformations
- Enable data sharing functionality via Repositories
- Provide additional functionality to simplify working with Datasets and their metadata
Common Metadata Operations
This section describes the operations performed by the Coordinator as part of the usual Metadata Chain maintenance.
The procedure for calculating the stable Hash of a Data Slice is:
- Drop the
- Sort all rows based on the
- Convert rows to the canonical string representation
- Calculate SHA3-256 digest of the file
TODO: This is a stop-gap implementation. Release version of the hashing function should have a streaming nature while also be tolerant of row reordering, as many processing engine are concurrent and don’t enforce ordering between outputs of independent calculations.
Metadata Block Hashing
MetadataBlocks are cryptographically secured through the following procedure:
- The block is serialized into FlatBuffers following a two-step process to ensure that all variable-size buffers are layed out in memory in a consistent order:
- First, we iterate over all fields of the block in the same order they appear in the schemas serializing into buffers all vector-like and variable-size fields and recursing into nested data structures (tables) in the depth-first order.
- Second, we iterate over all fields again this time serializing all leftover fixed-size fields
- Since the
blockHashfield appears first in the MetadataBlock schema it will be the very first buffer to be added into the
flatbuffer. Knowing that
flatbuffersare serialized in the back-to-front order and that
Sha3-256data always takes up 32 bytes - we can easily exclude the
blockHashfield contents when computing the hash.
- The resulting
flatbuffer(excluding the last 32 bytes) is fed into SHA3-256 digest algorithm.
- The resulting digest can now be directly copied into the last 32 bytes of the
flatbufferto secure the block, or compared to the hash that is already stored there to validate it.
It should be made clear that it’s not the goal of this document to standardize the data ingestion techniques. Information here is given only to illustrate how new data can be continuously added into the system in alignment with the properties we want to see as a result.
The process of ingestion itself plays a very limited role in the system. When interacting with external data we cannot make any assumptions about the guarantees that the external source provides - we cannot rely on the external data being immutable, on being highly-available, we can’t even be sure the domain that hosts it will still be there the next day. So the ingestion step is all about getting the data into the system as fast as possible, where all its properties can be preserved.
The reason we include the ingest configuration in this document at all is that we see it as an important part of the data Provenance.
Pull vs. Push
Although we aspire to reach a state where all authoritative data publishers push new events into the system as soon as those occur, we realize that this level of integration is hard to achieve. We believe that for a long time the vast majority of data will be ingested via the pull model, where the system periodically scans a remote data source and ingests the latest updates.
Ingestion is composed of the following phases:
- Fetch phase - Obtains the data from some external source (e.g. HTTP/FTP) in its original raw form.
- Prepare phase (optional) - Transforms the raw data into one of the supported formats. This can include extracting an archive, filtering and rearranging files, using external tools to convert between formats.
- Read phase - Reads the data into a structured form.
- Preprocess phase (optional) - Shapes the data into the presentable form. This can include renaming columns, type conversions, nesting, etc.
- Merge phase - Combines the new data from the source with the history of previously seen data.
Merge Strategies deserve special attention as they bridge the gap between the wide variety of data in the external world with the strict event-based world of our system.
Append Merge Strategy
Under this strategy, the new data will be appended to the Dataset in its original form without any modifications.
Ledger Merge Strategy
This strategy should be used for data sources containing append-only event streams. New data exports can have new rows added, but once data already made it into one export it should never change or disappear. A user-specified primary key is used to identify which events were already seen, not to duplicate them.
Snapshot Merge Strategy
This strategy can be used for data exports that are taken periodically and contain only the latest state snapshot of the observed entity or system. Over time such exports can have new rows added, and old rows either removed or modified.
This strategy transforms snapshot data into an append-only event stream by performing the change data capture. It relies on a user-specified primary key to correlate the rows between the two snapshots. A new event is added into the output stream whenever:
- A row with a certain primary key appears for the first time
- A row with a certain key disappears from the snapshot
- A row data associated with a certain key has changed
Each event will have an additional column that signifies the kind of observation that was encountered.
Snapshot strategy also requires special treatment in regards to the Event Time. Since snapshot-style data exports represent the state of some system at a certain time - it is important to know what that time was. This time is usually captured in some form of metadata (e.g. in the name of the snapshot file, in the URL, or the HTTP caching headers. It should be possible to extract and propagate this time into a data column.
Previously we looked at how an Engine executes the transformation query. Here we will look at what happens on the Coordinator side.
Here are the steps that the Coordinator performs during the transformation:
- Batch step - Analyze the Metadata Chains of the Dataset being transformed and all of the inputs. The goal here is to decide how far the processing can progress before hitting one of the special conditions, such as a change of schema in one of the inputs or a change of the transformation query.
- Run migrations (when needed) - If a special condition is encountered - call the Engine’s Migrate Query operation to make necessary adjustments for the new transformation parameters.
- Run query - Pass the input Data Slices into the Engine’s Execute Query operation
- Hash resulting data - Obtain a stable Hash of the output Data Slice (see Data Hashing)
- Prepare commit - Creates the next Metadata Block describing the output data
- Commit - Atomically adds the new Data Slice and the Metadata Block to the Dataset
Most parts of the Dataset can change over time. To protect downstream consumers of data from braking changes the Coordinator has to validate the changes for backward compatibility before accepting them. This section covers the allowed transitions for different parts of the Metadata.
Dataset Schema may change when:
- A different Ingestion process is specified in the Root Dataset
- A transformation Query of the Derivative Dataset is changed
- A Schema of one of the inputs of the Query changes
In general, Schema changes are restricted to only adding new columns. Removing and renaming columns or changing data types would break downstream data consumers and therefore not allowed. A migration procedure should be used instead when it’s desired to introduce breaking changes in the Schema.
The procedure for modifying the Query of a Derivative Dataset is as following:
- The Coordinator invokes the Engine’s Validate Query operation to validate the new Query and determine the validity of the transition according to the engine-specific rules.
- The new Schema returned in the previous step is validated according to the Schema Evolution rules.
- A new Metadata Block is created and committed.
TODO: Specify how Schema changes in the upstream datasets should be handled
- Do we stop processing and ask user to validate that the query takes into account the new columns?
Dataset sharing involves uploading the data to some Repository where it can be discovered and accessed by other peers. Due to immutable nature of data and metadata it is very easy to keep shared data up-to-date as only new blocks and part files need to be uploaded.
It is important to ensure the atomicity of the sharing, or at least create the perception of atomicity to someone who happens to be downloading the data concurrently with the upload.
For Repositories that do not support atomic file/object operations the following sequence of operations can ensure that concurrent downloads will always see the Dataset in a consistent state:
- Upload data part files
- Upload metadata blocks
- Update references in
The Derivative Dataset validation confirms that the event data is in fact produced by applying all the transformations stored in the Metadata to the inputs and was not maliciously altered.
The process of validation is almost identical to the Derivative Transformation except instead of the
Commit phase the Coordinator compares the output data hash of the local transform to the hash stored in the Metadata.
Pinning the exact Engine version used by every transformation guarantees the reproducibility and verifiability of the results (see Components of Trust). In the long run, however, this creates a problem where in order to validate a dataset the Coordinator will have to use a very large number of different versions of an Engine that might’ve accumulated over the years. This slows down the validation procedure and can result in a significant disk space required for storing these images.
To mitigate this problem the Coordinator offers the engine deprecation operation that works as follows:
- The Coordinator reconstructs the dataset from scratch by repeating all transformation in the Metadata Chain.
- Instead of the Engine version specified in the block, it uses the latest (or provided) version.
- If the results produced in every step match the recorded ones - we can safely consider that the newer version of the Engine is fully compatible with the older one.
- A special Metadata record is created to record this compatibility, therefore, allowing all users to use the latest version of the Engine instead of downloading all the versions seen previously.
- Supported protocols
- Querying data in advanced repository
- Portal Dataset
- Exposes any dataset as root without disclosing details