Polling sources are used for cases when external data is stored somewhere in bulk and we want to periodically synchronize its state with an ODF dataset.
Polling sources are suitable for ingesting data from:
- Periodic database dumps
- Data published as a set of files on the web
- Bulk data access APIs
- External systems using custom connector libraries.
Polling sources are defined via the SetPollingSource metadata event. The example below restores the shape of such an event around the original comments; the fetch URL, formats, engine, and merge keys are illustrative placeholders, not values from a specific dataset:

```yaml
- kind: SetPollingSource
  # Where to fetch the data from.
  # Supports multiple protocols and file layouts
  fetch:
    kind: Url
    url: https://example.com/city-populations.csv.gz
  # OPTIONAL: How to prepare the binary data
  # Includes decompression, file filtering, format conversions
  prepare:
    - kind: Decompress
      format: Gzip
  # How to interpret the data.
  # Includes data format, schema to apply, error handling
  read:
    kind: Csv
    header: true
    schema:
      - "date DATE"
      - "city STRING"
      - "population STRING"
  # OPTIONAL: Pre-processing query that shapes the data.
  # Useful for converting text data into strict types
  # Use one of the supported engines and a query in its dialect
  # See: https://docs.kamu.dev/cli/transform/supported-engines/
  preprocess:
    kind: Sql
    engine: datafusion
    query: |
      select
        date,
        city,
        -- remove commas between thousands
        cast(replace(population, ",", "") as bigint) as population
      from input
  # How to combine data ingested in the past with the new data.
  merge:
    kind: Ledger
    primaryKey:
      - date
      - city
# Lets you manipulate names of the system columns to avoid conflicts
# or use names better suited for your data.
- kind: SetVocab
  eventTimeColumn: date
```
The structure of the SetPollingSource event directly reflects all the ingestion phases:
- `fetch` - specifies how to download the data from some external source (e.g. HTTP/FTP) and how to cache it efficiently
- `prepare` (optional) - specifies how to prepare raw binary data for reading (e.g. extracting an archive or converting between formats)
- `read` - specifies how to read the data into a structured form (e.g. as CSV or Parquet)
- `preprocess` (optional) - allows you to shape the structured data with queries (e.g. to parse and convert types into the best-suited form with SQL)
- `merge` - specifies how to combine the newly read data with the history of previously seen data (this step is extremely important as it performs "ledgerization" / "historization" of the evolving state of data - see the Merge Strategies section for an explanation)
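Merge strategies are where "historization" happens. As a hedged sketch (the Snapshot strategy and its `primaryKey` / `compareColumns` fields follow the ODF schema, but the column names here are hypothetical), a source that ingests periodic full dumps of evolving state could be configured to diff them like this:

```yaml
merge:
  kind: Snapshot
  # Columns that uniquely identify a record across dumps
  primaryKey:
    - id
  # OPTIONAL: Changes are detected only by comparing these columns
  compareColumns:
    - population
```

With this strategy each new dump is compared against the previously observed state, and only the added, changed, or removed records are appended to the dataset.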
A polling source can later be deactivated using the DisablePollingSource event.
For more examples of handling tricky data sources see the input formats section.
Polling Data via CLI
To poll data into a dataset via `kamu`, use the general-purpose `kamu pull` command:

```shell
kamu pull com.example.city-populations
```
Polling Data via API
See the Kamu Node protocols documentation for the various options for polling data programmatically via APIs.
The perfect scenario for `kamu` is when data records contain the event time within them as a column, but many data sources on the web are not like that. If event time is not present in the data, `kamu` will try to infer it. This can be:
- Modification time for files on local or remote file systems
- `Last-Modified` time for HTTP resources
If no time can be extracted from the source, it will fall back to using system time.
Usually it's best to be explicit about where your event time comes from by defining an EventTimeSource in the fetch section of SetPollingSource. It's pretty flexible, allowing you even to extract time from timestamps that are part of file names. The example below restores the shape of such a configuration around the original comments; the glob path, regex pattern, and timestamp format are illustrative placeholders:

```yaml
fetch:
  kind: FilesGlob
  # Ingest all files matching the pattern (in lexicographic order)
  path: /home/user/data/city-populations-*.csv
  eventTime:
    kind: FromPath
    # Extract timestamp as the first regex group in the pattern
    pattern: city-populations-(\d+-\d+-\d+)\.csv
    # Parse timestamp
    timestampFormat: "%Y-%m-%d"
```
kamu does its best to avoid redundant work and will not ingest data if the source was not updated since the last poll. The exact mechanism of cache control depends on the source type and the protocol used. In the case of HTTP, for example, it will rely on standard HTTP caching headers such as `Last-Modified` and `ETag`.
The latest caching information is stored in the dataset metadata in the AddData event, in a special SourceState object. This means that it is possible for an ingest to return no data and no new watermark, but still write a metadata block containing only the new source state.
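For illustration only, a hedged sketch of such a metadata-only block (the `newSourceState` field layout, the `odf/etag` kind, and the value shown are assumptions based on the ODF schema, not taken from this page):

```yaml
kind: AddData
# No newData and no newWatermark: the source reported "not modified"
newSourceState:
  sourceName: default
  kind: odf/etag
  value: "33a64df551425fcc55e4d42a148795d9f25f89d4"
```

On the next poll the stored state is sent back to the source (e.g. as an `If-None-Match` header for HTTP), letting the server skip the transfer entirely when nothing changed.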