CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-source-rki-covid

Airbyte source connector for RKI COVID-19 data from the German Robert Koch-Institut API

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

main-source.mddocs/

Main Source Interface

The primary connector interface providing connection testing and stream discovery functionality. SourceRkiCovid is the main entry point that manages all available data streams and validates connectivity.

Capabilities

Source Class

Main source connector class that coordinates access to all RKI COVID-19 data streams.

class SourceRkiCovid(AbstractSource):
    """
    Main source connector for RKI COVID-19 data.
    
    Inherits from airbyte_cdk.sources.AbstractSource and provides
    standardized Airbyte connector functionality.
    """

Connection Testing

Validates connectivity to the RKI COVID-19 API before attempting data synchronization.

def check_connection(self, logger, config) -> Tuple[bool, any]:
    """
    Test connection availability for the connector.

    Parameters:
    - logger: Logger object for recording connection test results
    - config: User-input configuration object conforming to spec.json

    Returns:
    Tuple[bool, any]: (True, None) if connection successful, 
                     (False, error_message) if connection fails

    Implementation:
    Tests connectivity by making a GET request to the Germany endpoint
    at https://api.corona-zahlen.org/germany and checking for 200 status.
    """

Stream Discovery

Returns the complete list of all 16 available data streams configured with the provided settings.

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
    """
    Discover and return all available data streams.

    Parameters:
    - config: User configuration mapping containing start_date

    Returns:
    List[Stream]: List of 16 configured stream objects including:
    - 4 current data streams (full-refresh)
    - 6 historical Germany streams (incremental)  
    - 6 state historical streams (full-refresh)

    Stream Categories:
    - Germany current statistics
    - Germany age groups
    - Germany states current statistics  
    - Germany states age groups
    - Historical Germany data (cases, deaths, recovered, incidence, frozen incidence, hospitalization)
    - Historical states data (same metrics as Germany historical)
    """

Usage Examples

Basic Source Initialization

from source_rki_covid import SourceRkiCovid

# Create source instance
source = SourceRkiCovid()

# Minimal configuration
config = {
    "start_date": "2023-01-01"
}

Connection Testing

# Test connection before running sync
is_valid, error = source.check_connection(logger=None, config=config)

if is_valid:
    print("Connection successful!")
else:
    print(f"Connection failed: {error}")

Stream Discovery

# Get all available streams
streams = source.streams(config)

print(f"Total streams: {len(streams)}")  # 16 streams

# Examine stream types
for stream in streams:
    print(f"Stream: {stream.__class__.__name__}")
    print(f"Primary key: {stream.primary_key}")
    
    # Check if incremental
    if hasattr(stream, 'cursor_field'):
        print(f"Cursor field: {stream.cursor_field}")

Dependencies

  • airbyte_cdk.sources.AbstractSource: Base class providing standard connector interface
  • requests: HTTP client for API communication (used internally)
  • RKI COVID API: External API at https://api.corona-zahlen.org/

Install with Tessl CLI

npx tessl i tessl/pypi-source-rki-covid

docs

current-data-streams.md

entry-points.md

historical-data-streams.md

index.md

main-source.md

state-historical-streams.md

tile.json