Airbyte source connector for German COVID-19 data published by the Robert Koch-Institut (RKI), read through the api.corona-zahlen.org REST API
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Incremental streams providing historical COVID-19 data for Germany with date-based cursor synchronization. Each stream tracks the most recent `date` value it has synced and, on later runs, emits only records with a newer date, so incremental updates do not re-sync old data.
Historical COVID-19 cases data for Germany with incremental synchronization.
class GermanyHistoryCases(IncrementalRkiCovidStream):
    """
    Historical COVID-19 cases data for Germany.

    API Endpoint: https://api.corona-zahlen.org/germany/history/cases/:days
    Sync Mode: Incremental
    Cursor Field: date
    Primary Key: None

    Provides historical daily cases data with incremental sync support
    based on the ``date`` field. The ``:days`` path parameter is calculated
    from the configured start_date (see ``date_to_int``).

    Only the documented contract is shown here; method bodies are elided.
    """

    # No primary key is declared for this stream.
    primary_key = None

    def __init__(self, config: dict, **kwargs):
        """
        Initialize with configuration containing start_date.

        Parameters:
        - config: dict containing 'start_date' in YYYY-MM-DD format
        - **kwargs: presumably forwarded to the parent stream constructor
          (TODO confirm against the implementation)
        """

    @property
    def cursor_field(self) -> str:
        """Return 'date', the record field used for incremental sync."""

    @property
    def source_defined_cursor(self) -> bool:
        """Return False: the cursor is managed by the connector, not the API."""

    def date_to_int(self, start_date: str) -> int:
        """
        Convert start_date into the ``days`` parameter for the API.

        Calculates the difference in days between start_date and the
        current date. If start_date lies in the future, the result is
        clamped to a minimum of 1.

        Parameters:
        - start_date: date string such as "2023-01-01" (YYYY-MM-DD)

        Returns:
        - number of days to request from the API
        """

    def get_updated_state(self, current_stream_state, latest_record):
        """
        Update the stream state with the latest record's date.

        Compares the cursor field ('date') of the current state and of
        latest_record, and returns a state holding the maximum (most
        recent) value.

        NOTE(review): presumably the dates are compared as ISO-8601 strings;
        confirm how a missing/empty current state is handled.
        """

    def read_records(self, stream_state=None, **kwargs):
        """
        Read records with incremental filtering.

        When a stream state is given, only records whose date is newer
        than the state's cursor value are yielded.

        NOTE(review): Airbyte CDK ``HttpStream.read_records`` requires a
        ``sync_mode`` argument; presumably it must be passed by the caller
        through **kwargs. Confirm.
        """

    def path(self, stream_state=None, stream_slice=None, next_page_token=None) -> str:
        """Return 'germany/history/cases/{days}', with days derived from start_date."""


# Historical COVID-19 incidence rates for Germany.
class GermanHistoryIncidence(IncrementalRkiCovidStream):
    """
    Historical COVID-19 incidence data for Germany.

    API Endpoint: https://api.corona-zahlen.org/germany/history/incidence/:days
    Sync Mode: Incremental
    Cursor Field: date
    Primary Key: None

    Provides historical 7-day incidence rates per 100,000 population
    with incremental synchronization capability.
    """

    # NOTE: this class name uses the "German" prefix, unlike GermanyHistoryCases.
    # No primary key is declared for this stream.
    primary_key = None
    # Record field used as the incremental cursor.
    cursor_field = "date"
    # False: the cursor is managed by the connector rather than by the API.
    source_defined_cursor = False


# Historical COVID-19 deaths data for Germany.
class GermanHistoryDeaths(IncrementalRkiCovidStream):
    """
    Historical COVID-19 deaths data for Germany.

    API Endpoint: https://api.corona-zahlen.org/germany/history/deaths/:days
    Sync Mode: Incremental
    Cursor Field: date
    Primary Key: None

    Provides historical daily deaths data with incremental sync
    for tracking mortality trends over time.
    """

    # No primary key is declared for this stream.
    primary_key = None
    # Record field used as the incremental cursor.
    cursor_field = "date"
    # False: the cursor is managed by the connector rather than by the API.
    source_defined_cursor = False


# Historical COVID-19 recovery data for Germany.
class GermanHistoryRecovered(IncrementalRkiCovidStream):
    """
    Historical COVID-19 recovered cases data for Germany.

    API Endpoint: https://api.corona-zahlen.org/germany/history/recovered/:days
    Sync Mode: Incremental
    Cursor Field: date
    Primary Key: None

    Provides historical daily recovery data with incremental sync
    for tracking recovery trends and calculating active cases.
    """

    # No primary key is declared for this stream.
    primary_key = None
    # Record field used as the incremental cursor.
    cursor_field = "date"
    # False: the cursor is managed by the connector rather than by the API.
    source_defined_cursor = False


