# RFC-014: Minimizing scanning for last offset and block
- Start Date: 2023-12-17
- Authors:
- Compatibility:
  - Backwards-compatible
  - Forwards-compatible
## Summary

Removes the `SetWatermark` event and modifies the `AddData` and `ExecuteQuery` events so that they always carry enough information about the output offset and the offsets of the inputs to prepare the next transaction without deep scanning of the metadata chain.
## Motivation

Currently, `AddData` and `ExecuteQuery` events carry closed `[start, end]` intervals for blocks and offsets to describe the inputs and outputs of a transaction. To represent an empty input or output these intervals are made optional.

This is problematic because, to understand which offsets or blocks need to be used for the inputs and output of the next transaction, it is sometimes necessary to scan the metadata chain until a non-empty interval is encountered.

Extending the metadata to carry this information regardless of whether any input data was used or any output data was written would allow us to prepare the next transaction using only the last `AddData` or `ExecuteQuery` block, avoiding deep scanning of the chain.
## Guide-level explanation

Imagine a derivative dataset that aggregates data from an IoT source:
- The source writes 1M batches over the year, resulting in ~1M `ExecuteQuery` events
- The dataset is configured to aggregate data per month, thus only ~12 of those `ExecuteQuery` events will have data in them
- Because we don't write any offset information when a transaction produces no data, the system routinely needs to traverse thousands of empty blocks to find a non-empty one and learn the last offset to use for the next transaction
A similar problem exists in multi-input derivative datasets where one input updates significantly more frequently than the other, forcing us to scan many `ExecuteQuery` blocks to determine the last input offset interval that was already processed.

The proposed change avoids this by always carrying enough information in the events to understand which offsets and blocks were already processed.
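To make the difference concrete, below is a minimal Rust sketch of the two lookup strategies. The types and function names (`last_offset_by_scanning`, `last_offset_from_head`) are hypothetical simplifications for illustration, not the actual implementation:

```rust
/// Hypothetical simplified types for illustration only.
#[derive(Clone, Copy)]
struct OffsetInterval {
    start: u64,
    end: u64,
}

struct ExecuteQueryEvent {
    /// Old schema: present only when the transaction produced data.
    output_data: Option<OffsetInterval>,
    /// New schema: carried forward even by transactions that produced no data.
    prev_offset: Option<u64>,
}

/// Old approach: walk back from the head until a block with a non-empty
/// output interval is found -- O(n) in the number of empty blocks.
fn last_offset_by_scanning(chain: &[ExecuteQueryEvent]) -> Option<u64> {
    chain
        .iter()
        .rev()
        .find_map(|block| block.output_data.map(|iv| iv.end))
}

/// New approach: the head block alone is sufficient -- O(1).
fn last_offset_from_head(head: &ExecuteQueryEvent) -> Option<u64> {
    head.output_data.map(|iv| iv.end).or(head.prev_offset)
}

fn main() {
    // One data-bearing block buried under thousands of watermark-only blocks.
    let mut chain = vec![ExecuteQueryEvent {
        output_data: Some(OffsetInterval { start: 0, end: 9 }),
        prev_offset: None,
    }];
    for _ in 0..10_000 {
        chain.push(ExecuteQueryEvent {
            output_data: None,
            prev_offset: Some(9), // always carried forward under the new schema
        });
    }
    assert_eq!(last_offset_by_scanning(&chain), Some(9));
    assert_eq!(last_offset_from_head(chain.last().unwrap()), Some(9));
}
```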
It is also proposed to remove the `SetWatermark` event, treating the advancement of the watermark the same as the case where `AddData` / `ExecuteQuery` blocks don't contain any state change except for the watermark. This reduces the complexity and interplay of events that implementations have to deal with.
## Reference-level explanation

The `AddData` and `ExecuteQuery` events will be extended with a new `prevOffset` property representing the last offset of the previous data slice. It must be equal to the last non-empty `outputData.offsetInterval.end`.

For example, if the first `AddData` event of a root dataset looks like this (using `null` to represent the missing properties explicitly):
```yaml
prevCheckpoint: null
prevOffset: null
newData:
  logicalHash: <hash>
  physicalHash: <hash>
  offsetInterval:
    start: 0
    end: 9
  size: 100
newCheckpoint: null
newWatermark: "2023-12-31T00:00:00"
newSourceState: null
```
A correctly chained `AddData` event with data will look like so:
```yaml
prevCheckpoint: null
prevOffset: 9
newData:
  logicalHash: <hash>
  physicalHash: <hash>
  offsetInterval:
    start: 10
    end: 19
  size: 100
newCheckpoint: null
newWatermark: "2023-12-31T01:00:00"
newSourceState: null
```
While the next chained `AddData` event that only advances the watermark will look like so:
```yaml
prevCheckpoint: null
prevOffset: 19
newData: null
newCheckpoint: null
newWatermark: "2023-12-31T03:00:00"
newSourceState: null
```
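The chaining rule lends itself to cheap local validation: a block can be checked against its direct predecessor alone, with no further scanning. Below is a minimal sketch of such a check, using hypothetical simplified types and the three example blocks above (not the actual implementation):

```rust
/// Hypothetical simplified types mirroring the examples above.
#[derive(Clone, Copy)]
struct OffsetInterval {
    start: u64,
    end: u64,
}

struct AddDataEvent {
    prev_offset: Option<u64>,
    new_data: Option<OffsetInterval>,
}

/// The last offset an event accounts for: its own interval end,
/// or the `prevOffset` it carried forward.
fn last_offset(event: &AddDataEvent) -> Option<u64> {
    event.new_data.map(|iv| iv.end).or(event.prev_offset)
}

/// Checks that `next` correctly chains after `prev`: `prevOffset` must equal
/// the last non-empty `offsetInterval.end`, and any new interval must start
/// immediately after it.
fn validate_chaining(prev: &AddDataEvent, next: &AddDataEvent) -> Result<(), String> {
    let expected = last_offset(prev);
    if next.prev_offset != expected {
        return Err(format!(
            "prevOffset {:?} does not match last offset {:?}",
            next.prev_offset, expected
        ));
    }
    if let Some(iv) = next.new_data {
        let expected_start = expected.map_or(0, |o| o + 1);
        if iv.start != expected_start {
            return Err(format!(
                "interval starts at {}, expected {}",
                iv.start, expected_start
            ));
        }
    }
    Ok(())
}

fn main() {
    let first = AddDataEvent {
        prev_offset: None,
        new_data: Some(OffsetInterval { start: 0, end: 9 }),
    };
    let second = AddDataEvent {
        prev_offset: Some(9),
        new_data: Some(OffsetInterval { start: 10, end: 19 }),
    };
    let watermark_only = AddDataEvent {
        prev_offset: Some(19),
        new_data: None,
    };
    assert!(validate_chaining(&first, &second).is_ok());
    assert!(validate_chaining(&second, &watermark_only).is_ok());
}
```

This is the kind of stricter validation referred to in the Drawbacks section below.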
The `SetWatermark` event, while advancing the watermark, does not carry over any other state such as the previous offset, checkpoint, or source state, which unnecessarily complicates processing. We therefore propose removing this event in favor of the "no-data" variant of the `AddData` and `ExecuteQuery` events, as shown in the example above.
Similarly, the `InputSlice` will no longer use closed `[start, end]` intervals for `blockInterval` and `offsetInterval`. The new schema will use what are essentially half-open `(prev, new]` intervals, where the starting point is always carried across all transactions, even if the interval itself is empty. For example, `prevOffset: 9` and `newOffset: 19` describe input records 10 through 19 inclusive.

Proposed `InputSlice` schema:
```json
{
  "description": "Describes a slice of the input dataset used during a transformation",
  "type": "object",
  "additionalProperties": false,
  "required": [
    "datasetId"
  ],
  "properties": {
    "datasetId": {
      "type": "string",
      "format": "dataset-id",
      "description": "Input dataset identifier."
    },
    "prevBlockHash": {
      "type": "string",
      "format": "multihash",
      "description": "Last block of the input dataset that was previously incorporated into the derivative transformation, if any. Must be equal to the last non-empty `newBlockHash`. Together with `newBlockHash` defines a half-open `(prevBlockHash, newBlockHash]` interval of blocks that will be considered in this transaction."
    },
    "newBlockHash": {
      "type": "string",
      "format": "multihash",
      "description": "Hash of the last block that will be incorporated into the derivative transformation. When present, defines a half-open `(prevBlockHash, newBlockHash]` interval of blocks that will be considered in this transaction."
    },
    "prevOffset": {
      "type": "integer",
      "format": "uint64",
      "description": "Last data record offset in the input dataset that was previously incorporated into the derivative transformation, if any. Must be equal to the last non-empty `newOffset`. Together with `newOffset` defines a half-open `(prevOffset, newOffset]` interval of data records that will be considered in this transaction."
    },
    "newOffset": {
      "type": "integer",
      "format": "uint64",
      "description": "Offset of the last data record that will be incorporated into the derivative transformation, if any. When present, defines a half-open `(prevOffset, newOffset]` interval of data records that will be considered in this transaction."
    }
  }
}
```
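To illustrate the half-open semantics, here is a sketch of how an implementation might derive the records covered by a slice and prepare the slice for the next transaction. The types and function names (`records_to_process`, `next_slice`) are hypothetical, chosen to mirror the schema above:

```rust
/// Hypothetical simplified counterpart of the proposed `InputSlice`.
struct InputSlice {
    prev_offset: Option<u64>,
    new_offset: Option<u64>,
}

/// The inclusive range of record offsets covered by a slice, i.e. the
/// half-open `(prevOffset, newOffset]` interval, or `None` if it is empty.
fn records_to_process(slice: &InputSlice) -> Option<(u64, u64)> {
    let new = slice.new_offset?;
    let start = slice.prev_offset.map_or(0, |p| p + 1);
    Some((start, new))
}

/// Preparing the next slice needs only the previous one: the starting point
/// is always carried across, even when the input had no new records.
fn next_slice(prev: &InputSlice, latest_input_offset: Option<u64>) -> InputSlice {
    let prev_offset = prev.new_offset.or(prev.prev_offset);
    InputSlice {
        prev_offset,
        // Set only when the input actually advanced past the previous offset.
        new_offset: latest_input_offset.filter(|&o| Some(o) > prev_offset),
    }
}

fn main() {
    // `(9, 19]` covers records 10 through 19 inclusive.
    let slice = InputSlice {
        prev_offset: Some(9),
        new_offset: Some(19),
    };
    assert_eq!(records_to_process(&slice), Some((10, 19)));

    // No new input data: the next slice is empty but still remembers offset 19,
    // so no scanning is ever needed to recover it.
    let empty = next_slice(&slice, Some(19));
    assert_eq!(empty.prev_offset, Some(19));
    assert_eq!(empty.new_offset, None);
    assert_eq!(records_to_process(&empty), None);
}
```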
## Compatibility

This change breaks backwards compatibility and will be executed as part of the batch of backwards-incompatible changes.
## Drawbacks

- Half-open intervals are slightly harder to understand
- Will require stricter validation in implementations to catch propagation errors
- Replacing `SetWatermark` with the "no-data" version of the `AddData` event makes the event slightly bigger, but with the added bonus of minimizing chain scanning and simplifying the resumption of processing
## Alternatives

- We could rely on a "skip list" data structure to find blocks with non-empty intervals
  - While this may work for the output data, it would bring too much complexity in the case of input datasets
## Prior art

N/A
## Unresolved questions

N/A
## Future possibilities

We are still considering implementing the "skip list" data structure, but primarily to allow skipping all "routine" high-volume data processing events like `AddData` and `ExecuteQuery`, letting us quickly iterate over the low-volume "out-of-the-ordinary" metadata events.