Airbyte source connector for RKI COVID-19 data from the German Robert Koch-Institut API
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Full-refresh streams that provide historical COVID-19 data for all German states. Each stream parses the API's multi-state response (one entry per state, keyed by its abbreviation) into flat per-state records. Unlike the Germany-wide historical streams, these streams use full-refresh synchronization rather than incremental sync.
Historical COVID-19 case data for all German states.
class StatesHistoryCases(ByStateRkiCovidStream):
    """
    Historical COVID-19 case data for all German states.

    API Endpoint: https://api.corona-zahlen.org/states/history/cases/:days
    Sync Mode: Full refresh
    Primary Key: None

    Provides historical daily case counts for each German state, with the
    state name and abbreviation included in each record.
    """

    primary_key = None

    def __init__(self, config, **kwargs):
        """
        Initialize the stream from the connector configuration.

        Parameters:
        - config: dict containing 'start_date' in YYYY-MM-DD format
        - **kwargs: additional keyword arguments
          (NOTE(review): presumably forwarded to the base stream — confirm)
        """

    def date_to_int(self, start_date) -> int:
        """
        Convert start_date into the ``days`` path parameter for the API.

        Computes the number of days between start_date and the current date.
        Returns 1 if start_date lies in the future, so at least one day of
        history is always requested.

        Parameters:
        - start_date: date string in YYYY-MM-DD format
        """

    def path(self, stream_state=None, stream_slice=None, next_page_token=None) -> str:
        """
        Return the request path 'states/history/cases/{days}'.

        {days} is the result of date_to_int() for the configured start_date.
        """


# Historical COVID-19 incidence data for all German states.
class StatesHistoryIncidence(ByStateRkiCovidStream):
    """
    Historical COVID-19 incidence data for all German states.

    API Endpoint: https://api.corona-zahlen.org/states/history/incidence/:days
    Sync Mode: Full refresh
    Primary Key: None

    Provides historical 7-day incidence rates per 100,000 population for
    each state, with the state name and abbreviation in each record.
    """

    primary_key = None

    def path(self, stream_state=None, stream_slice=None, next_page_token=None) -> str:
        """
        Return the request path 'states/history/incidence/{days}'.

        NOTE(review): the usage examples construct this class with
        ``config=`` and call ``date_to_int()`` on it, but neither the
        constructor nor ``date_to_int`` is documented here. {days} is
        presumably derived from config['start_date'] the same way as in
        StatesHistoryCases — confirm.
        """


# Historical COVID-19 frozen incidence data for all German states.
class StatesHistoryFrozenIncidence(ByStateRkiCovidStream):
    """
    Historical COVID-19 frozen incidence data for all German states.

    API Endpoint: https://api.corona-zahlen.org/states/history/frozen-incidence/:days
    Sync Mode: Full refresh
    Primary Key: None

    Provides historical frozen incidence rates for consistent state-level
    reporting and cross-state comparisons.
    """

    primary_key = None

    def path(self, stream_state=None, stream_slice=None, next_page_token=None) -> str:
        """
        Return the request path 'states/history/frozen-incidence/{days}'.

        NOTE(review): {days} is presumably derived from config['start_date']
        the same way as in StatesHistoryCases; the constructor is not
        documented on this class — confirm.
        """


# Historical COVID-19 deaths data for all German states.
class StatesHistoryDeaths(ByStateRkiCovidStream):
    """
    Historical COVID-19 death data for all German states.

    API Endpoint: https://api.corona-zahlen.org/states/history/deaths/:days
    Sync Mode: Full refresh
    Primary Key: None

    Provides historical daily death counts for each state, for mortality
    analysis and state-level trend tracking.
    """

    primary_key = None

    def path(self, stream_state=None, stream_slice=None, next_page_token=None) -> str:
        """
        Return the request path 'states/history/deaths/{days}'.

        NOTE(review): the usage examples construct this class with
        ``config=``, but the constructor is not documented here. {days} is
        presumably derived from config['start_date'] the same way as in
        StatesHistoryCases — confirm.
        """


