Blockchains, especially those that run smart contracts, have turned into real treasure troves of data and can be viewed as datasets that aggregate millions of smaller datasets within them. Kamu allows you to directly use data from any EVM-compatible blockchain.

Currently we support reading logs, but access to fully reconcilable transaction information is planned.

Configuring RPC Node

Data is read directly from blockchain nodes, so to ingest data your CLI needs to have the RPC URL configured:

kind: CLIConfig
version: 1
content:
  source:
    ethereum:
      rpcEndpoints:
        - chainId: 1
          chainName: Ethereum Mainnet
          nodeUrl: wss://localhost:8545

You can also specify nodeUrl directly in the FetchStep::EthereumLogs (see below).

Accessing Specific Log

Logs can be accessed using FetchStep::EthereumLogs (full example):

kind: DatasetSnapshot
version: 1
content:
  name: net.rocketpool.reth.tokens-minted
  kind: Root
  metadata:
    - kind: SetPollingSource
      fetch:
        kind: EthereumLogs
        # Ethereum Mainnet
        # See: https://chainlist.org/
        chainId: 1
        # Optional signature can be used to decode
        # the event into a nested struct
        signature: |
          TokensMinted(
            address indexed to,
            uint256 amount,
            uint256 ethAmount,
            uint256 time
          )          
        # Filter by contract address and the deployment
        # block to limit scanning. Syntax is the same as
        # in SQL WHERE clause.
        filter: |
          address = X'ae78736cd615f374d3085123a210448e74fc6393'
          and
          block_number > 13325304          
      read:
        kind: Parquet
      preprocess:
        kind: Sql
        engine: datafusion
        query: |
          select
            to_timestamp_seconds(
              cast(event.time as bigint)
            ) as event_time,
            block_number,
            block_hash,
            transaction_index,
            transaction_hash,
            log_index,
            event.to as to,
            event.amount as amount,
            event."ethAmount" as eth_amount
          from input          
      merge:
        kind: Append

The above source declaration will use filter to create the most efficient RPC request to the blockchain node and stream the events using .

The output of the EthereumLogs fetch step is a data structure that corresponds directly to the output of eth_getLogs endpoint, with the addition of optional decoded event field:

NameTypeNullableDescription
block_numberuint64
block_hashbinary
block_timestamptimestampVMany providers don’t yet return blockTimestamp from eth_getLogs
transaction_indexuint64
transaction_hashbinary
log_indexuint64
addressbinary
topic0binaryV
topic1binaryV
topic2binaryV
topic3binaryV
databinary
eventstructIf signature is specified, will contain a decoded version of the event as a nested struct field

Decoding Raw Logs in SQL

It’s also possible to ingest raw logs without decoding them with signature.

In this example we read all logs of the specific contract and decode them later in SQL (full example):

kind: DatasetSnapshot
version: 1
content:
  name: net.rocketpool.reth.mint-burn
  kind: Root
  metadata:
    - kind: SetPollingSource
      fetch:
        kind: EthereumLogs
        # Ethereum Mainnet
        # See: https://chainlist.org/
        chainId: 1
        # Read raw logs, filtering by signatures we care about
        filter: |
          address = X'ae78736cd615f374d3085123a210448e74fc6393'
          and (
            topic0 = eth_event_selector(
              'TokensMinted(address indexed to, uint256 amount, uint256 ethAmount, uint256 time)'
            )
            or topic0 = eth_event_selector(
              'TokensBurned(address indexed from, uint256 amount, uint256 ethAmount, uint256 time)'
            )
          )
          and block_number > 13325304          
      read:
        kind: Parquet
      preprocess:
        kind: Sql
        engine: datafusion
        query: |
          select
            to_timestamp_seconds(
              cast(json_get_str(event, 'time') as bigint
            ) as event_time,
            block_number,
            block_hash,
            transaction_index,
            transaction_hash,
            log_index,
            json_get_str(event, 'name') as event_name,
            decode(
              coalesce(
                json_get_str(event, 'to'),
                json_get_str(event, 'from')
              ),
              'hex'
            ) as holder_address,
            json_get_str(event, 'amount') as amount,
            json_get_str(event, 'ethAmount') as eth_amount
          from (
            select
              *,
              coalesce(
                eth_try_decode_event(
                  'TokensMinted(address indexed to, uint256 amount, uint256 ethAmount, uint256 time)',
                  topic0,
                  topic1,
                  topic2,
                  topic3,
                  data
                ),
                eth_try_decode_event(
                  'TokensBurned(address indexed from, uint256 amount, uint256 ethAmount, uint256 time)',
                  topic0,
                  topic1,
                  topic2,
                  topic3,
                  data
                )
              ) as event
            from input
          )          
      merge:
        kind: Append

The above code is using eth_try_decode_event function provided by datafusion-ethers extension to the SQL engine. Upon success this function returns a decoded event as JSON string, which we then take apart using datafusion-functions-json set of functions.

Future Work

Blockchain data satisfies all properties of an ODF dataset (history-preserving, reproducible, verifiable), with differences primarily in encoding. We therefore see Kamu NOT as an indexer (solutions that copy data from blockchains into queryable databases, leading to recentralization concerns). Instead we see blockchains as a natural extension of the Open Data Fabric network.

Integrating blockchains as data sources and consumers of data via Kamu Oracle makes Kamu the first system where you can go through the whole cycle (reading on-chain data, merging it with off-chain data, providing data back on-chain to a smart contract) within one solution, with just SQL, with full verifiability.

Our future work will focus on further erasing the boundary between on- and off-chain data.