
tessl/pypi-source-rki-covid

Airbyte source connector for RKI COVID-19 data from the German Robert Koch-Institut API


Current Data Streams

These full-refresh streams provide snapshots of the current COVID-19 statistics. Each sync returns the most recent available data for Germany overall, for individual states, and by age group, with no historical tracking.

Capabilities

Germany Overall Statistics

Current COVID-19 statistics for Germany as a whole.

class Germany(RkiCovidStream):
    """
    Current COVID-19 statistics for Germany overall.
    
    API Endpoint: https://api.corona-zahlen.org/germany/
    Sync Mode: Full refresh
    Primary Key: None
    
    Returns current national-level statistics including cases, deaths,
    recovered, incidence rates, and other epidemiological metrics.
    """
    
    primary_key = None
    
    def path(self, stream_state=None, stream_slice=None, next_page_token=None) -> str:
        """Returns API path: 'germany/'"""

Germany States Statistics

Current COVID-19 statistics for all German states (Bundesländer).

class GermanyStates(RkiCovidStream):
    """
    Current COVID-19 statistics for all German states.
    
    API Endpoint: https://api.corona-zahlen.org/states/
    Sync Mode: Full refresh
    Primary Key: None
    
    Returns current statistics for each German state including
    state name, abbreviation, and all COVID-19 metrics.
    """
    
    primary_key = None
    
    def path(self, stream_state=None, stream_slice=None, next_page_token=None) -> str:
        """Returns API path: 'states/'"""
        
    def parse_response(self, response, **kwargs):
        """
        Parses nested state data from API response.
        
        Extracts individual state records from response.json().get("data")
        where each state is keyed by abbreviation.
        """
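The parsing described for `GermanyStates` can be sketched as below. This is a minimal illustration, not the connector's actual code: the helper name `flatten_states` and the sample payload are hypothetical, and the only thing taken from the description is that state records sit under `data`, keyed by abbreviation.

```python
from typing import Any, Iterable, Mapping


def flatten_states(payload: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]:
    """Yield one record per state from a payload shaped like {"data": {abbr: {...}}}."""
    # Treat a missing or null "data" key as an empty mapping.
    states = payload.get("data") or {}
    for state in states.values():
        yield state


# Hypothetical payload; the field values are made up for illustration.
sample = {
    "data": {
        "BY": {"name": "Bayern", "abbreviation": "BY", "cases": 10},
        "HH": {"name": "Hamburg", "abbreviation": "HH", "cases": 5},
    }
}

records = list(flatten_states(sample))
for record in records:
    print(record["abbreviation"], record["name"])
```

Yielding records one at a time keeps the helper compatible with the iterable-of-records contract that `parse_response` is expected to satisfy.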

Germany Age Groups Statistics

Current COVID-19 statistics broken down by age groups for Germany.

class GermanyAgeGroups(RkiCovidStream):
    """
    Current COVID-19 statistics by age groups for Germany.
    
    API Endpoint: https://api.corona-zahlen.org/germany/age-groups
    Sync Mode: Full refresh
    Primary Key: None
    
    Returns current age-stratified statistics for epidemiological
    analysis by demographic groups.
    """
    
    primary_key = None
    
    def path(self, stream_state=None, stream_slice=None, next_page_token=None) -> str:
        """Returns API path: 'germany/age-groups'"""
        
    def parse_response(self, response, **kwargs):
        """
        Extracts age group data from API response.
        
        Returns response.json().get("data") containing age group statistics.
        """

Germany States Age Groups Statistics

Current COVID-19 statistics by age groups for each German state.

class GermanyStatesAgeGroups(RkiCovidStream):
    """
    Current COVID-19 statistics by age groups for all German states.
    
    API Endpoint: https://api.corona-zahlen.org/states/age-groups
    Sync Mode: Full refresh
    Primary Key: None
    
    Returns age-stratified statistics for each state, combining
    geographical and demographic breakdowns.
    """
    
    primary_key = None
    
    def path(self, stream_state=None, stream_slice=None, next_page_token=None) -> str:
        """Returns API path: 'states/age-groups'"""
        
    def parse_response(self, response, **kwargs):
        """
        Parses nested states and age groups data.
        
        Flattens the nested structure where each state contains
        multiple age group records, adding state abbreviation
        to each age group record.
        """
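The flattening step described for `GermanyStatesAgeGroups` might look like the sketch below. It assumes the response nests age groups as a mapping under each state abbreviation; the real payload shape may differ. The helper name `flatten_state_age_groups`, the `age_group` field name, and the sample values are all hypothetical.

```python
from typing import Any, Dict, Iterable, Mapping


def flatten_state_age_groups(payload: Mapping[str, Any]) -> Iterable[Dict[str, Any]]:
    """Yield one flat record per (state, age group) pair."""
    # Assumed shape: {"data": {abbreviation: {age_group: {...stats...}}}}
    states = payload.get("data") or {}
    for abbreviation, age_groups in states.items():
        for age_group, stats in age_groups.items():
            # Copy the stats and tag them with the state they belong to.
            yield {"abbreviation": abbreviation, "age_group": age_group, **stats}


# Hypothetical payload; the numbers are made up for illustration.
sample = {
    "data": {
        "BY": {"A00-A04": {"casesMale": 3}, "A05-A14": {"casesMale": 7}},
        "HH": {"A00-A04": {"casesMale": 1}},
    }
}

flat_records = list(flatten_state_age_groups(sample))
for record in flat_records:
    print(record)
```

Carrying the state abbreviation on every row means each record can be loaded into a single table without a separate lookup for its parent state.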

Base Stream Class

All current data streams inherit from the base RkiCovidStream class.

class RkiCovidStream(HttpStream, ABC):
    """
    Base class for full-refresh RKI COVID streams.
    
    Provides common functionality including:
    - Base URL: https://api.corona-zahlen.org/
    - No pagination (single page responses)
    - Standard JSON parsing
    - Common request parameter handling
    """
    
    url_base = "https://api.corona-zahlen.org/"
    
    def next_page_token(self, response) -> Optional[Mapping[str, Any]]:
        """No pagination - returns None"""
        
    def request_params(self, stream_state=None, stream_slice=None, next_page_token=None):
        """Returns empty dict - no additional parameters needed"""
        
    def parse_response(self, response, **kwargs):
        """
        Default response parsing.
        
        Returns:
        An iterable containing a single record: the full JSON response body.
        """
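The documented API endpoints follow from combining `url_base` with each stream's `path()` value. The sketch below reproduces that relationship using plain URL joining from the standard library. It is an illustration only: the `PATHS` table and `endpoint_for` helper are hypothetical, and the CDK's own request-building logic is not shown.

```python
from urllib.parse import urljoin

# Base URL and paths as documented for the four current data streams.
URL_BASE = "https://api.corona-zahlen.org/"

PATHS = {
    "Germany": "germany/",
    "GermanyStates": "states/",
    "GermanyAgeGroups": "germany/age-groups",
    "GermanyStatesAgeGroups": "states/age-groups",
}


def endpoint_for(stream_name: str) -> str:
    """Join the base URL with the relative path of the named stream."""
    return urljoin(URL_BASE, PATHS[stream_name])


endpoints = {name: endpoint_for(name) for name in PATHS}
for name, url in endpoints.items():
    print(f"{name}: {url}")
```

Because the base URL ends with a slash and the paths are relative, the join appends each path rather than replacing the last segment of the base.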

Usage Examples

Accessing Current Data

from source_rki_covid import SourceRkiCovid

source = SourceRkiCovid()
config = {"start_date": "2023-01-01"}

# Get all streams
streams = source.streams(config)

# Filter for current data streams
current_streams = [
    stream for stream in streams 
    if stream.__class__.__name__ in [
        'Germany', 'GermanyStates', 'GermanyAgeGroups', 'GermanyStatesAgeGroups'
    ]
]

print(f"Current data streams: {len(current_streams)}")  # 4 streams

Reading Current Statistics

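from airbyte_cdk.models import SyncMode

# Assumes the stream classes are importable from source_rki_covid.source
from source_rki_covid.source import Germany, GermanyStates

# Example with Germany stream
germany_stream = Germany()

# Read current records (typically one record with current stats).
# read_records requires a sync mode; these streams are full refresh.
for record in germany_stream.read_records(sync_mode=SyncMode.full_refresh):
    print(f"Current Germany statistics: {record}")

# Example with States stream
states_stream = GermanyStates()

# Read current records for all states
for record in states_stream.read_records(sync_mode=SyncMode.full_refresh):
    print(f"State: {record.get('name')}, Cases: {record.get('cases')}")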

Data Structure

Current data streams return comprehensive COVID-19 statistics including:

  • Cases: Total confirmed cases and daily new cases
  • Deaths: Total deaths and daily new deaths
  • Recovered: Total recovered cases and daily new recovered
  • Incidence: 7-day incidence rates per 100,000 population
  • Hospitalization: Current hospitalization metrics
  • Metadata: Last update timestamps, data sources, geographic identifiers

State-level streams additionally include:

  • State Name: Full German state name
  • Abbreviation: Two-letter state code
  • Population: State population figures for rate calculations
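To make the relationship between the incidence and population fields concrete, the sketch below computes a 7-day incidence per 100,000 inhabitants from a weekly case count and a population figure. It uses the standard definition of the rate; whether the API derives its incidence values in exactly this way is an assumption, and the function name and input numbers are hypothetical.

```python
def incidence_per_100k(cases_last_7_days: int, population: int) -> float:
    """Return new cases over the last 7 days per 100,000 inhabitants."""
    if population <= 0:
        raise ValueError("population must be positive")
    # Multiply before dividing to limit floating-point rounding.
    return cases_last_7_days * 100_000 / population


# Hypothetical inputs: 500 new cases in a week among 1,000,000 inhabitants.
example_rate = incidence_per_100k(500, 1_000_000)
print(f"7-day incidence: {example_rate:.1f} per 100,000")
```

Normalizing by population is what makes rates comparable between states of very different sizes.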

Install with Tessl CLI

npx tessl i tessl/pypi-source-rki-covid
