Airbyte source connector for RKI COVID-19 data from the German Robert Koch-Institut API
npx @tessl/cli install tessl/pypi-source-rki-covid@0.1.0
An Airbyte source connector for accessing COVID-19 data from the German Robert Koch-Institut (RKI) public API. This connector provides comprehensive access to German epidemiological data including national statistics, state-level breakdowns, age group analyses, and historical trends across multiple metrics.
from source_rki_covid import SourceRkiCovid
For running the connector:
from source_rki_covid.run import run
from source_rki_covid import SourceRkiCovid
# Initialize the source
source = SourceRkiCovid()
# Configuration (requires start_date)
config = {
    "start_date": "2023-01-01"  # UTC date in YYYY-MM-DD format
}
# Check connection
is_valid, error = source.check_connection(logger=None, config=config)
# Get available streams
streams = source.streams(config)
print(f"Available streams: {len(streams)}")  # Returns 16 streams
The connector follows Airbyte's standard source architecture using the Airbyte CDK.
The connector organizes COVID-19 data into logical streams covering current snapshots, historical series for Germany, and historical series for the German states.
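The usage example above passes a `start_date` in YYYY-MM-DD format. As a minimal sketch, assuming you want to catch a malformed date before handing the config to the connector, the check could look like the following. The helper name `parse_start_date` is hypothetical and is not part of the connector.

```python
from datetime import date, datetime


def parse_start_date(config: dict) -> date:
    """Return config["start_date"] as a date, or raise ValueError.

    Hypothetical helper for illustration; the connector itself may
    validate its configuration differently.
    """
    raw = config.get("start_date")
    if not isinstance(raw, str):
        raise ValueError("start_date is required and must be a string")
    try:
        # strptime rejects values that do not follow the year-month-day layout
        return datetime.strptime(raw, "%Y-%m-%d").date()
    except ValueError as exc:
        raise ValueError(f"start_date must use YYYY-MM-DD, got {raw!r}") from exc


if __name__ == "__main__":
    print(parse_start_date({"start_date": "2023-01-01"}))  # 2023-01-01
```

Failing early on a bad date gives a clearer error than a failed request later in the sync.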
The primary connector interface providing connection testing and stream discovery functionality.
class SourceRkiCovid(AbstractSource):
    def check_connection(self, logger, config) -> Tuple[bool, any]: ...
    def streams(self, config: Mapping[str, Any]) -> List[Stream]: ...
Full-refresh streams providing current COVID-19 statistics snapshots for Germany overall, individual states, and age group breakdowns.
class Germany(RkiCovidStream): ...
class GermanyStates(RkiCovidStream): ...
class GermanyAgeGroups(RkiCovidStream): ...
class GermanyStatesAgeGroups(RkiCovidStream): ...
Incremental streams providing historical COVID-19 data for Germany with date-based cursor synchronization. Supports cases, deaths, recovered, incidence, frozen incidence, and hospitalization metrics.
class GermanyHistoryCases(IncrementalRkiCovidStream): ...
class GermanHistoryIncidence(IncrementalRkiCovidStream): ...
class GermanHistoryDeaths(IncrementalRkiCovidStream): ...
class GermanHistoryRecovered(IncrementalRkiCovidStream): ...
class GermanHistoryFrozenIncidence(IncrementalRkiCovidStream): ...
class GermanHistoryHospitalization(IncrementalRkiCovidStream): ...
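Date-based cursor synchronization for the incremental Germany history streams can be illustrated with a small, self-contained sketch. It does not use the Airbyte CDK. The cursor field name `date`, the record fields, and the sample values are assumptions made for illustration, and the connector's own state handling may differ.

```python
from typing import Any, Dict, Iterable, List, Mapping

CURSOR_FIELD = "date"  # assumed cursor field name


def get_updated_state(
    current_state: Mapping[str, str], latest_record: Mapping[str, Any]
) -> Dict[str, str]:
    """Keep the later of the stored cursor and the record's cursor value.

    ISO-8601 timestamps in one fixed format sort chronologically as plain
    strings, so max() on the strings is sufficient here.
    """
    current = current_state.get(CURSOR_FIELD, "")
    latest = latest_record.get(CURSOR_FIELD, "")
    return {CURSOR_FIELD: max(current, latest)}


def new_records(
    records: Iterable[Mapping[str, Any]], state: Mapping[str, str]
) -> List[Mapping[str, Any]]:
    """Return only the records strictly newer than the stored cursor."""
    cursor = state.get(CURSOR_FIELD, "")
    return [r for r in records if r[CURSOR_FIELD] > cursor]


# Hypothetical sample data, not real RKI figures.
SAMPLE_RECORDS = [
    {"cases": 10, "date": "2023-01-01T00:00:00.000Z"},
    {"cases": 12, "date": "2023-01-02T00:00:00.000Z"},
    {"cases": 9, "date": "2023-01-03T00:00:00.000Z"},
]

if __name__ == "__main__":
    state: Dict[str, str] = {"date": "2023-01-01T00:00:00.000Z"}
    for record in new_records(SAMPLE_RECORDS, state):
        state = get_updated_state(state, record)
    print(state)
```

After each sync the stored cursor advances to the newest date seen, so the next run can skip records it has already emitted.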
Full-refresh streams providing historical COVID-19 data for all German states. Covers the same metrics as Germany historical streams but with state-level granularity and different sync behavior.
class StatesHistoryCases(ByStateRkiCovidStream): ...
class StatesHistoryIncidence(ByStateRkiCovidStream): ...
class StatesHistoryFrozenIncidence(ByStateRkiCovidStream): ...
class StatesHistoryDeaths(ByStateRkiCovidStream): ...
class StatesHistoryRecovered(ByStateRkiCovidStream): ...
class StatesHistoryHospitalization(ByStateRkiCovidStream): ...
Console script entry point and programmatic execution functions for running the connector.
def run(): ...
{
    "start_date": {
        "type": "string",
        "title": "Start Date",
        "description": "UTC date in the format 2017-01-25. Any data before this date will not be replicated.",
        "required": True
    }
}
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
from abc import ABC
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream
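import requests
The connector provides 16 total streams organized into three categories: current data (4 full-refresh streams), historical data for Germany (6 incremental streams), and historical data by state (6 full-refresh streams).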
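The three categories can be written down as a small lookup table, which is handy when deciding which streams to sync incrementally. This is a sketch built from the class names and sync behavior listed in this document. The dictionary layout, the category keys, and the helper names are hypothetical and are not part of the connector.

```python
from typing import Dict, List

# Category -> sync mode and stream class names, as listed in this document.
STREAMS_BY_CATEGORY: Dict[str, Dict[str, object]] = {
    "current": {
        "sync_mode": "full_refresh",
        "streams": [
            "Germany",
            "GermanyStates",
            "GermanyAgeGroups",
            "GermanyStatesAgeGroups",
        ],
    },
    "germany_history": {
        "sync_mode": "incremental",
        "streams": [
            "GermanyHistoryCases",
            "GermanHistoryIncidence",
            "GermanHistoryDeaths",
            "GermanHistoryRecovered",
            "GermanHistoryFrozenIncidence",
            "GermanHistoryHospitalization",
        ],
    },
    "states_history": {
        "sync_mode": "full_refresh",
        "streams": [
            "StatesHistoryCases",
            "StatesHistoryIncidence",
            "StatesHistoryFrozenIncidence",
            "StatesHistoryDeaths",
            "StatesHistoryRecovered",
            "StatesHistoryHospitalization",
        ],
    },
}


def total_streams() -> int:
    """Count stream classes across all categories."""
    return sum(len(c["streams"]) for c in STREAMS_BY_CATEGORY.values())


def streams_with_mode(sync_mode: str) -> List[str]:
    """List stream class names whose category uses the given sync mode."""
    return [
        name
        for category in STREAMS_BY_CATEGORY.values()
        if category["sync_mode"] == sync_mode
        for name in category["streams"]
    ]


if __name__ == "__main__":
    print(total_streams())  # 16
    print(len(streams_with_mode("incremental")))  # 6
```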
All streams connect to the RKI COVID API at https://api.corona-zahlen.org/ and provide comprehensive coverage of German COVID-19 epidemiological data for analytics, reporting, and research applications.
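To make the relationship between `start_date` and a request to the API concrete, here is a hedged sketch of building a history URL. Only the base URL comes from this document. The path layout `germany/history/<metric>/<days>`, the idea of converting `start_date` into a number of days, and the helper names are assumptions for illustration, so check them against the API documentation before relying on them.

```python
from datetime import date
from urllib.parse import urljoin

BASE_URL = "https://api.corona-zahlen.org/"  # base URL from this document


def days_since(start_date: str, today: date) -> int:
    """Number of whole days between start_date (YYYY-MM-DD) and today, never negative."""
    start = date.fromisoformat(start_date)
    return max((today - start).days, 0)


def history_url(metric: str, start_date: str, today: date) -> str:
    """Build a history URL under the assumed path layout.

    Assumption: history endpoints take a trailing day count,
    for example germany/history/cases/30.
    """
    path = f"germany/history/{metric}/{days_since(start_date, today)}"
    return urljoin(BASE_URL, path)


if __name__ == "__main__":
    print(history_url("cases", "2023-01-01", date(2023, 1, 31)))
    # https://api.corona-zahlen.org/germany/history/cases/30
```

Passing `today` explicitly keeps the function deterministic, which makes it easy to test.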