tessl/pypi-source-rki-covid

Airbyte source connector for RKI COVID-19 data from the German Robert Koch-Institut API

Source RKI COVID

An Airbyte source connector for COVID-19 data from the public API of the German Robert Koch-Institut (RKI). It exposes German epidemiological data as streams: national statistics, state-level breakdowns, age-group breakdowns, and historical time series for several metrics.

Package Information

  • Package Name: source-rki-covid
  • Package Type: Python (Airbyte connector)
  • Language: Python
  • Installation: Available through Airbyte connector catalog
  • Dependencies: airbyte-cdk 0.80.0

Core Imports

from source_rki_covid import SourceRkiCovid

For running the connector:

from source_rki_covid.run import run

Basic Usage

from source_rki_covid import SourceRkiCovid

# Initialize the source
source = SourceRkiCovid()

# Configuration (requires start_date)
config = {
    "start_date": "2023-01-01"  # UTC date in YYYY-MM-DD format
}

# Check connection
is_valid, error = source.check_connection(logger=None, config=config)

# Get available streams
streams = source.streams(config)
print(f"Available streams: {len(streams)}")  # 16 streams expected

Architecture

The connector follows Airbyte's standard source architecture using the Airbyte CDK:

  • SourceRkiCovid: Main source class inheriting from AbstractSource
  • Stream Classes: 16 specialized stream classes for different data endpoints
  • Base Classes: Abstract base classes providing common functionality
  • Configuration: Simple schema requiring only a start_date parameter

The connector organizes COVID-19 data into logical streams covering:

  • Current snapshots (Germany, states, age groups)
  • Historical trends with incremental sync capability
  • State-level historical data with full refresh

Capabilities

Main Source Interface

The primary connector interface providing connection testing and stream discovery functionality.

class SourceRkiCovid(AbstractSource):
    def check_connection(self, logger, config) -> Tuple[bool, Any]: ...
    def streams(self, config: Mapping[str, Any]) -> List[Stream]: ...

Current Data Streams

Full-refresh streams providing current COVID-19 statistics snapshots for Germany overall, individual states, and age group breakdowns.

class Germany(RkiCovidStream): ...
class GermanyStates(RkiCovidStream): ...
class GermanyAgeGroups(RkiCovidStream): ...
class GermanyStatesAgeGroups(RkiCovidStream): ...

Historical Data Streams (Germany)

Incremental streams providing historical COVID-19 data for Germany with date-based cursor synchronization. Supports cases, deaths, recovered, incidence, frozen incidence, and hospitalization metrics.

class GermanyHistoryCases(IncrementalRkiCovidStream): ...
class GermanHistoryIncidence(IncrementalRkiCovidStream): ...
class GermanHistoryDeaths(IncrementalRkiCovidStream): ...
class GermanHistoryRecovered(IncrementalRkiCovidStream): ...
class GermanHistoryFrozenIncidence(IncrementalRkiCovidStream): ...
class GermanHistoryHospitalization(IncrementalRkiCovidStream): ...
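
The date-based cursor behaviour described above can be illustrated with a minimal standard-library sketch. It is not the connector's implementation: the cursor field name `date`, the helper `select_new_records`, and the shape of the sample records are assumptions made for illustration.

```python
from datetime import date
from typing import Any, Dict, Iterable, List, Optional, Tuple

# Assumed cursor field name; the real streams may use a different one.
CURSOR_FIELD = "date"


def _record_date(record: Dict[str, Any]) -> date:
    # Keep only the YYYY-MM-DD prefix so full timestamps are accepted too.
    return date.fromisoformat(str(record[CURSOR_FIELD])[:10])


def select_new_records(
    records: Iterable[Dict[str, Any]],
    start_date: str,
    state: Optional[Dict[str, str]] = None,
) -> Tuple[List[Dict[str, Any]], Dict[str, str]]:
    """Return the records not yet synced, plus the updated cursor state."""
    lower_bound = date.fromisoformat(start_date)
    last_seen = (
        date.fromisoformat(state[CURSOR_FIELD])
        if state and CURSOR_FIELD in state
        else None
    )

    selected: List[Dict[str, Any]] = []
    newest = last_seen
    for record in records:
        day = _record_date(record)
        if day < lower_bound:
            continue  # before start_date: never replicated
        if last_seen is not None and day <= last_seen:
            continue  # already emitted by an earlier sync
        selected.append(record)
        if newest is None or day > newest:
            newest = day

    new_state = {CURSOR_FIELD: newest.isoformat()} if newest else {}
    return selected, new_state


# Hypothetical sample data, not real RKI figures.
sample = [
    {"date": "2022-12-31T00:00:00.000Z", "cases": 1},
    {"date": "2023-01-01T00:00:00.000Z", "cases": 2},
    {"date": "2023-01-02T00:00:00.000Z", "cases": 3},
]
first_batch, cursor = select_new_records(sample, "2023-01-01")
second_batch, cursor_after = select_new_records(sample, "2023-01-01", cursor)
```

The first call keeps everything on or after `start_date`; the second call, given the saved cursor, finds nothing new and leaves the state unchanged.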

State Historical Data Streams

Full-refresh streams providing historical COVID-19 data for all German states. They cover the same metrics as the Germany historical streams, but at state-level granularity and with full-refresh rather than incremental sync.

class StatesHistoryCases(ByStateRkiCovidStream): ...
class StatesHistoryIncidence(ByStateRkiCovidStream): ...
class StatesHistoryFrozenIncidence(ByStateRkiCovidStream): ...
class StatesHistoryDeaths(ByStateRkiCovidStream): ...
class StatesHistoryRecovered(ByStateRkiCovidStream): ...
class StatesHistoryHospitalization(ByStateRkiCovidStream): ...
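
As a rough sketch of what state-level granularity can mean for record shape, the snippet below flattens a nested per-state payload into one record per state and day. The payload layout (`data` keyed by state abbreviation, each with a `name` and a `history` list), the output field names, and the helper `flatten_state_history` are assumptions for illustration, not the connector's documented schema.

```python
from typing import Any, Dict, Iterator, Mapping


def flatten_state_history(payload: Mapping[str, Any]) -> Iterator[Dict[str, Any]]:
    """Yield one flat record per (state, history entry) pair."""
    for abbreviation, state in payload.get("data", {}).items():
        for entry in state.get("history", []):
            # Copy the state identifiers onto every history entry.
            yield {"abbreviation": abbreviation, "name": state.get("name"), **entry}


# Hypothetical payload, not real RKI figures.
sample_payload = {
    "data": {
        "BY": {
            "name": "Bayern",
            "history": [
                {"cases": 10, "date": "2023-01-01"},
                {"cases": 12, "date": "2023-01-02"},
            ],
        },
        "HH": {"name": "Hamburg", "history": [{"cases": 3, "date": "2023-01-01"}]},
    }
}
rows = list(flatten_state_history(sample_payload))
```

Because these streams are full refresh, a sketch like this would re-emit every row on each sync rather than tracking a cursor.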

Entry Point Functions

Console script entry point and programmatic execution functions for running the connector.

def run(): ...

Configuration Schema

{
    "start_date": {
        "type": "string",
        "title": "Start Date", 
        "description": "UTC date in the format 2017-01-25. Any data before this date will not be replicated.",
        "required": True
    }
}
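
Since `start_date` is the only required setting, it can be worth checking it before handing the config to the connector. The following is a minimal sketch using only the standard library; the helper name `parse_start_date` is hypothetical and not part of the package.

```python
from datetime import date, datetime
from typing import Any, Mapping


def parse_start_date(config: Mapping[str, Any]) -> date:
    """Return start_date as a date, or raise ValueError if missing or malformed."""
    if "start_date" not in config:
        raise ValueError("config is missing the required 'start_date' field")
    try:
        # The schema describes the format by example: 2017-01-25 (YYYY-MM-DD).
        return datetime.strptime(config["start_date"], "%Y-%m-%d").date()
    except (TypeError, ValueError) as exc:
        raise ValueError("start_date must be a string in YYYY-MM-DD format") from exc


parsed = parse_start_date({"start_date": "2017-01-25"})
```

Failing early with a clear message is usually easier to debug than a failed connection check later on.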

Types

from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
from abc import ABC
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream
import requests

Stream Overview

The connector provides 16 total streams organized into three categories:

  • 4 Current Data Streams: Full-refresh snapshots of current statistics
  • 6 Historical Germany Streams: Incremental sync of historical national data
  • 6 State Historical Streams: Full-refresh historical data for all states
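
The breakdown above can be written out as a small lookup table, which also confirms the total of 16. The class names are the ones listed in this document; the category keys and sync-mode labels are illustrative names chosen for this sketch.

```python
from typing import Dict, List, Tuple

# category -> (sync mode, stream class names as listed in this document)
STREAM_CATEGORIES: Dict[str, Tuple[str, List[str]]] = {
    "current": (
        "full_refresh",
        ["Germany", "GermanyStates", "GermanyAgeGroups", "GermanyStatesAgeGroups"],
    ),
    "germany_history": (
        "incremental",
        [
            "GermanyHistoryCases",
            "GermanHistoryIncidence",
            "GermanHistoryDeaths",
            "GermanHistoryRecovered",
            "GermanHistoryFrozenIncidence",
            "GermanHistoryHospitalization",
        ],
    ),
    "states_history": (
        "full_refresh",
        [
            "StatesHistoryCases",
            "StatesHistoryIncidence",
            "StatesHistoryFrozenIncidence",
            "StatesHistoryDeaths",
            "StatesHistoryRecovered",
            "StatesHistoryHospitalization",
        ],
    ),
}

# Count streams per category and overall.
counts = {category: len(names) for category, (_, names) in STREAM_CATEGORIES.items()}
total_streams = sum(counts.values())
```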

All streams connect to the RKI COVID API at https://api.corona-zahlen.org/ and together cover national, state-level, and age-group COVID-19 data for Germany.

Install with Tessl CLI

npx tessl i tessl/pypi-source-rki-covid
