Issue Spec PR

Start Date: 2023-10-25

Authors:

Compatibility:

  • Backwards-compatible
  • Forwards-compatible

Summary

Introduces a set of new metadata events (AddPushSource, DisablePushSource) for defining “push”-style data ingestion sources for root datasets.

Motivation

Using SetPollingSource event users of ODF can already define “poll”-style data ingestion sources that describe how to read and historize external data to bring it into a root dataset.

We would also like to support “push”-style sources where an external agent periodically adds data into a root dataset. Such sources can range from IoT devices to scripts and all kinds of business process automation.

Similarly to polling sources, push sources also will require defining:

  • What format the data is expected to arrive in
  • Preprocessing queries (e.g. if the data format of the source like IoT device cannot be changed upstream)
  • Merge strategy

Guide-level explanation

Users can define push sources on root datasets using AddPushSource, DisablePushSource events.

Multiple push sources can be active simultaneously per one dataset for cases where multiple actors are writing to the same dataset simultaneously in slightly varying formats.

Push and polling sources are mutually exclusive. However, it must be possible to switch dataset from “push” to “pull” ingest and vice versa - thus we also introduce DisablePollingSource event that allows to turn off the polling source before switching to push model.

The state of push sources can be stored in existing sourceState section of AddData event.

The implementations will verify that all push sources result in the same final data schema, as captured in the SetDataSchema event.

Reference-level explanation

AddPushSource event schema:

{
  "$id": "http://open-data-fabric.github.com/schemas/AddPushSource",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "description": "Describes how to ingest data into a root dataset from a certain logical source.",
  "type": "object",
  "additionalProperties": false,
  "required": [
    "source",
    "read",
    "merge"
  ],
  "properties": {
    "source": {
      "type": "string",
      "description": "Name that identifies this source within this dataset."
    },
    "read": {
      "$ref": "/schemas/ReadStep",
      "description": "Defines how data is read into structured format."
    },
    "preprocess": {
      "$ref": "/schemas/Transform",
      "description": "Pre-processing query that shapes the data."
    },
    "merge": {
      "$ref": "/schemas/MergeStrategy",
      "description": "Determines how newly-ingested data should be merged with existing history."
    }
  }
}

DisablePushSource event schema:

{
  "$id": "http://open-data-fabric.github.com/schemas/DisablePushSource",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "description": "Disables the previously defined source.",
  "type": "object",
  "additionalProperties": false,
  "required": [
    "source"
  ],
  "properties": {
    "source": {
      "type": "string",
      "description": "Identifier of the source to be disabled."
    }
  }
}

DisablePollingSource event schema:

{
  "$id": "http://open-data-fabric.github.com/schemas/DisablePollingSource",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "description": "Disables the previously defined polling source.",
  "type": "object",
  "additionalProperties": false,
  "required": [],
  "properties": {}
}

The source identifier is the same one as used in SourceState::source, meaning that AddData::sourceState can be used by the implementations to store the state of the push source for resuming the consumption and implementing “exactly-once” semantics.

Compatibility

This change is backwards-compatible.

This change is forwards-incompatible as older implementation will fail to parse the new core events (we still lack the mechanism to mark some events as “OK to disregard”).

Drawbacks

  • More complexity in metadata

Alternatives

N/A

Prior art

N/A

Unresolved questions

  • Non-trivial ingestion merge strategies (snapshot, ledger) currently require access to past stream data to perform CDC and deduplication. We were considering to avoid the need to read past data eventually by storing all necessary information in the checkpoints. However, allowing for multiple push sources would mean that we need separate checkpoints per source as sources can have different merge strategies. Given that it’s still not clear if storing state in checkpoints for things like CDC is even practical for large datasets - we decided to not let this block this RFC, as the need for several push sources per dataset does seem like a practical necessity.

Future possibilities

N/A