tessl/pypi-source-rki-covid

Airbyte source connector for RKI COVID-19 data from the German Robert Koch-Institut API

Historical Data Streams (Germany)

Incremental streams that provide historical COVID-19 time series for Germany, synchronized with a date-based cursor. Each stream records the latest date it has emitted and, on later syncs, emits only records with a newer date.

Capabilities

Historical Cases Data

Historical COVID-19 cases data for Germany with incremental synchronization.

class GermanyHistoryCases(IncrementalRkiCovidStream):
    """
    Historical COVID-19 cases data for Germany.
    
    API Endpoint: https://api.corona-zahlen.org/germany/history/cases/:days
    Sync Mode: Incremental
    Cursor Field: date
    Primary Key: None
    
    Provides historical daily case counts with incremental sync on the
    date field. The :days path parameter is calculated from start_date.
    """
    
    primary_key = None
    
    def __init__(self, config, **kwargs):
        """
        Initialize with configuration containing start_date.
        
        Parameters:
        - config: dict containing 'start_date' in YYYY-MM-DD format
        """
        
    @property
    def cursor_field(self) -> str:
        """Returns 'date' - the field used for incremental sync"""
        
    @property  
    def source_defined_cursor(self) -> bool:
        """Returns False - cursor managed by connector, not API"""
        
    def date_to_int(self, start_date) -> int:
        """
        Convert start_date to days parameter for API.
        
        Returns the number of days between start_date and the current
        date, with a minimum of 1 (e.g. when start_date is in the future).
        """
        
    def get_updated_state(self, current_stream_state, latest_record):
        """
        Update stream state with latest record date.
        
        Compares cursor field values and returns state with
        the maximum (most recent) date value.
        """
        
    def read_records(self, stream_state=None, **kwargs):
        """
        Read records with incremental filtering.
        
        Filters records to only return those with dates
        newer than the current stream state cursor.
        """
        
    def path(self, stream_state=None, stream_slice=None, next_page_token=None) -> str:
        """Returns path with calculated days: 'germany/history/cases/{days}'"""

Historical Incidence Data

Historical COVID-19 incidence rates for Germany.

class GermanHistoryIncidence(IncrementalRkiCovidStream):
    """
    Historical COVID-19 incidence data for Germany.
    
    API Endpoint: https://api.corona-zahlen.org/germany/history/incidence/:days
    Sync Mode: Incremental
    Cursor Field: date
    Primary Key: None
    
    Provides historical 7-day incidence rates per 100,000 population
    with incremental synchronization capability.
    """
    
    primary_key = None
    cursor_field = "date"
    source_defined_cursor = False

Historical Deaths Data

Historical COVID-19 deaths data for Germany.

class GermanHistoryDeaths(IncrementalRkiCovidStream):
    """
    Historical COVID-19 deaths data for Germany.
    
    API Endpoint: https://api.corona-zahlen.org/germany/history/deaths/:days
    Sync Mode: Incremental
    Cursor Field: date
    Primary Key: None
    
    Provides historical daily deaths data with incremental sync
    for tracking mortality trends over time.
    """
    
    primary_key = None
    cursor_field = "date"
    source_defined_cursor = False

Historical Recovered Data

Historical COVID-19 recovery data for Germany.

class GermanHistoryRecovered(IncrementalRkiCovidStream):
    """
    Historical COVID-19 recovered cases data for Germany.
    
    API Endpoint: https://api.corona-zahlen.org/germany/history/recovered/:days
    Sync Mode: Incremental
    Cursor Field: date
    Primary Key: None
    
    Provides historical daily recovery data with incremental sync
    for tracking recovery trends and calculating active cases.
    """
    
    primary_key = None
    cursor_field = "date"
    source_defined_cursor = False

Historical Frozen Incidence Data

Historical COVID-19 frozen incidence data for Germany.

class GermanHistoryFrozenIncidence(IncrementalRkiCovidStream):
    """
    Historical COVID-19 frozen incidence data for Germany.
    
    API Endpoint: https://api.corona-zahlen.org/germany/history/frozen-incidence/:days
    Sync Mode: Incremental
    Cursor Field: date
    Primary Key: None
    
    Provides historical frozen incidence rates: 7-day incidence values
    as originally reported on each date. These values are not revised
    later when delayed case reports arrive, which keeps reporting and
    trend analysis consistent over time.
    """
    
    primary_key = None
    cursor_field = "date"
    source_defined_cursor = False
    
    def parse_response(self, response, **kwargs):
        """
        Parse frozen incidence response.
        
        Extracts history data from nested response structure:
        response.json().get("data").get("history")
        """

Historical Hospitalization Data

Historical COVID-19 hospitalization data for Germany.

class GermanHistoryHospitalization(IncrementalRkiCovidStream):
    """
    Historical COVID-19 hospitalization data for Germany.
    
    API Endpoint: https://api.corona-zahlen.org/germany/history/hospitalization/:days
    Sync Mode: Incremental
    Cursor Field: date
    Primary Key: None
    
    Provides historical hospitalization metrics including
    new admissions and ICU utilization with incremental sync.
    """
    
    primary_key = None
    cursor_field = "date" 
    source_defined_cursor = False
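The six endpoints above differ only in the metric segment of the path. A lookup table makes that explicit. HISTORY_METRICS and history_url are hypothetical helpers for illustration and are not part of the connector's documented API. The base URL and metric segments are taken from the endpoint lines above.

```python
BASE_URL = "https://api.corona-zahlen.org/"

# Metric path segment per Germany history stream, taken from the endpoints above
HISTORY_METRICS = {
    "GermanyHistoryCases": "cases",
    "GermanHistoryIncidence": "incidence",
    "GermanHistoryDeaths": "deaths",
    "GermanHistoryRecovered": "recovered",
    "GermanHistoryFrozenIncidence": "frozen-incidence",
    "GermanHistoryHospitalization": "hospitalization",
}


def history_url(stream_name: str, days: int) -> str:
    """Build the full endpoint URL for one stream and a :days value (sketch)."""
    if days < 1:
        raise ValueError("days must be at least 1")
    return f"{BASE_URL}germany/history/{HISTORY_METRICS[stream_name]}/{days}"


for name in HISTORY_METRICS:
    print(history_url(name, 30))
```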

Base Incremental Stream Class

All historical Germany streams inherit from IncrementalRkiCovidStream.

class IncrementalRkiCovidStream(RkiCovidStream, ABC):
    """
    Base class for incremental RKI COVID streams.
    
    Extends RkiCovidStream with incremental sync capabilities:
    - Cursor field management
    - Stream state tracking
    - Incremental record filtering
    - State checkpoint handling
    """
    
    state_checkpoint_interval = None
    
    @property
    def cursor_field(self) -> str:
        """
        Abstract property defining the cursor field name.
        
        Must be implemented by subclasses to specify which
        field is used for incremental synchronization.
        """
        
    def get_updated_state(self, current_stream_state, latest_record):
        """
        Abstract method for updating stream state.
        
        Must be implemented by subclasses to define how
        the stream state is updated with new records.
        """

Usage Examples

Incremental Sync Setup

from source_rki_covid import SourceRkiCovid

source = SourceRkiCovid()
config = {"start_date": "2023-01-01"}

# Get historical streams
streams = source.streams(config)
historical_streams = [
    stream for stream in streams 
    if 'History' in stream.__class__.__name__ and 
       'States' not in stream.__class__.__name__
]

print(f"Historical Germany streams: {len(historical_streams)}")  # 6 streams

Reading Historical Data

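from airbyte_cdk.models import SyncMode
from source_rki_covid.source import GermanyHistoryCases

# Initial sync (no stream state): every record from start_date onwards is returned
config = {"start_date": "2023-01-01"}
cases_stream = GermanyHistoryCases(config=config)

print(f"Cursor field: {cases_stream.cursor_field}")  # 'date'

# The CDK's read_records expects a sync_mode argument
for record in cases_stream.read_records(sync_mode=SyncMode.incremental):
    print(f"Date: {record['date']}, Cases: {record.get('cases', 0)}")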

Incremental Updates

from airbyte_cdk.models import SyncMode

# Existing stream state: only records dated after 2023-06-15 are returned
current_state = {"date": "2023-06-15"}

for record in cases_stream.read_records(sync_mode=SyncMode.incremental, stream_state=current_state):
    print(f"New record - Date: {record['date']}")

    # Advance the state to the most recent date seen so far
    current_state = cases_stream.get_updated_state(current_state, record)

Date Range Calculation

from source_rki_covid.source import GermanyHistoryCases

# Understanding the date_to_int method
cases_stream = GermanyHistoryCases(config={"start_date": "2023-01-01"})

# Days between start_date and today (at least 1), used as the :days path parameter
days = cases_stream.date_to_int("2023-01-01")
print(f"Days parameter: {days}")

# path() embeds the same value: germany/history/cases/{days}
path = cases_stream.path()
print(f"API path: {path}")

Data Structure

Historical streams return time-series records. Fields vary by stream; a record may contain:

  • date: Date string in YYYY-MM-DD format (cursor field)
  • cases/deaths/recovered: Daily counts for the specific metric
  • incidence: 7-day incidence rate per 100,000 population
  • weekIncidence: Weekly incidence calculations
  • delta: Day-over-day changes in metrics
  • meta: Metadata including last update timestamps

Incremental Sync Behavior

  1. Initial Sync: Fetches all data from start_date to current date
  2. State Management: Tracks the latest date processed
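  3. Incremental Updates: Only emits records whose date is newer than the last state date; the connector filters the records after it receives the response
  4. API Efficiency: Limits the API response to the configured window with the days parameter, which is calculated from start_date
  5. Data Consistency: Avoids duplicate records across syncs by skipping any record not strictly newer than the saved state date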
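The five steps can be simulated end to end, with an in-memory list standing in for the API response. sync is a hypothetical helper, and the case counts are made-up values. In this simulation the second run re-reads the whole start_date window, emits only the new day, and advances the state.

```python
from typing import Any, Dict, List, Mapping, Optional, Tuple

CURSOR = "date"


def sync(
    api_records: List[Mapping[str, Any]],
    state: Optional[Dict[str, str]],
) -> Tuple[List[Mapping[str, Any]], Optional[Dict[str, str]]]:
    """Simulate one run: filter against the saved state, then advance it."""
    cutoff = state[CURSOR] if state else None
    # Incremental filtering: keep only records strictly newer than the cutoff
    emitted = [r for r in api_records if cutoff is None or r[CURSOR] > cutoff]
    for record in emitted:
        # State management: remember the most recent date processed
        current = state[CURSOR] if state else record[CURSOR]
        state = {CURSOR: max(current, record[CURSOR])}
    return emitted, state


# Run 1 (initial sync) and run 2 (same window plus one new day); counts are made up
run1 = [{"date": "2023-01-01", "cases": 10}, {"date": "2023-01-02", "cases": 12}]
run2 = run1 + [{"date": "2023-01-03", "cases": 9}]

first, state = sync(run1, None)
second, state = sync(run2, state)
print([r["date"] for r in second], state)  # ['2023-01-03'] {'date': '2023-01-03'}
```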

Install with Tessl CLI

npx tessl i tessl/pypi-source-rki-covid