# Historical COVID-19 frozen incidence data for Germany.
class GermanHistoryFrozenIncidence(IncrementalRkiCovidStream):
    """
    Historical COVID-19 frozen incidence data for Germany.

    API Endpoint: https://api.corona-zahlen.org/germany/history/frozen-incidence/:days
    Sync Mode: Incremental
    Cursor Field: date
    Primary Key: None

    Provides historical frozen incidence rates: incidence values that are
    frozen at specific points in time for consistent reporting and trend
    analysis.
    """

    # No primary key is declared for this stream.
    primary_key = None
    # Record field used as the incremental cursor.
    cursor_field = "date"
    # False: the cursor is managed by the connector rather than by the API.
    source_defined_cursor = False

    def parse_response(self, response, **kwargs):
        """
        Parse the frozen-incidence response.

        Extracts the history records from the nested response structure:
        response.json().get("data").get("history")

        Among the history streams documented here, this is the only one
        shown overriding parse_response.

        NOTE(review): as written, the chained ``.get`` would raise
        AttributeError if the payload has no "data" key. Confirm that the
        implementation guards against this.
        """


# Historical COVID-19 hospitalization data for Germany.
class GermanHistoryHospitalization(IncrementalRkiCovidStream):
    """
    Historical COVID-19 hospitalization data for Germany.

    API Endpoint: https://api.corona-zahlen.org/germany/history/hospitalization/:days
    Sync Mode: Incremental
    Cursor Field: date
    Primary Key: None

    Provides historical hospitalization metrics, including new admissions
    and ICU utilization, with incremental sync.
    """

    # No primary key is declared for this stream.
    primary_key = None
    # Record field used as the incremental cursor.
    cursor_field = "date"
    # False: the cursor is managed by the connector rather than by the API.
    source_defined_cursor = False


# All historical Germany streams inherit from IncrementalRkiCovidStream.
class IncrementalRkiCovidStream(RkiCovidStream, ABC):
    """
    Base class for incremental RKI COVID streams.

    Extends RkiCovidStream with incremental sync capabilities:
    - Cursor field management
    - Stream state tracking
    - Incremental record filtering
    - State checkpoint handling

    Only the documented contract is shown here; method bodies are elided.
    """

    # No checkpoint interval is set here. NOTE(review): in the Airbyte CDK
    # this presumably means state is not emitted every N records; confirm.
    state_checkpoint_interval = None

    @property
    def cursor_field(self) -> str:
        """
        Name of the record field used for incremental synchronization.

        Documented as abstract: subclasses must specify it. NOTE(review):
        the stub does not apply ``@abstractmethod``, so instantiation does
        not enforce the override.
        """

    def get_updated_state(self, current_stream_state, latest_record):
        """
        Return the stream state updated with latest_record.

        Documented as abstract: subclasses define how the state advances as
        new records arrive. NOTE(review): ``@abstractmethod`` is not applied
        in the stub.
        """


from source_rki_covid import SourceRkiCovid
def _is_germany_history_stream(stream) -> bool:
    """Return True for history streams whose class name is not a "States" variant."""
    class_name = stream.__class__.__name__
    return "History" in class_name and "States" not in class_name


# Build the source and list its Germany-wide historical streams.
config = {"start_date": "2023-01-01"}
source = SourceRkiCovid()

streams = source.streams(config)
# Streams are selected by class name: names containing "States" are
# presumably the per-federal-state variants, so they are excluded.
historical_streams = list(filter(_is_germany_history_stream, streams))

stream_count = len(historical_streams)
print(f"Historical Germany streams: {stream_count}")  # 6 streams
# Example with initial sync (no stream state)
# Importing SourceRkiCovid from the package does not bring the stream classes
# into scope, so import GermanyHistoryCases (and SyncMode) explicitly.
from airbyte_cdk.models import SyncMode
from source_rki_covid.source import GermanyHistoryCases

cases_stream = GermanyHistoryCases(config=config)
print(f"Cursor field: {cases_stream.cursor_field}")  # 'date'

# Read all records from start_date. Airbyte CDK streams require a sync_mode
# argument; iterating read_records() without it raises TypeError.
for record in cases_stream.read_records(sync_mode=SyncMode.incremental):
    print(f"Date: {record['date']}, Cases: {record.get('cases', 0)}")

# Example with existing stream state
from airbyte_cdk.models import SyncMode

current_state = {"date": "2023-06-15"}

# Read only records newer than the state date. As with any Airbyte CDK
# stream, sync_mode must be supplied explicitly.
for record in cases_stream.read_records(
    sync_mode=SyncMode.incremental,
    stream_state=current_state,
):
    print(f"New record - Date: {record['date']}")
    # Advance the state to the most recent date seen so far.
    current_state = cases_stream.get_updated_state(current_state, record)

# Understanding the date_to_int method
from datetime import datetime

from source_rki_covid.source import GermanyHistoryCases

cases_stream = GermanyHistoryCases(config={"start_date": "2023-01-01"})

# Calculate the days parameter for the API call: the number of days from
# start_date to the current date, clamped to 1 for future dates.
days = cases_stream.date_to_int("2023-01-01")
print(f"Days parameter: {days}")

# path() embeds that value: germany/history/cases/{days}
path = cases_stream.path()
print(f"API path: {path}")

# Historical streams return time-series data with records containing:
Install with Tessl CLI
npx tessl i tessl/pypi-source-rki-covid