Polling sources are used for cases when external data is stored somewhere in bulk and we want to periodically synchronize its state with an ODF dataset.

Polling sources are suitable for ingesting data from:

  • Periodic database dumps
  • Data published as a set of files on the web
  • Bulk data access APIs
  • External systems using custom connector libraries.

Source Metadata

Polling sources are defined via SetPollingSource metadata event:

---
kind: DatasetSnapshot
version: 1
content:
  name: com.example.city-populations
  kind: Root
  metadata:
    - kind: SetPollingSource
      # Where to fetch the data from.
      # Supports multiple protocols and file layouts
      fetch:
        kind: Url
        url: https://example.com/city_populations_over_time.zip
      # OPTIONAL: How to prepare the binary data
      # Includes decompression, file filtering, format conversions
      prepare:
        - kind: Decompress
          format: Zip
      # How to interpret the data.
      # Includes data format, schema to apply, error handling
      read:
        kind: Csv
        header: true
        schema:
          - "date DATE"
          - "city STRING"
          - "population STRING"
      # OPTIONAL: Pre-processing query that shapes the data.
      # Useful for converting text data into strict types
      preprocess:
        kind: Sql
        # Use one of the supported engines and a query in its dialect
        # See: https://docs.kamu.dev/cli/transform/supported-engines/
        engine: datafusion
        query: |
          select
            date,
            city,
            -- remove commas between thousands
            cast(replace(population, ",", "") as bigint)
          from input          
      # How to combine data ingested in the past with the new data.
      merge:
        kind: Ledger
        primaryKey:
          - date
          - city
      # Lets you manipulate names of the system columns to avoid conflicts
      # or use names better suited for your data.
    - kind: SetVocab
      eventTimeColumn: date

The structure of the SetPollingSource event directly reflects all the ingestion phases:

  • fetch - specifies how to download the data from some external source (e.g. HTTP/FTP) and how to cache it efficiently
  • prepare (optional) - specifies how to prepare raw binary data for reading (e.g. extracting an archive or converting between formats)
  • read - specifies how to read the data into structured form (e.g. as CSV or Parquet)
  • preprocess (optional) - allows to shape the structured data with queries (e.g. to parse and convert types into best suited form wit SQL)
  • merge - specifies how to combine the read data with the history of previously seen data (this step is extremely important as it performs “ledgerization” / “historization” of the evolving state of data - see Merge Strategies section for explanation).

A polling source can later be deactivated using DisablePollingSource event.

For multiple example of handling tricky data sources see input formats section.

Polling Data via CLI

To poll data into dataset via kamu use the general-purpose kamu pull command.

kamu pull com.example.city-populations

Polling Data via API

See Kamu Node protocols documentation for various options of polling data programmatically via APIs.

Event Time

The perfect scenario for kamu is when data records contain event time within them as a column, but many data sources on the web are not like that.

If event time is not present in data - kamu will try to infer it. This can be:

  • Modification time for files on local or remote file systems
  • Last-Modified time for HTTP resources.

If no time can be extracted from the source - it will fall back to using system time.

Usually its best to be explicit about where your event time is coming by defining EventTimeSource in the fetch section of SetPollingSource . It’s pretty flexible, allowing you to even extract time from timestamps that are part of file names:

fetch:
  kind: FilesGlob
  # Ingest all files matching the pattern (in lexicographic order)
  path: /data/database-dump-*.json.gz
  eventTime:
    kind: FromPath
    # Extract timestamp as the first regex group in the pattern
    pattern: database-dump-(\d+-\d+-\d+)\.json\.gz
    # Parse timestamp
    timestampFormat: '%Y-%m-%d'

Source Caching

kamu does its best to avoid redundant work and not ingest data if source was not updated since the lass poll.

Exact mechanism of cache control depends on the source type and the protocol used. In case of HTTP, for example, it will rely on standard HTTP caching headers like ETag and Last-Modified.

The latest caching information is stored in dataset metadata in AddData event in a special SourceState object. This means that it is possible for ingest to return no data and no new watermark, but still write a metadata block containing only the new source state.

You can control caching behavior via SourceCaching object in the fetch section of SetPollingSource .