Input Formats
kamu supports multiple data sources and data formats that, in combination with custom preparation steps, can be used to ingest all kinds of data.
Note that kamu is not meant to be or replace data workflow tools like Apache Airflow or Apache NiFi, or data extraction tools like Debezium. The utilities described below exist only to simplify the initial data ingestion step - the very first step in the data's journey through a web of structured stream processing pipelines.
CSV Variants
The CSV parser lets you customize many formatting options.
For example, a tab-separated file can be read as:
read:
  kind: Csv
  separator: "\t"
  quote: '"'
See: ReadStep::Csv
JSON Document
If you have a JSON document such as this:
{
  "nested": {
    "values": [
      {"id": 1, "key": "value"},
      {"id": 2, "key": "value"}
    ]
  }
}
first consider if you can use NDJSON instead, as ordinary JSON documents cannot be split and read efficiently.
If not, use:
read:
  kind: Json
  subPath: nested.values
Where subPath points at the array of records within the document.
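If you want to avoid type inference you can also supply an explicit schema - a sketch, assuming ReadStep::Json accepts the same schema notation as the other readers (column names are from the document above):

read:
  kind: Json
  subPath: nested.values
  schema:
    - id BIGINT
    - key STRING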
See: ReadStep::Json
NDJSON
An NDJSON (newline-delimited JSON) file such as:
{"id": 1, "key": "value"}
{"id": 2, "key": "value"}
can be read using:
read:
  kind: NdJson
  schema:
    - id BIGINT
    - key STRING
See: ReadStep::NdJson
GeoJSON Document
Simply use:
read:
  kind: GeoJson
The reader expects one FeatureCollection object in the root and will create a record for each Feature inside it, extracting the properties into individual columns and leaving the feature geometry in its own column.
See: ReadStep::GeoJson
NDGeoJSON Document
Simply use:
read:
  kind: NdGeoJson
It is similar to the GeoJSON format, but instead of a FeatureCollection object in the root it expects every individual Feature to appear on its own line.
See: ReadStep::NdGeoJson
Esri Shapefile
GIS data in ESRI format can be read as:
read:
  kind: EsriShapefile
  subPath: specific_data-*.shp
# Use preprocess to optionally convert between different projections
preprocess:
  kind: Sql
  engine: spark
  query: |
    SELECT
      ST_Transform(geometry, "epsg:3157", "epsg:4326") as geometry,
      ...
    FROM input
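To make the projection conversion concrete, here is a sketch with the ellipsis replaced by hypothetical property columns of the shapefile:

preprocess:
  kind: Sql
  engine: spark
  query: |
    SELECT
      -- reproject coordinates from EPSG:3157 to EPSG:4326 (WGS 84)
      ST_Transform(geometry, "epsg:3157", "epsg:4326") as geometry,
      zone_id,    -- hypothetical property columns carried over as-is
      zone_name
    FROM input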
Compressed Data & Archives
Use the decompress preparation step to extract data from gzip or zip archives.
prepare:
  - kind: Decompress
    format: Gzip
In case of a multi-file archive you can specify which file should be extracted:
prepare:
  - kind: Decompress
    format: Zip
    subPath: specific-file-*.csv # Note: can contain glob patterns
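For context, preparation steps run between fetching and reading. A minimal sketch of a complete source for a gzip-compressed CSV, assuming a hypothetical download URL:

fetch:
  kind: Url
  url: https://example.org/exports/daily-dump.csv.gz  # hypothetical location
prepare:
  - kind: Decompress
    format: Gzip
read:
  kind: Csv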
See: PrepStep::Decompress
Other Formats
If you have to work with formats that are not natively supported, you'll need to transcode them.
Using the pipe preparation step you can specify a custom program or script that will receive data via STDIN and output the result to STDOUT.
For example, here's how transcoding a JSON document into CSV using jq might look:
prepare:
  - kind: Pipe
    command:
      - 'jq'
      - '-r'
      - '.values[] | [.id, .key] | @csv'
read:
  kind: Csv
  schema:
    - id BIGINT
    - key STRING
See: PrepStep::Pipe
Directory of Timestamped Files
The FetchStep::FilesGlob is used in cases where a directory contains a growing set of files. Files can be periodic snapshots of your database or represent batches of new data in a ledger. In either case the file contents should never change - once kamu processes a file it will not consider it again. It's OK for files to disappear - kamu will remember the name of the file it ingested last and will only consider files that are higher in order than that one (lexicographically by file name, or by event time as shown below).
In the example below, the data inside the files is in snapshot format, and to complicate things it does not itself contain an event time - the event time is encoded in the file's name.
Directory contents:
db-table-dump-2020-01-01.csv
db-table-dump-2020-01-02.csv
db-table-dump-2020-01-03.csv
Fetch step:
fetch:
  kind: FilesGlob
  path: /home/username/data/db-table-dump-*.csv
  eventTime:
    kind: FromPath
    pattern: 'db-table-dump-(\d+-\d+-\d+)\.csv'
    timestampFormat: '%Y-%m-%d'
  cache:
    kind: Forever
See: FetchStep::FilesGlob
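Since each file is a full snapshot of the same table, such a source would typically be paired with a snapshot merge strategy keyed on the table's primary key - a sketch, assuming a hypothetical id column:

merge:
  kind: Snapshot
  primaryKey:
    - id   # hypothetical primary key of the dumped table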
Dealing with API Keys
Sometimes you may want to parametrize the URL to include things like API keys and auth tokens. For this, kamu supports basic variable substitution:
fetch:
  kind: Url
  url: "https://api.etherscan.io/api?apikey=${{ env.ETHERSCAN_API_KEY }}"
You can use default values for parameters:
fetch:
  kind: Container
  image: "ghcr.io/kamu-data/fetch-com.defillama:${{ env.IMAGE_VERSION || '0.1.5' }}"
  args:
    - --request-interval
    - '${{ env.request_interval || 2 }}'
Also, there’s a possibility of specifying several possible options:
fetch:
  kind: Container
  image: "ghcr.io/kamu-data/fetch-com.github.stats:0.1.0"
  args:
    - --access-token
    - ${{ env.GH_TOKEN || env.GITHUB_TOKEN || 'demo' }}
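Substitution is not limited to the URL - assuming the fetch step accepts request headers (see FetchStep::Url for the exact fields), a token could also be injected into an Authorization header; the endpoint and variable name below are hypothetical:

fetch:
  kind: Url
  url: https://api.example.org/v1/data              # hypothetical endpoint
  headers:
    - name: Authorization
      value: 'Bearer ${{ env.EXAMPLE_API_TOKEN }}'  # hypothetical variable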
Using Ingest Scripts
Sometimes you may need the power of a general-purpose programming language to deal with a particularly complex API or to do web scraping. For this, kamu supports containerized ingestion tasks:
fetch:
  kind: Container
  image: "ghcr.io/kamu-data/fetch-my-dataset:0.1.0"
  env:
    - name: ETH_NODE_PROVIDER_URL
The specified container image is expected to conform to the following interface:
- Produce data to stdout
- Write warnings / errors to stderr
- Use the following environment variables:
  - ODF_LAST_MODIFIED - last modified time of data from the previous ingest run, if any (in RFC3339 format)
  - ODF_ETAG - caching tag of data from the previous ingest run, if any
  - ODF_BATCH_SIZE - the recommended number of records per batch, for ingest scripts that provide a continuous stream of data and can resume from previous state
    - default value is 10 000, can be overridden via env (see the sketch after this list)
  - ODF_NEW_LAST_MODIFIED_PATH - path to a text file where the ingest script may write the new Last-Modified timestamp
  - ODF_NEW_ETAG_PATH - path to a text file where the ingest script may write the new eTag
  - ODF_NEW_HAS_MORE_DATA_PATH - path to a text file which the ingest script can create to indicate that it has more data for the next batch
    - ⚠️ Please note: if the file is created, one of the output marks above (eTag or Last-Modified timestamp) must also be present
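For example, assuming environment variables can be pinned to explicit values in the fetch step's env section (see FetchStep::Container for the exact syntax), the batch size hint could be raised like this:

fetch:
  kind: Container
  image: "ghcr.io/kamu-data/fetch-my-dataset:0.1.0"  # image from the example above
  env:
    - name: ODF_BATCH_SIZE
      value: '50000'                # override the default hint of 10 000 records
    - name: ETH_NODE_PROVIDER_URL   # value provided externally, as in the example above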
Need More Examples?
To give you more examples of how to handle different ingest scenarios, we've created an experimental repository where we publish Root Dataset manifests for a variety of Open Data sources - check out the kamu-contrib repo.