tessl/pypi-source-rki-covid

Airbyte source connector for RKI COVID-19 data from the German Robert Koch-Institut API

State Historical Data Streams

Full-refresh streams providing historical COVID-19 data for all German states. Each stream parses the multi-state API response into flat records, one per state and date, and re-fetches the requested date range on every sync instead of syncing incrementally.

Capabilities

States Historical Cases

Historical COVID-19 cases data for all German states.

class StatesHistoryCases(ByStateRkiCovidStream):
    """
    Historical COVID-19 cases data for all German states.
    
    API Endpoint: https://api.corona-zahlen.org/states/history/cases/:days
    Sync Mode: Full refresh
    Primary Key: None
    
    Provides historical daily cases data for each German state
    with state names and abbreviations included in each record.
    """
    
    primary_key = None
    
    def __init__(self, config, **kwargs):
        """
        Initialize with configuration containing start_date.
        
        Parameters:
        - config: dict containing 'start_date' in YYYY-MM-DD format
        """
        
    def date_to_int(self, start_date) -> int:
        """
        Convert start_date to days parameter for API.
        
        Calculates difference between start_date and current date.
        Returns minimum of 1 if date is in future.
        """
        
    def path(self, stream_state=None, stream_slice=None, next_page_token=None) -> str:
        """Returns path with days: 'states/history/cases/{days}'"""

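The days calculation and path construction described above can be sketched without the connector installed. This is a minimal illustration, not the connector's code: `days_since` and `history_path` are hypothetical helper names, and `today` is passed in explicitly so the result is reproducible.

```python
from datetime import date, datetime


def days_since(start_date: str, today: date) -> int:
    """Days elapsed between start_date (YYYY-MM-DD) and today, never below 1."""
    start = datetime.strptime(start_date, "%Y-%m-%d").date()
    return max((today - start).days, 1)


def history_path(metric: str, days: int) -> str:
    """Build a relative path of the form 'states/history/{metric}/{days}'."""
    return f"states/history/{metric}/{days}"


days = days_since("2023-01-01", today=date(2023, 1, 31))
print(history_path("cases", days))  # states/history/cases/30

# A start date in the future falls back to the minimum of 1 day
print(days_since("2030-01-01", today=date(2023, 1, 31)))  # 1
```

Clamping to 1 keeps the request path valid even when the configured start date has not been reached yet.
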
States Historical Incidence

Historical COVID-19 incidence data for all German states.

class StatesHistoryIncidence(ByStateRkiCovidStream):
    """
    Historical COVID-19 incidence data for all German states.
    
    API Endpoint: https://api.corona-zahlen.org/states/history/incidence/:days
    Sync Mode: Full refresh
    Primary Key: None
    
    Provides historical 7-day incidence rates per 100,000 population
    for each state with geographic identifiers.
    """
    
    primary_key = None
    
    def path(self, stream_state=None, stream_slice=None, next_page_token=None) -> str:
        """Returns path: 'states/history/incidence/{days}'"""

States Historical Frozen Incidence

Historical COVID-19 frozen incidence data for all German states.

class StatesHistoryFrozenIncidence(ByStateRkiCovidStream):
    """
    Historical COVID-19 frozen incidence data for all German states.
    
    API Endpoint: https://api.corona-zahlen.org/states/history/frozen-incidence/:days
    Sync Mode: Full refresh
    Primary Key: None
    
    Provides historical frozen incidence rates for consistent
    state-level reporting and cross-state comparisons.
    """
    
    primary_key = None
    
    def path(self, stream_state=None, stream_slice=None, next_page_token=None) -> str:
        """Returns path: 'states/history/frozen-incidence/{days}'"""

States Historical Deaths

Historical COVID-19 deaths data for all German states.

class StatesHistoryDeaths(ByStateRkiCovidStream):
    """
    Historical COVID-19 deaths data for all German states.
    
    API Endpoint: https://api.corona-zahlen.org/states/history/deaths/:days
    Sync Mode: Full refresh
    Primary Key: None
    
    Provides historical daily deaths data for each state
    for mortality analysis and state-level trend tracking.
    """
    
    primary_key = None
    
    def path(self, stream_state=None, stream_slice=None, next_page_token=None) -> str:
        """Returns path: 'states/history/deaths/{days}'"""

States Historical Recovered

Historical COVID-19 recovery data for all German states.

class StatesHistoryRecovered(ByStateRkiCovidStream):
    """
    Historical COVID-19 recovery data for all German states.
    
    API Endpoint: https://api.corona-zahlen.org/states/history/recovered/:days
    Sync Mode: Full refresh
    Primary Key: None
    
    Provides historical recovery data for each state to support
    active case calculations and recovery rate analysis.
    """
    
    primary_key = None
    
    def path(self, stream_state=None, stream_slice=None, next_page_token=None) -> str:
        """Returns path: 'states/history/recovered/{days}'"""

States Historical Hospitalization

Historical COVID-19 hospitalization data for all German states.

class StatesHistoryHospitalization(ByStateRkiCovidStream):
    """
    Historical COVID-19 hospitalization data for all German states.
    
    API Endpoint: https://api.corona-zahlen.org/states/history/hospitalization/:days
    Sync Mode: Full refresh
    Primary Key: None
    
    Provides historical hospitalization metrics for each state
    including admissions and ICU utilization data.
    """
    
    primary_key = None
    
    def path(self, stream_state=None, stream_slice=None, next_page_token=None) -> str:
        """Returns path: 'states/history/hospitalization/{days}'"""

Base State Stream Class

All state historical streams inherit from ByStateRkiCovidStream.

class ByStateRkiCovidStream(RkiCovidStream, ABC):
    """
    Base class for state-level historical data streams.
    
    Extends RkiCovidStream with specialized parsing for multi-state
    API responses that contain nested data structures organized by state.
    """
    
    def parse_response(self, response, **kwargs):
        """
        Parse multi-state historical response structure.
        
        Processes API responses where each state is keyed by abbreviation
        and contains a 'history' array of historical records.
        
        Response Structure:
        {
            "data": {
                "BW": {"name": "Baden-Württemberg", "history": [...]},
                "BY": {"name": "Bayern", "history": [...]},
                ...
            }
        }
        
        Returns:
        Flattened records where each historical data point includes:
        - Original historical data fields
        - name: Full state name
        - abbreviation: Two-letter state code
        """

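The flattening step described in the docstring can be illustrated with a standalone function. This is a sketch under stated assumptions, not the connector's implementation: `flatten_states` is a hypothetical name, it takes an already-decoded JSON dict rather than an HTTP response, and the sample payload uses made-up numbers.

```python
from typing import Any, Dict, Iterator


def flatten_states(payload: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
    """Yield one record per history entry, tagged with the state's name and code."""
    for abbreviation, state in payload.get("data", {}).items():
        for entry in state.get("history", []):
            # Copy the original fields, then add the state identifiers
            yield {**entry, "name": state.get("name"), "abbreviation": abbreviation}


# Illustrative payload only; the values are not real RKI figures
sample = {
    "data": {
        "BW": {"name": "Baden-Württemberg", "history": [{"cases": 10, "date": "2023-01-01"}]},
        "BY": {
            "name": "Bayern",
            "history": [
                {"cases": 7, "date": "2023-01-01"},
                {"cases": 3, "date": "2023-01-02"},
            ],
        },
    }
}

records = list(flatten_states(sample))
print(len(records))  # 3
print(records[0])
```

Each output record is self-describing, so downstream consumers do not need the nested per-state structure to tell which state a data point belongs to.
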
Usage Examples

Accessing State Historical Data

from source_rki_covid import SourceRkiCovid

source = SourceRkiCovid()
config = {"start_date": "2023-01-01"}

# Get all streams
streams = source.streams(config)

# Filter for state historical streams
state_streams = [
    stream for stream in streams 
    if stream.__class__.__name__.startswith('StatesHistory')
]

print(f"State historical streams: {len(state_streams)}")  # 6 streams

Reading Multi-State Data

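from airbyte_cdk.models import SyncMode
# Assumes the stream classes are defined in source_rki_covid.source
from source_rki_covid.source import StatesHistoryCases

config = {"start_date": "2023-01-01"}

# Example with the States Cases stream
states_cases = StatesHistoryCases(config=config)

# Read historical data for all states, grouped by state name
state_data = {}
for record in states_cases.read_records(sync_mode=SyncMode.full_refresh):
    state_name = record['name']
    state_abbr = record['abbreviation']
    date = record['date']
    cases = record.get('cases', 0)
    
    if state_name not in state_data:
        state_data[state_name] = []
    
    state_data[state_name].append({
        'date': date,
        'cases': cases,
        'abbreviation': state_abbr
    })

print(f"States with data: {len(state_data)}")  # 16 German states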

Date Range Processing

# Assumes the stream classes are defined in source_rki_covid.source
from source_rki_covid.source import StatesHistoryIncidence

# Calculate API parameters
states_incidence = StatesHistoryIncidence(config={"start_date": "2023-03-01"})

# Check the days calculation (days elapsed since start_date, minimum 1)
days = states_incidence.date_to_int("2023-03-01")
print(f"Requesting {days} days of historical data")

# API endpoint will be: states/history/incidence/{days}
path = states_incidence.path()
print(f"API path: {path}")

State-Level Analysis

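from airbyte_cdk.models import SyncMode
# Assumes the stream classes are defined in source_rki_covid.source
from source_rki_covid.source import StatesHistoryDeaths

config = {"start_date": "2023-01-01"}

# Analyze data for specific states
target_states = ['Bayern', 'Nordrhein-Westfalen', 'Baden-Württemberg']

states_deaths = StatesHistoryDeaths(config=config)

for record in states_deaths.read_records(sync_mode=SyncMode.full_refresh):
    if record['name'] in target_states:
        print(f"{record['name']} ({record['abbreviation']}): "
              f"{record['date']} - Deaths: {record.get('deaths', 0)}")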
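
Beyond printing individual records, a per-state total is a common next step. The sketch below works on plain dictionaries shaped like the flattened records, so it runs without the connector. `total_by_state` is a hypothetical helper and the sample records contain made-up values.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, Mapping


def total_by_state(records: Iterable[Mapping[str, Any]], metric: str) -> Dict[str, int]:
    """Sum a numeric metric per state name, treating missing values as 0."""
    totals: Dict[str, int] = defaultdict(int)
    for record in records:
        totals[record["name"]] += int(record.get(metric) or 0)
    return dict(totals)


# Illustrative records only; the values are not real RKI figures
sample_records = [
    {"name": "Bayern", "abbreviation": "BY", "date": "2023-01-01", "deaths": 4},
    {"name": "Bayern", "abbreviation": "BY", "date": "2023-01-02", "deaths": 2},
    {"name": "Berlin", "abbreviation": "BE", "date": "2023-01-01", "deaths": 1},
    {"name": "Berlin", "abbreviation": "BE", "date": "2023-01-02"},  # metric missing
]

print(total_by_state(sample_records, "deaths"))  # {'Bayern': 6, 'Berlin': 1}
```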

Data Structure

State historical streams return records with the following structure:

Historical Data Fields

  • date: Date string in YYYY-MM-DD format
  • cases/deaths/recovered: Daily counts for the specific metric
  • incidence: 7-day incidence rate per 100,000 population
  • weekIncidence: Weekly incidence calculations
  • delta: Day-over-day changes in metrics

State Identification Fields

  • name: Full German state name (e.g., "Baden-Württemberg")
  • abbreviation: Two-letter state code (e.g., "BW")

Metadata Fields

  • population: State population for rate calculations
  • meta: Additional metadata including last update timestamps

German States Coverage

The streams provide data for all 16 German federal states (Bundesländer):

  • Baden-Württemberg (BW)
  • Bayern (BY)
  • Berlin (BE)
  • Brandenburg (BB)
  • Bremen (HB)
  • Hamburg (HH)
  • Hessen (HE)
  • Mecklenburg-Vorpommern (MV)
  • Niedersachsen (NI)
  • Nordrhein-Westfalen (NW)
  • Rheinland-Pfalz (RP)
  • Saarland (SL)
  • Sachsen (SN)
  • Sachsen-Anhalt (ST)
  • Schleswig-Holstein (SH)
  • Thüringen (TH)
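
The list above maps directly onto a lookup table, which is handy for checking that a sync returned every state. This is an illustrative sketch: `GERMAN_STATES`, `state_name`, and `missing_states` are hypothetical names, not part of the connector.

```python
from typing import Any, Iterable, Mapping, Set

# Abbreviation -> full state name, taken from the list above
GERMAN_STATES = {
    "BW": "Baden-Württemberg",
    "BY": "Bayern",
    "BE": "Berlin",
    "BB": "Brandenburg",
    "HB": "Bremen",
    "HH": "Hamburg",
    "HE": "Hessen",
    "MV": "Mecklenburg-Vorpommern",
    "NI": "Niedersachsen",
    "NW": "Nordrhein-Westfalen",
    "RP": "Rheinland-Pfalz",
    "SL": "Saarland",
    "SN": "Sachsen",
    "ST": "Sachsen-Anhalt",
    "SH": "Schleswig-Holstein",
    "TH": "Thüringen",
}


def state_name(abbreviation: str) -> str:
    """Resolve a two-letter code to the full state name (case-insensitive)."""
    return GERMAN_STATES[abbreviation.upper()]


def missing_states(records: Iterable[Mapping[str, Any]]) -> Set[str]:
    """Return the state codes that do not appear in any record."""
    seen = {record["abbreviation"] for record in records}
    return set(GERMAN_STATES) - seen


print(len(GERMAN_STATES))  # 16
print(state_name("nw"))    # Nordrhein-Westfalen
```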

Sync Behavior

Unlike the Germany historical streams, the state historical streams use full refresh synchronization:

  1. Complete Data Fetch: Retrieves the requested history for all states on every sync
  2. No State Tracking: Does not maintain incremental state between syncs
  3. Comprehensive Coverage: Ensures a complete historical dataset for all states
  4. API Efficiency: Uses the days path parameter, derived from start_date, to limit response size
  5. Data Consistency: Full refresh ensures no state's data is missing
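
What full refresh means for the stored data can be sketched with an in-memory destination. This assumes overwrite semantics on the destination side (Airbyte also offers a full refresh append mode); `full_refresh_overwrite` and `fetch_all` are hypothetical names, and the records are made up.

```python
from typing import Callable, Dict, List

Record = Dict[str, object]


def full_refresh_overwrite(fetch_all: Callable[[], List[Record]], destination: List[Record]) -> None:
    """Replace everything in the destination with a freshly fetched dataset."""
    fresh = fetch_all()      # no cursor or saved state is consulted
    destination[:] = fresh   # previous contents are discarded


def fetch_all() -> List[Record]:
    # Stand-in for reading every record from a state historical stream
    return [
        {"abbreviation": "BW", "date": "2023-01-01", "cases": 10},
        {"abbreviation": "BY", "date": "2023-01-01", "cases": 7},
    ]


destination: List[Record] = [{"abbreviation": "BW", "date": "2022-12-31", "cases": 5}]
full_refresh_overwrite(fetch_all, destination)
full_refresh_overwrite(fetch_all, destination)  # a second sync does not duplicate rows
print(len(destination))  # 2
```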

Install with Tessl CLI

npx tessl i tessl/pypi-source-rki-covid
