or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

current-data-streams.mdentry-points.mdhistorical-data-streams.mdindex.mdmain-source.mdstate-historical-streams.md
tile.json

tessl/pypi-source-rki-covid

Airbyte source connector for RKI COVID-19 data from the German Robert Koch-Institut API

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/source-rki-covid@0.1.x

To install, run

npx @tessl/cli install tessl/pypi-source-rki-covid@0.1.0

index.mddocs/

Source RKI COVID

An Airbyte source connector for accessing COVID-19 data from the German Robert Koch-Institut (RKI) public API. This connector provides comprehensive access to German epidemiological data including national statistics, state-level breakdowns, age group analyses, and historical trends across multiple metrics.

Package Information

  • Package Name: source-rki-covid
  • Package Type: Python (Airbyte connector)
  • Language: Python
  • Installation: Available through Airbyte connector catalog
  • Dependencies: airbyte-cdk 0.80.0

Core Imports

from source_rki_covid import SourceRkiCovid

For running the connector:

from source_rki_covid.run import run

Basic Usage

from source_rki_covid import SourceRkiCovid

# Initialize the source
source = SourceRkiCovid()

# Configuration (requires start_date)
config = {
    "start_date": "2023-01-01"  # UTC date in YYYY-MM-DD format
}

# Check connection
is_valid, error = source.check_connection(logger=None, config=config)

# Get available streams
streams = source.streams(config)
print(f"Available streams: {len(streams)}")  # Returns 16 streams

Architecture

The connector follows Airbyte's standard source architecture using the Airbyte CDK:

  • SourceRkiCovid: Main source class inheriting from AbstractSource
  • Stream Classes: 16 specialized stream classes for different data endpoints
  • Base Classes: Abstract base classes providing common functionality
  • Configuration: Simple schema requiring only a start_date parameter

The connector organizes COVID-19 data into logical streams covering:

  • Current snapshots (Germany, states, age groups)
  • Historical trends with incremental sync capability
  • State-level historical data with full refresh

Capabilities

Main Source Interface

The primary connector interface providing connection testing and stream discovery functionality.

class SourceRkiCovid(AbstractSource):
    def check_connection(self, logger, config) -> Tuple[bool, any]: ...
    def streams(self, config: Mapping[str, Any]) -> List[Stream]: ...

Main Source

Current Data Streams

Full-refresh streams providing current COVID-19 statistics snapshots for Germany overall, individual states, and age group breakdowns.

class Germany(RkiCovidStream): ...
class GermanyStates(RkiCovidStream): ...
class GermanyAgeGroups(RkiCovidStream): ...
class GermanyStatesAgeGroups(RkiCovidStream): ...

Current Data Streams

Historical Data Streams (Germany)

Incremental streams providing historical COVID-19 data for Germany with date-based cursor synchronization. Supports cases, deaths, recovered, incidence, frozen incidence, and hospitalization metrics.

class GermanyHistoryCases(IncrementalRkiCovidStream): ...
class GermanHistoryIncidence(IncrementalRkiCovidStream): ...
class GermanHistoryDeaths(IncrementalRkiCovidStream): ...
class GermanHistoryRecovered(IncrementalRkiCovidStream): ...
class GermanHistoryFrozenIncidence(IncrementalRkiCovidStream): ...
class GermanHistoryHospitalization(IncrementalRkiCovidStream): ...

Historical Data Streams (Germany)

State Historical Data Streams

Full-refresh streams providing historical COVID-19 data for all German states. Covers the same metrics as Germany historical streams but with state-level granularity and different sync behavior.

class StatesHistoryCases(ByStateRkiCovidStream): ...
class StatesHistoryIncidence(ByStateRkiCovidStream): ...
class StatesHistoryFrozenIncidence(ByStateRkiCovidStream): ...
class StatesHistoryDeaths(ByStateRkiCovidStream): ...
class StatesHistoryRecovered(ByStateRkiCovidStream): ...
class StatesHistoryHospitalization(ByStateRkiCovidStream): ...

State Historical Data Streams

Entry Point Functions

Console script entry point and programmatic execution functions for running the connector.

def run(): ...

Entry Point Functions

Configuration Schema

{
    "start_date": {
        "type": "string",
        "title": "Start Date", 
        "description": "UTC date in the format 2017-01-25. Any data before this date will not be replicated.",
        "required": True
    }
}

Types

from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
from abc import ABC
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream
import requests

Stream Overview

The connector provides 16 total streams organized into three categories:

  • 4 Current Data Streams: Full-refresh snapshots of current statistics
  • 6 Historical Germany Streams: Incremental sync of historical national data
  • 6 State Historical Streams: Full-refresh historical data for all states

All streams connect to the RKI COVID API at https://api.corona-zahlen.org/ and provide comprehensive coverage of German COVID-19 epidemiological data for analytics, reporting, and research applications.