

An object that wraps the metadata resources providing versioning and type identification. All root-level resources are wrapped with a manifest when serialized to disk.

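| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| kind | string | ✔️ | multicodec | Type of the resource. |
| version | integer | ✔️ | | Major version number of the resource contained in this manifest. It provides the mechanism for introducing compatibility breaking changes. |
| content | string | ✔️ | flatbuffers | Resource data. |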

Represents a projection of the dataset metadata at a single point in time. This type is typically used for defining new datasets and changing the existing ones.

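| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| name | string | ✔️ | dataset-alias | Alias of the dataset. |
| kind | DatasetKind | ✔️ | | Type of the dataset. |
| metadata | array(MetadataEvent) | ✔️ | | An array of metadata events that will be used to populate the chain. Here you can define polling and push sources, set licenses, add attachments etc. |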

An individual block in the metadata chain that captures the history of modifications of a dataset.

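| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| systemTime | string | ✔️ | date-time | System time when this block was written. |
| prevBlockHash | string | | multihash | Hash sum of the preceding block. |
| sequenceNumber | integer | ✔️ | uint64 | Block sequence number, starting from zero at the seed block. |
| event | MetadataEvent | ✔️ | | Event data. |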
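
The linkage between blocks can be illustrated with a small consistency check. This is a minimal sketch, not the reference validation logic: the `Block` class and its `block_hash` field are hypothetical stand-ins, and the rule that sequence numbers grow by exactly one per block is an assumption layered on top of "starting from zero at the seed block".

```python
from dataclasses import dataclass
from typing import List, Optional, Sequence


@dataclass
class Block:
    sequence_number: int
    prev_block_hash: Optional[str]
    block_hash: str  # hypothetical: hash of this block, computed elsewhere


def validate_chain(blocks: Sequence[Block]) -> List[str]:
    """Return a list of problems found in a chain ordered from the seed block."""
    errors = []
    prev = None
    for index, block in enumerate(blocks):
        # Assumption: sequence numbers start at zero and increase by one.
        if block.sequence_number != index:
            errors.append(f"block {index}: unexpected sequenceNumber {block.sequence_number}")
        # The seed block has no predecessor; every other block points at the previous one.
        expected_prev = prev.block_hash if prev is not None else None
        if block.prev_block_hash != expected_prev:
            errors.append(f"block {index}: prevBlockHash does not match the preceding block")
        prev = block
    return errors
```

An empty result means the blocks form a consistent chain under these assumptions.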

Metadata Events


Represents a transaction that occurred on a dataset.

| Union Type | Description |
| --- | --- |
| AddData | Indicates that data has been ingested into a root dataset. |
| ExecuteTransform | Indicates that derivative transformation has been performed. |
| Seed | Establishes the identity of the dataset. Always the first metadata event in the chain. |
| SetPollingSource | Contains information on how externally-hosted data can be ingested into the root dataset. |
| SetTransform | Defines a transformation that produces data in a derivative dataset. |
| SetVocab | Lets you manipulate names of the system columns to avoid conflicts. |
| SetAttachments | Associates a set of files with this dataset. |
| SetInfo | Provides basic human-readable information about a dataset. |
| SetLicense | Defines a license that applies to this dataset. |
| SetDataSchema | Specifies the complete schema of Data Slices added to the Dataset following this event. |
| AddPushSource | Describes how to ingest data into a root dataset from a certain logical source. |
| DisablePushSource | Disables the previously defined source. |
| DisablePollingSource | Disables the previously defined polling source. |

Indicates that data has been ingested into a root dataset.

| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| prevCheckpoint | string | | multihash | Hash of the checkpoint file used to restore ingestion state, if any. |
| prevOffset | integer | | uint64 | Last offset of the previous data slice, if any. Must be equal to the last non-empty newData.offsetInterval.end. |
| newData | DataSlice | | | Describes output data written during this transaction, if any. |
| newCheckpoint | Checkpoint | | | Describes checkpoint written during this transaction, if any. If an engine operation resulted in no updates to the checkpoint, but checkpoint is still relevant for subsequent runs - a hash of the previous checkpoint should be specified. |
| newWatermark | string | | date-time | Last watermark of the output data stream, if any. Initial blocks may not have watermarks, but once watermark is set - all subsequent blocks should either carry the same watermark or specify a new (greater) one. Thus, watermarks are monotonically non-decreasing. |
| newSourceState | SourceState | | | The state of the source the data was added from to allow fast resuming. If the state did not change but is still relevant for subsequent runs it should be carried, i.e. only the last state per source is considered when resuming. |
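
The two continuity rules stated for prevOffset and newWatermark can be sketched as a check against the previous chain state. This is an illustrative sketch only: the function name and its arguments are hypothetical, and timestamps are assumed to be already parsed into `datetime` values.

```python
from datetime import datetime
from typing import List, Optional


def check_add_data(
    last_end_offset: Optional[int],
    last_watermark: Optional[datetime],
    prev_offset: Optional[int],
    new_watermark: Optional[datetime],
) -> List[str]:
    """Check an event's prevOffset and newWatermark against the prior chain state."""
    problems = []
    # prevOffset must equal the end of the last non-empty newData.offsetInterval.
    if prev_offset != last_end_offset:
        problems.append("prevOffset does not match the end of the last non-empty slice")
    # Once a watermark is set it must be carried forward or advanced, never dropped or reduced.
    if last_watermark is not None:
        if new_watermark is None:
            problems.append("watermark was set earlier and must be carried forward")
        elif new_watermark < last_watermark:
            problems.append("watermark moved backwards")
    return problems
```

Carrying the same watermark forward is valid, which is why the comparison rejects only strictly smaller values.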

Describes how to ingest data into a root dataset from a certain logical source.

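| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| sourceName | string | ✔️ | | Identifies the source within this dataset. |
| read | ReadStep | ✔️ | | Defines how data is read into structured format. |
| preprocess | Transform | | | Pre-processing query that shapes the data. |
| merge | MergeStrategy | ✔️ | | Determines how newly-ingested data should be merged with existing history. |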

Disables the previously defined polling source.


Disables the previously defined source.

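| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| sourceName | string | ✔️ | | Identifies the source to be disabled. |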

Indicates that derivative transformation has been performed.

| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| queryInputs | array(ExecuteTransformInput) | ✔️ | | Defines inputs used in this transaction. Slices corresponding to every input dataset must be present. |
| prevCheckpoint | string | | multihash | Hash of the checkpoint file used to restore transformation state, if any. |
| prevOffset | integer | | uint64 | Last offset of the previous data slice, if any. Must be equal to the last non-empty newData.offsetInterval.end. |
| newData | DataSlice | | | Describes output data written during this transaction, if any. |
| newCheckpoint | Checkpoint | | | Describes checkpoint written during this transaction, if any. If an engine operation resulted in no updates to the checkpoint, but checkpoint is still relevant for subsequent runs - a hash of the previous checkpoint should be specified. |
| newWatermark | string | | date-time | Last watermark of the output data stream, if any. Initial blocks may not have watermarks, but once watermark is set - all subsequent blocks should either carry the same watermark or specify a new (greater) one. Thus, watermarks are monotonically non-decreasing. |

Establishes the identity of the dataset. Always the first metadata event in the chain.

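| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| datasetId | string | ✔️ | dataset-id | Unique identity of the dataset. |
| datasetKind | DatasetKind | ✔️ | | Type of the dataset. |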

Associates a set of files with this dataset.

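| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| attachments | Attachments | ✔️ | | One of the supported attachment sources. |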

Specifies the complete schema of Data Slices added to the Dataset following this event.

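| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| schema | string | ✔️ | flatbuffers | Apache Arrow schema encoded in its native flatbuffers representation. |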

Provides basic human-readable information about a dataset.

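| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| description | string | | | Brief single-sentence summary of a dataset. |
| keywords | array(string) | | | Keywords, search terms, or tags used to describe the dataset. |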

Defines a license that applies to this dataset.

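| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| shortName | string | ✔️ | | Abbreviated name of the license. |
| name | string | ✔️ | | Full name of the license. |
| spdxId | string | | | License identifier from the SPDX License List. |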

Contains information on how externally-hosted data can be ingested into the root dataset.

| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| fetch | FetchStep | ✔️ | | Determines where data is sourced from. |
| prepare | array(PrepStep) | | | Defines how raw data is prepared before reading. |
| read | ReadStep | ✔️ | | Defines how data is read into structured format. |
| preprocess | Transform | | | Pre-processing query that shapes the data. |
| merge | MergeStrategy | ✔️ | | Determines how newly-ingested data should be merged with existing history. |

Defines a transformation that produces data in a derivative dataset.

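| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| inputs | array(TransformInput) | ✔️ | | Datasets that will be used as sources. |
| transform | Transform | ✔️ | | Transformation that will be applied to produce new data. |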

Lets you manipulate names of the system columns to avoid conflicts.

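| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| offsetColumn | string | | | Name of the offset column. |
| operationTypeColumn | string | | | Name of the operation type column. |
| systemTimeColumn | string | | | Name of the system time column. |
| eventTimeColumn | string | | | Name of the event time column. |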

Engine Protocol


Sent by the coordinator to an engine to perform a query on raw input data, usually as part of the ingest preprocessing step.

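| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| inputDataPaths | array(string) | ✔️ | | Paths to input data files to perform query over. Must all have identical schema. |
| transform | Transform | ✔️ | | Transformation that will be applied to produce new data. |
| outputDataPath | string | ✔️ | path | Path where query result will be written. |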

Sent by an engine to the coordinator when performing the raw query operation.

Union TypeDescription

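| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| numRecords | integer | ✔️ | uint64 | Number of records produced by the query |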

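| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| message | string | ✔️ | | Explanation of an error |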

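| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| message | string | ✔️ | | Brief description of an error |
| backtrace | string | | | Details of an error (e.g. a backtrace) |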

Sent by the coordinator to an engine to perform the next step of data transformation

| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| datasetId | string | ✔️ | dataset-id | Unique identifier of the output dataset. |
| datasetAlias | string | ✔️ | dataset-alias | Alias of the output dataset, for logging purposes only. |
| systemTime | string | ✔️ | date-time | System time to use for new records. |
| transform | Transform | ✔️ | | Transformation that will be applied to produce new data. |
| queryInputs | array(TransformRequestInput) | ✔️ | | Defines inputs used in this transaction. Slices corresponding to every input dataset must be present. |
| nextOffset | integer | ✔️ | uint64 | Starting offset to use for new data records. |
| prevCheckpointPath | string | | path | TODO: This will be removed when coordinator will be speaking to engines purely through Arrow. |
| newCheckpointPath | string | ✔️ | path | TODO: This will be removed when coordinator will be speaking to engines purely through Arrow. |
| newDataPath | string | ✔️ | path | TODO: This will be removed when coordinator will be speaking to engines purely through Arrow. |

Sent as part of the engine transform request operation to describe the input

| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| datasetId | string | ✔️ | dataset-id | Unique identifier of the dataset. |
| datasetAlias | string | ✔️ | dataset-alias | Alias of the input dataset, for logging purposes only. |
| queryAlias | string | ✔️ | | An alias of this input to be used in queries. |
| offsetInterval | OffsetInterval | | | Subset of data that goes into this transaction. |
| dataPaths | array(string) | ✔️ | | TODO: This will be removed when coordinator will be slicing data for the engine. |
| schemaFile | string | ✔️ | path | TODO: replace with actual DDL or Parquet schema. |

Sent by an engine to the coordinator when performing the data transformation.

Union TypeDescription

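| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| newOffsetInterval | OffsetInterval | | | Data slice produced by the transaction, if any. |
| newWatermark | string | | date-time | Watermark advanced by the transaction, if any. |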

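| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| message | string | ✔️ | | Explanation of an error |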

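| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| message | string | ✔️ | | Brief description of an error |
| backtrace | string | | | Details of an error (e.g. a backtrace) |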

Embedded attachment item.

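| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| path | string | ✔️ | | Path to an attachment if it was materialized into a file. |
| content | string | ✔️ | | Content of the attachment. |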

Defines the source of attachment files.

| Union Type | Description |
| --- | --- |
| Attachments::Embedded | For attachments that are specified inline and are embedded in the metadata. |

For attachments that are specified inline and are embedded in the metadata.


Describes a checkpoint produced by an engine

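| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| physicalHash | string | ✔️ | multihash | Hash sum of the checkpoint file. |
| size | integer | ✔️ | uint64 | Size of checkpoint file in bytes. |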

Describes a slice of data added to a dataset or produced via transformation

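| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| logicalHash | string | ✔️ | multihash | Logical hash sum of the data in this slice. |
| physicalHash | string | ✔️ | multihash | Hash sum of the data part file. |
| offsetInterval | OffsetInterval | ✔️ | | Data slice produced by the transaction. |
| size | integer | ✔️ | uint64 | Size of data file in bytes. |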

Represents type of the dataset.

Enum Value

Specifies the mapping of system columns onto dataset schema.

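| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| offsetColumn | string | ✔️ | | Name of the offset column. |
| operationTypeColumn | string | ✔️ | | Name of the operation type column. |
| systemTimeColumn | string | ✔️ | | Name of the system time column. |
| eventTimeColumn | string | ✔️ | | Name of the event time column. |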

Defines an environment variable passed into some job.

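| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| name | string | ✔️ | | Name of the variable. |
| value | string | | | Value of the variable. |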

Defines how the event time of ingested data is determined.

| Union Type | Description |
| --- | --- |
| EventTimeSource::FromMetadata | Extracts event time from the source’s metadata. |
| EventTimeSource::FromPath | Extracts event time from the path component of the source. |
| EventTimeSource::FromSystemTime | Assigns event time from the system time source. |

Extracts event time from the source’s metadata.


Assigns event time from the system time source.


Extracts event time from the path component of the source.

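| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| pattern | string | ✔️ | regex | Regular expression where first group contains the timestamp string. |
| timestampFormat | string | | | Format of the expected timestamp in java.text.SimpleDateFormat form. |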
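
As a rough illustration of how the first capture group drives event time extraction, consider the sketch below. The function name is hypothetical, and because Python does not understand java.text.SimpleDateFormat patterns, the sketch assumes the caller supplies an equivalent `strptime` format instead.

```python
import re
from datetime import datetime
from typing import Optional


def event_time_from_path(path: str, pattern: str, strptime_format: str) -> Optional[datetime]:
    """Parse the timestamp captured by the first group of `pattern`, if the path matches."""
    match = re.search(pattern, path)
    if match is None:
        return None
    # Group 1 holds the timestamp string, mirroring the `pattern` property.
    return datetime.strptime(match.group(1), strptime_format)
```

For example, a path such as `data/2021-03-05.csv` with the pattern `(\d{4}-\d{2}-\d{2})\.csv$` and the format `%Y-%m-%d` yields 5 March 2021.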

Describes a slice of the input dataset used during a transformation

| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| datasetId | string | ✔️ | dataset-id | Input dataset identifier. |
| prevBlockHash | string | | multihash | Last block of the input dataset that was previously incorporated into the derivative transformation, if any. Must be equal to the last non-empty newBlockHash. Together with newBlockHash defines a half-open (prevBlockHash, newBlockHash] interval of blocks that will be considered in this transaction. |
| newBlockHash | string | | multihash | Hash of the last block that will be incorporated into the derivative transformation. When present, defines a half-open (prevBlockHash, newBlockHash] interval of blocks that will be considered in this transaction. |
| prevOffset | integer | | uint64 | Last data record offset in the input dataset that was previously incorporated into the derivative transformation, if any. Must be equal to the last non-empty newOffset. Together with newOffset defines a half-open (prevOffset, newOffset] interval of data records that will be considered in this transaction. |
| newOffset | integer | | uint64 | Offset of the last data record that will be incorporated into the derivative transformation, if any. When present, defines a half-open (prevOffset, newOffset] interval of data records that will be considered in this transaction. |
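
The half-open (prevOffset, newOffset] interval can be made concrete with a short sketch. The helper name is hypothetical, and treating a missing prevOffset as "start from offset 0" is an assumption made here for illustration.

```python
from typing import Optional


def offsets_in_transaction(prev_offset: Optional[int], new_offset: Optional[int]) -> range:
    """Offsets covered by the half-open interval (prev_offset, new_offset]."""
    if new_offset is None:
        # No newOffset means no data records from this input are consumed.
        return range(0)
    # Assumption: without a prevOffset nothing was consumed before, so start at 0.
    start = 0 if prev_offset is None else prev_offset + 1
    return range(start, new_offset + 1)
```

With prevOffset 4 and newOffset 7 the transaction covers offsets 5, 6 and 7: the record at prevOffset is excluded and the record at newOffset is included.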

Defines the external source of data.

| Union Type | Description |
| --- | --- |
| FetchStep::Url | Pulls data from one of the supported sources by its URL. |
| FetchStep::FilesGlob | Uses glob operator to match files on the local file system. |
| FetchStep::Container | Runs the specified OCI container to fetch data from an arbitrary source. |
| FetchStep::Mqtt | Connects to an MQTT broker to fetch events from the specified topic. |
| FetchStep::EthereumLogs | Connects to an Ethereum node to stream transaction logs. |

Pulls data from one of the supported sources by its URL.

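| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| url | string | ✔️ | url | URL of the data source |
| eventTime | EventTimeSource | | | Describes how event time is extracted from the source metadata. |
| cache | SourceCaching | | | Describes the caching settings used for this source. |
| headers | array(RequestHeader) | | | Headers to pass during the request (e.g. HTTP Authorization) |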

Uses glob operator to match files on the local file system.

| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| path | string | ✔️ | | Path with a glob pattern. |
| eventTime | EventTimeSource | | | Describes how event time is extracted from the source metadata. |
| cache | SourceCaching | | | Describes the caching settings used for this source. |
| order | string | | | Specifies how input files should be ordered before ingestion. Order is important as every file will be processed individually and will advance the dataset’s watermark. |

Runs the specified OCI container to fetch data from an arbitrary source.

| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| image | string | ✔️ | | Image name and an optional tag. |
| command | array(string) | | | Specifies the entrypoint. Not executed within a shell. The default OCI image’s ENTRYPOINT is used if this is not provided. |
| args | array(string) | | | Arguments to the entrypoint. The OCI image’s CMD is used if this is not provided. |
| env | array(EnvVar) | | | Environment variables to propagate into or set in the container. |

Connects to an MQTT broker to fetch events from the specified topic.

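| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| host | string | ✔️ | | Hostname of the MQTT broker. |
| port | integer | ✔️ | | Port of the MQTT broker. |
| username | string | | | Username to use for auth with the broker. |
| password | string | | | Password to use for auth with the broker (can be templated). |
| topics | array(MqttTopicSubscription) | ✔️ | | List of topic subscription parameters. |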

Connects to an Ethereum node to stream transaction logs.

| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| chainId | integer | | uint64 | Identifier of the chain to scan logs from. This parameter may be used for RPC endpoint lookup as well as asserting that the provided nodeUrl corresponds to the expected chain. |
| nodeUrl | string | | url | URL of the node. |
| filter | string | | | An SQL WHERE clause that can be used to pre-filter the logs before fetching them from the ETH node. |
| signature | string | | | Solidity log event signature to use for decoding. Using this field adds event to the output containing decoded log as JSON. |

Merge strategy determines how newly ingested data should be combined with the data that already exists in the dataset.

| Union Type | Description |
| --- | --- |
| MergeStrategy::Append | Append merge strategy. |
| MergeStrategy::Ledger | Ledger merge strategy. |
| MergeStrategy::Snapshot | Snapshot merge strategy. |

Append merge strategy.

Under this strategy new data will be appended to the dataset in its entirety, without any deduplication.


Ledger merge strategy.

This strategy should be used for data sources containing ledgers of events. Currently this strategy will only perform deduplication of events using user-specified primary key columns. This means that the source data can contain partially overlapping set of records and only those records that were not previously seen will be appended.

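| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| primaryKey | array(string) | ✔️ | | Names of the columns that uniquely identify the record throughout its lifetime |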
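
The deduplication behaviour of the ledger strategy can be sketched in a few lines. This is a simplified in-memory illustration, not the engine implementation: rows are assumed to be plain dictionaries and the function name is hypothetical.

```python
from typing import Any, Dict, List, Sequence

Row = Dict[str, Any]


def ledger_merge(existing: Sequence[Row], incoming: Sequence[Row], primary_key: Sequence[str]) -> List[Row]:
    """Return only the incoming rows whose primary key was not seen before."""
    seen = {tuple(row[col] for col in primary_key) for row in existing}
    appended = []
    for row in incoming:
        key = tuple(row[col] for col in primary_key)
        if key not in seen:
            # Remember the key so duplicates within the same batch are skipped too.
            seen.add(key)
            appended.append(row)
    return appended
```

If the source re-publishes events 1 and 2 together with a new event 3, only event 3 is appended.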

Snapshot merge strategy.

This strategy can be used for data state snapshots that are taken periodically and contain only the latest state of the observed entity or system. Over time such snapshots can have new rows added, and old rows either removed or modified.

This strategy transforms snapshot data into an append-only event stream where data already added is immutable. It does so by performing Change Data Capture - essentially diffing the current state of data against the reconstructed previous state and recording differences as retractions or corrections. The Operation Type “op” column will contain:

  • append (+A) when a row appears for the first time
  • retraction (-D) when a row disappears
  • correction (-C, +C) when row data has changed, with -C event carrying the old value of the row and +C carrying the new value.

To correctly associate rows between old and new snapshots this strategy relies on user-specified primary key columns.

To identify whether a row has changed this strategy will compare all other columns one by one. If the data contains a column that is guaranteed to change whenever any of the data columns changes (for example a last modification timestamp, an incremental version, or a data hash), then it can be specified in compareColumns property to speed up the detection of modified rows.

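| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| primaryKey | array(string) | ✔️ | | Names of the columns that uniquely identify the record throughout its lifetime. |
| compareColumns | array(string) | | | Names of the columns to compare to determine if a row has changed between two snapshots. |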
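
The change data capture step described above can be sketched as a diff between the reconstructed previous state and the new snapshot. This is an in-memory illustration under simplifying assumptions: rows are dictionaries, the function name is hypothetical, and the ordering of emitted events is arbitrary rather than prescribed.

```python
from typing import Any, Dict, List, Optional, Sequence, Tuple

Row = Dict[str, Any]


def snapshot_diff(
    prev_state: Sequence[Row],
    new_snapshot: Sequence[Row],
    primary_key: Sequence[str],
    compare_columns: Optional[Sequence[str]] = None,
) -> List[Tuple[str, Row]]:
    """Diff two snapshots into (op, row) events using +A, -D, -C and +C."""

    def key(row: Row) -> tuple:
        return tuple(row[col] for col in primary_key)

    def comparable(row: Row) -> Row:
        # With compareColumns only those columns are inspected; otherwise all non-key columns.
        if compare_columns:
            return {col: row.get(col) for col in compare_columns}
        return {col: val for col, val in row.items() if col not in primary_key}

    old = {key(row): row for row in prev_state}
    new = {key(row): row for row in new_snapshot}
    events = []
    for k, row in new.items():
        if k not in old:
            events.append(("+A", row))  # row appears for the first time
        elif comparable(old[k]) != comparable(row):
            events.append(("-C", old[k]))  # old value of the corrected row
            events.append(("+C", row))  # new value of the corrected row
    for k, row in old.items():
        if k not in new:
            events.append(("-D", row))  # row disappeared
    return events
```

Passing a `compare_columns` list such as a last-modified timestamp narrows the comparison to that column, which is the shortcut the compareColumns property is meant to enable.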

MQTT quality of service class.

Enum Value

MQTT topic subscription parameters.

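| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| path | string | ✔️ | | Name of the topic (may include patterns). |
| qos | MqttQos | | | Quality of service class. |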

Describes a range of data as a closed arithmetic interval of offsets

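| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| start | integer | ✔️ | uint64 | Start of the closed interval [start; end]. |
| end | integer | ✔️ | uint64 | End of the closed interval [start; end]. |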
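
Because both bounds are inclusive, the number of records in an interval is `end - start + 1`. The small sketch below spells this out; the function name is hypothetical, and rejecting intervals where `end` is smaller than `start` is an assumption of this example.

```python
def interval_length(start: int, end: int) -> int:
    """Number of offsets in the closed interval [start; end]."""
    if end < start:
        # Assumption: an interval always contains at least one record.
        raise ValueError("end must not be smaller than start")
    return end - start + 1
```

An interval of [5; 9] therefore holds five records, and [0; 0] holds exactly one.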

Defines the steps to prepare raw data for ingestion.

| Union Type | Description |
| --- | --- |
| PrepStep::Decompress | Decompresses the data using the specified compression format. |
| PrepStep::Pipe | Executes external command to process the data using piped input/output. |

Decompresses the data using the specified compression format.

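| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| format | string | ✔️ | | Name of a compression algorithm used on data. |
| subPath | string | | | Path to a data file within a multi-file archive. Can contain glob patterns. |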

Executes external command to process the data using piped input/output.

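| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| command | array(string) | ✔️ | | Command to execute and its arguments. |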

Defines how raw data should be read into the structured form.

| Union Type | Description |
| --- | --- |
| ReadStep::Csv | Reader for comma-separated files. |
| ReadStep::GeoJson | Reader for GeoJSON files. It expects one FeatureCollection object in the root and will create a record per each Feature inside it extracting the properties into individual columns and leaving the feature geometry in its own column. |
| ReadStep::EsriShapefile | Reader for ESRI Shapefile format. |
| ReadStep::Parquet | Reader for Apache Parquet format. |
| ReadStep::Json | Reader for JSON files that contain an array of objects within them. |
| ReadStep::NdJson | Reader for files containing multiple newline-delimited JSON objects with the same schema. |
| ReadStep::NdGeoJson | Reader for Newline-delimited GeoJSON files. It is similar to GeoJson format but instead of FeatureCollection object in the root it expects every individual feature object to appear on its own line. |

Reader for comma-separated files.

| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| schema | array(string) | | | A DDL-formatted schema. Schema can be used to coerce values into more appropriate data types. |
| separator | string | | | Sets a single character as a separator for each field and value. |
| encoding | string | | | Decodes the CSV files by the given encoding type. |
| quote | string | | | Sets a single character used for escaping quoted values where the separator can be part of the value. Set an empty string to turn off quotations. |
| escape | string | | | Sets a single character used for escaping quotes inside an already quoted value. |
| header | boolean | | | Use the first line as names of columns. |
| inferSchema | boolean | | | Infers the input schema automatically from data. It requires one extra pass over the data. |
| nullValue | string | | | Sets the string representation of a null value. |
| dateFormat | string | | | Sets the string that indicates a date format. The rfc3339 is the only required format, the other format strings are implementation-specific. |
| timestampFormat | string | | | Sets the string that indicates a timestamp format. The rfc3339 is the only required format, the other format strings are implementation-specific. |

Reader for JSON files that contain an array of objects within them.

| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| subPath | string | | | Path in the form of a.b.c to a sub-element of the root JSON object that is an array of objects. If not specified it is assumed that the root element is an array. |
| schema | array(string) | | | A DDL-formatted schema. Schema can be used to coerce values into more appropriate data types. |
| dateFormat | string | | | Sets the string that indicates a date format. The rfc3339 is the only required format, the other format strings are implementation-specific. |
| encoding | string | | | Allows forcibly setting one of the standard basic or extended encodings. |
| timestampFormat | string | | | Sets the string that indicates a timestamp format. The rfc3339 is the only required format, the other format strings are implementation-specific. |
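
How a subPath of the form a.b.c selects the array of records can be sketched as follows. The function name is hypothetical and the sketch works on an already parsed JSON document, ignoring schema coercion and encoding handling.

```python
from typing import Any, List, Optional


def select_records(document: Any, sub_path: Optional[str] = None) -> List[Any]:
    """Walk a dotted subPath down from the root and return the array found there."""
    node = document
    if sub_path:
        for part in sub_path.split("."):
            node = node[part]
    if not isinstance(node, list):
        raise ValueError("selected element is not an array")
    return node
```

Without a subPath the root element itself must be the array, matching the default described in the table.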

Reader for files containing multiple newline-delimited JSON objects with the same schema.

| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| schema | array(string) | | | A DDL-formatted schema. Schema can be used to coerce values into more appropriate data types. |
| dateFormat | string | | | Sets the string that indicates a date format. The rfc3339 is the only required format, the other format strings are implementation-specific. |
| encoding | string | | | Allows forcibly setting one of the standard basic or extended encodings. |
| timestampFormat | string | | | Sets the string that indicates a timestamp format. The rfc3339 is the only required format, the other format strings are implementation-specific. |

Reader for GeoJSON files. It expects one FeatureCollection object in the root and will create a record per each Feature inside it extracting the properties into individual columns and leaving the feature geometry in its own column.

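| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| schema | array(string) | | | A DDL-formatted schema. Schema can be used to coerce values into more appropriate data types. |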

Reader for Newline-delimited GeoJSON files. It is similar to GeoJson format but instead of FeatureCollection object in the root it expects every individual feature object to appear on its own line.

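| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| schema | array(string) | | | A DDL-formatted schema. Schema can be used to coerce values into more appropriate data types. |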

Reader for ESRI Shapefile format.

| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| schema | array(string) | | | A DDL-formatted schema. Schema can be used to coerce values into more appropriate data types. |
| subPath | string | | | If the ZIP archive contains multiple shapefiles use this field to specify a sub-path to the desired .shp file. Can contain glob patterns to act as a filter. |

Reader for Apache Parquet format.

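| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| schema | array(string) | | | A DDL-formatted schema. Schema can be used to coerce values into more appropriate data types. |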

Defines a header (e.g. HTTP) to be passed into some request.

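| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| name | string | ✔️ | | Name of the header. |
| value | string | ✔️ | | Value of the header. |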

Defines how external data should be cached.

| Union Type | Description |
| --- | --- |
| SourceCaching::Forever | After source was processed once it will never be ingested again. |

After source was processed once it will never be ingested again.


The state of the source the data was added from to allow fast resuming.

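| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| sourceName | string | ✔️ | | Identifies the source that the state corresponds to. |
| kind | string | ✔️ | | Identifies the type of the state. Standard types include: odf/etag, odf/last-modified. |
| value | string | ✔️ | | Opaque value representing the state. |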

Defines a query in a multi-step SQL transformation.

| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| alias | string | | | Name of the temporary view that will be created from result of the query. Step without this alias will be treated as an output of the transformation. |
| query | string | ✔️ | | SQL query the result of which will be exposed under the alias. |

Temporary Flink-specific extension for creating temporal tables from streams.

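| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| name | string | ✔️ | | Name of the dataset to be converted into a temporal table. |
| primaryKey | array(string) | ✔️ | | Column names used as the primary key for creating a table. |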

Engine-specific processing queries that shape the resulting data.

| Union Type | Description |
| --- | --- |
| Transform::Sql | Transform using one of the SQL dialects. |

Transform using one of the SQL dialects.

| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| engine | string | ✔️ | | Identifier of the engine used for this transformation. |
| version | string | | | Version of the engine to use. |
| query | string | | | SQL query the result of which will be used as an output. This is a convenience property meant only for defining queries by hand. When stored in the metadata this property will never be set and instead will be converted into a single-item queries array. |
| queries | array(SqlQueryStep) | | | Specifies multi-step SQL transformations. Each step acts as a shorthand for CREATE TEMPORARY VIEW <alias> AS (<query>). Last query in the array should have no alias and will be treated as an output. |
| temporalTables | array(TemporalTable) | | | Temporary Flink-specific extension for creating temporal tables from streams. |
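
The relationship between query, queries and the CREATE TEMPORARY VIEW shorthand can be sketched as below. Both helper names are hypothetical, transforms are assumed to be plain dictionaries, and the sketch only builds statement strings without running them on any engine.

```python
from typing import Any, Dict, List, Tuple

Step = Dict[str, Any]


def normalize_queries(transform: Dict[str, Any]) -> List[Step]:
    """Convert the convenience `query` property into a single-item `queries` array."""
    if transform.get("queries"):
        return list(transform["queries"])
    return [{"query": transform["query"]}]


def expand_queries(steps: List[Step]) -> Tuple[List[str], str]:
    """Expand aliased steps into view statements and return them with the output query."""
    *views, output = steps
    if output.get("alias"):
        raise ValueError("the last query should have no alias")
    statements = []
    for step in views:
        if not step.get("alias"):
            raise ValueError("intermediate steps need an alias")
        statements.append(f"CREATE TEMPORARY VIEW {step['alias']} AS ({step['query']})")
    return statements, output["query"]
```

A two-step transform thus becomes one view statement for the aliased step plus the final un-aliased query, whose result is the output of the transformation.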

Describes a derivative transformation input

| Property | Type | Required | Format | Description |
| --- | --- | --- | --- | --- |
| datasetRef | string | ✔️ | dataset-ref | A local or remote dataset reference. When block is accepted this MUST be in the form of a DatasetId to guarantee reproducibility, as aliases can change over time. |
| alias | string | | | An alias under which this input will be available in queries. Will be populated from datasetRef if not provided before resolving it to DatasetId. |

Represents a watermark in the event stream.


