Airbyte source connector for RKI COVID-19 data from the German Robert Koch-Institut API
Full-refresh streams providing current COVID-19 statistics snapshots. These streams deliver the most recent available data for Germany overall, individual states, and age group breakdowns without historical tracking.
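As a quick orientation, the four snapshot streams in this section can be laid out against their endpoints. The sketch below is illustrative only: `STREAM_PATHS` and `endpoint_for` are hypothetical names, not part of the connector, and the paths are copied from the stream docstrings in this section. It joins each path onto the base URL with `urljoin`, which may differ from how the connector builds request URLs internally.

```python
from urllib.parse import urljoin

# Base URL as documented for RkiCovidStream.url_base
URL_BASE = "https://api.corona-zahlen.org/"

# Hypothetical lookup table: stream class name -> documented API path
STREAM_PATHS = {
    "Germany": "germany/",
    "GermanyStates": "states/",
    "GermanyAgeGroups": "germany/age-groups",
    "GermanyStatesAgeGroups": "states/age-groups",
}


def endpoint_for(stream_name: str) -> str:
    """Return the full endpoint URL for a snapshot stream name."""
    return urljoin(URL_BASE, STREAM_PATHS[stream_name])


for name in STREAM_PATHS:
    print(f"{name}: {endpoint_for(name)}")
```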
Current COVID-19 statistics for Germany as a whole.
class Germany(RkiCovidStream):
    """
    Current COVID-19 statistics for Germany overall.

    API Endpoint: https://api.corona-zahlen.org/germany/
    Sync Mode: Full refresh
    Primary Key: None

    Returns current national-level statistics including cases, deaths,
    recovered, incidence rates, and other epidemiological metrics.
    """

    primary_key = None

    def path(self, stream_state=None, stream_slice=None, next_page_token=None) -> str:
        """Returns API path: 'germany/'"""

Current COVID-19 statistics for all German states (Bundesländer).
class GermanyStates(RkiCovidStream):
    """
    Current COVID-19 statistics for all German states.

    API Endpoint: https://api.corona-zahlen.org/states/
    Sync Mode: Full refresh
    Primary Key: None

    Returns current statistics for each German state including
    state name, abbreviation, and all COVID-19 metrics.
    """

    primary_key = None

    def path(self, stream_state=None, stream_slice=None, next_page_token=None) -> str:
        """Returns API path: 'states/'"""

    def parse_response(self, response, **kwargs):
        """
        Parses nested state data from API response.

        Extracts individual state records from response.json().get("data")
        where each state is keyed by abbreviation.
        """

Current COVID-19 statistics broken down by age groups for Germany.
class GermanyAgeGroups(RkiCovidStream):
    """
    Current COVID-19 statistics by age groups for Germany.

    API Endpoint: https://api.corona-zahlen.org/germany/age-groups
    Sync Mode: Full refresh
    Primary Key: None

    Returns current age-stratified statistics for epidemiological
    analysis by demographic groups.
    """

    primary_key = None

    def path(self, stream_state=None, stream_slice=None, next_page_token=None) -> str:
        """Returns API path: 'germany/age-groups'"""

    def parse_response(self, response, **kwargs):
        """
        Extracts age group data from API response.

        Returns response.json().get("data") containing age group statistics.
        """

Current COVID-19 statistics by age groups for each German state.
class GermanyStatesAgeGroups(RkiCovidStream):
    """
    Current COVID-19 statistics by age groups for all German states.

    API Endpoint: https://api.corona-zahlen.org/states/age-groups
    Sync Mode: Full refresh
    Primary Key: None

    Returns age-stratified statistics for each state, combining
    geographical and demographic breakdowns.
    """

    primary_key = None

    def path(self, stream_state=None, stream_slice=None, next_page_token=None) -> str:
        """Returns API path: 'states/age-groups'"""

    def parse_response(self, response, **kwargs):
        """
        Parses nested states and age groups data.

        Flattens the nested structure where each state contains
        multiple age group records, adding state abbreviation
        to each age group record.
        """

All current data streams inherit from the base RkiCovidStream class.
class RkiCovidStream(HttpStream, ABC):
    """
    Base class for full-refresh RKI COVID streams.

    Provides common functionality including:
    - Base URL: https://api.corona-zahlen.org/
    - No pagination (single page responses)
    - Standard JSON parsing
    - Common request parameter handling
    """

    url_base = "https://api.corona-zahlen.org/"

    def next_page_token(self, response) -> Optional[Mapping[str, Any]]:
        """No pagination - returns None"""

    def request_params(self, stream_state=None, stream_slice=None, next_page_token=None):
        """Returns empty dict - no additional parameters needed"""

    def parse_response(self, response, **kwargs):
        """
        Default response parsing.

        Returns:
            Single JSON response as iterable containing one record.
        """

from source_rki_covid import SourceRkiCovid
source = SourceRkiCovid()
config = {"start_date": "2023-01-01"}
# Get all streams
streams = source.streams(config)
# Filter for current data streams
current_streams = [
    stream for stream in streams
    if stream.__class__.__name__ in [
        'Germany', 'GermanyStates', 'GermanyAgeGroups', 'GermanyStatesAgeGroups'
    ]
]
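print(f"Current data streams: {len(current_streams)}")  # 4 streams

# Example with Germany stream
# Assumption: the stream classes are importable from source_rki_covid.source
from airbyte_cdk.models import SyncMode
from source_rki_covid.source import Germany, GermanyStates

germany_stream = Germany()
# Read current records (typically one record with current stats)
# read_records requires a sync mode; these streams are full refresh
for record in germany_stream.read_records(sync_mode=SyncMode.full_refresh):
    print(f"Current Germany statistics: {record}")

# Example with States stream
states_stream = GermanyStates()
# Read current records for all states
for record in states_stream.read_records(sync_mode=SyncMode.full_refresh):
    print(f"State: {record.get('name')}, Cases: {record.get('cases')}")

Current data streams return COVID-19 statistics including cases, deaths, recovered counts, and incidence rates.
State-level streams additionally include the state name and abbreviation.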
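The nested parsing described for the state-level streams can be sketched without any network access. This is a minimal illustration under stated assumptions, not the connector's actual implementation: the payload shapes are inferred from the docstrings above, the sample values and age group labels are invented, and the helper names and the `age_group` field are hypothetical.

```python
from typing import Any, Dict, Iterator, Mapping

# Hypothetical payloads: shapes follow the docstrings, all values are made up.
STATES_PAYLOAD = {
    "data": {
        "SH": {"name": "Schleswig-Holstein", "abbreviation": "SH", "cases": 100},
        "HH": {"name": "Hamburg", "abbreviation": "HH", "cases": 200},
    }
}

STATES_AGE_GROUPS_PAYLOAD = {
    "data": {
        "SH": {"A00-A04": {"cases": 1}, "A05-A14": {"cases": 2}},
        "HH": {"A00-A04": {"cases": 3}},
    }
}


def iter_state_records(payload: Mapping[str, Any]) -> Iterator[Dict[str, Any]]:
    """Yield one record per state from a mapping keyed by state abbreviation."""
    for state in (payload.get("data") or {}).values():
        yield dict(state)


def iter_state_age_group_records(payload: Mapping[str, Any]) -> Iterator[Dict[str, Any]]:
    """Flatten state -> age group -> stats into one record per (state, age group)."""
    for abbreviation, age_groups in (payload.get("data") or {}).items():
        for age_group, stats in age_groups.items():
            record = dict(stats)
            record["abbreviation"] = abbreviation  # state key added to each record
            record["age_group"] = age_group  # hypothetical field name
            yield record


state_records = list(iter_state_records(STATES_PAYLOAD))
age_group_records = list(iter_state_age_group_records(STATES_AGE_GROUPS_PAYLOAD))

for record in age_group_records:
    print(record)
```

Copying each nested dict before adding keys keeps the parsed response untouched, which makes the flattening step easier to test in isolation.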