# Historical COVID-19 recovery data for all German states.
class StatesHistoryRecovered(ByStateRkiCovidStream):
    """
    Historical COVID-19 recovery data for all German states.

    API Endpoint: https://api.corona-zahlen.org/states/history/recovered/:days
    Sync Mode: Full refresh
    Primary Key: None

    Provides historical recovery data for each state, to support
    active-case calculations and recovery-rate analysis.
    """

    primary_key = None

    def path(self, stream_state=None, stream_slice=None, next_page_token=None) -> str:
        """
        Return the request path 'states/history/recovered/{days}'.

        NOTE(review): {days} is presumably derived from config['start_date']
        the same way as in StatesHistoryCases; the constructor is not
        documented on this class — confirm.
        """


# Historical COVID-19 hospitalization data for all German states.
class StatesHistoryHospitalization(ByStateRkiCovidStream):
    """
    Historical COVID-19 hospitalization data for all German states.

    API Endpoint: https://api.corona-zahlen.org/states/history/hospitalization/:days
    Sync Mode: Full refresh
    Primary Key: None

    Provides historical hospitalization metrics for each state, including
    admissions and ICU utilization data.
    """

    primary_key = None

    def path(self, stream_state=None, stream_slice=None, next_page_token=None) -> str:
        """
        Return the request path 'states/history/hospitalization/{days}'.

        NOTE(review): {days} is presumably derived from config['start_date']
        the same way as in StatesHistoryCases; the constructor is not
        documented on this class — confirm.
        """


# All state historical streams inherit from ByStateRkiCovidStream.
class ByStateRkiCovidStream(RkiCovidStream, ABC):
    """
    Base class for state-level historical data streams.

    Extends RkiCovidStream with parsing for multi-state API responses, in
    which the data is nested and organized by state.
    """

    def parse_response(self, response, **kwargs):
        """
        Parse a multi-state historical response into flat records.

        The API keys each state by its abbreviation. Each state entry holds
        the full state name and a 'history' array of historical records.

        Response structure:
            {
                "data": {
                    "BW": {"name": "Baden-Württemberg", "history": [...]},
                    "BY": {"name": "Bayern", "history": [...]},
                    ...
                }
            }

        Returns:
            Flattened records, one per historical data point, each containing:
            - the original historical data fields
            - name: full state name
            - abbreviation: two-letter state code
        """


# --- Usage examples ---------------------------------------------------------

from collections import defaultdict

# read_records() on an Airbyte HttpStream requires a sync mode argument.
from airbyte_cdk.models import SyncMode
from source_rki_covid import SourceRkiCovid

# NOTE(review): module path of the stream classes assumed to be
# source_rki_covid.source — confirm against the installed package.
from source_rki_covid.source import (
    StatesHistoryCases,
    StatesHistoryDeaths,
    StatesHistoryIncidence,
)

source = SourceRkiCovid()
config = {"start_date": "2023-01-01"}

# Get all streams, then keep only the state historical streams.
streams = source.streams(config)
state_streams = [
    stream
    for stream in streams
    if stream.__class__.__name__.startswith("StatesHistory")
]
print(f"State historical streams: {len(state_streams)}")  # 6 streams

# Example with the StatesHistoryCases stream:
# read historical data for all states and group it by state name.
states_cases = StatesHistoryCases(config=config)
state_data = defaultdict(list)
# These streams only support full refresh (see "Sync Mode" above).
for record in states_cases.read_records(sync_mode=SyncMode.full_refresh):
    state_data[record["name"]].append(
        {
            "date": record["date"],
            "cases": record.get("cases", 0),
            "abbreviation": record["abbreviation"],
        }
    )
print(f"States with data: {len(state_data)}")  # 16 German states

# Inspect the API parameters derived from the start date.
states_incidence = StatesHistoryIncidence(config={"start_date": "2023-03-01"})
days = states_incidence.date_to_int("2023-03-01")
print(f"Requesting {days} days of historical data")
# The API endpoint will be: states/history/incidence/{days}
path = states_incidence.path()
print(f"API path: {path}")

# Analyze data for specific states only.
target_states = {"Bayern", "Nordrhein-Westfalen", "Baden-Württemberg"}
states_deaths = StatesHistoryDeaths(config=config)
for record in states_deaths.read_records(sync_mode=SyncMode.full_refresh):
    if record["name"] in target_states:
        print(
            f"{record['name']} ({record['abbreviation']}): "
            f"{record['date']} - Deaths: {record.get('deaths', 0)}"
        )

# State historical streams return records with the following structure:
The streams provide data for all 16 German federal states (Bundesländer):
Unlike the Germany-wide historical streams, the state historical streams use full-refresh synchronization:
Install with Tessl CLI
npx tessl i tessl/pypi-source-rki-covid