tessl/pypi-source-rki-covid

Airbyte source connector for RKI COVID-19 data from the German Robert Koch-Institut API

Source RKI COVID

An Airbyte source connector for COVID-19 data from the public API of the German Robert Koch-Institut (RKI). It exposes 16 streams covering national statistics, state-level breakdowns, age-group breakdowns, and historical series for cases, deaths, recovered, incidence, frozen incidence, and hospitalization.

Package Information

  • Package Name: source-rki-covid
  • Package Type: Python (Airbyte connector)
  • Language: Python
  • Installation: Available through Airbyte connector catalog
  • Dependencies: airbyte-cdk 0.80.0

Core Imports

from source_rki_covid import SourceRkiCovid

For running the connector:

from source_rki_covid.run import run

Basic Usage

from source_rki_covid import SourceRkiCovid

# Initialize the source
source = SourceRkiCovid()

# Configuration (requires start_date)
config = {
    "start_date": "2023-01-01"  # UTC date in YYYY-MM-DD format
}

# Check connection
is_valid, error = source.check_connection(logger=None, config=config)

# Get available streams
streams = source.streams(config)
print(f"Available streams: {len(streams)}")  # Expect 16 streams

Architecture

The connector follows Airbyte's standard source architecture using the Airbyte CDK:

  • SourceRkiCovid: Main source class inheriting from AbstractSource
  • Stream Classes: 16 specialized stream classes for different data endpoints
  • Base Classes: Abstract base classes providing common functionality
  • Configuration: Simple schema requiring only a start_date parameter

The connector organizes COVID-19 data into logical streams covering:

  • Current snapshots (Germany, states, age groups)
  • Historical trends with incremental sync capability
  • State-level historical data with full refresh

Capabilities

Main Source Interface

The primary connector interface, which provides connection testing and stream discovery.

class SourceRkiCovid(AbstractSource):
    def check_connection(self, logger, config) -> Tuple[bool, Any]: ...
    def streams(self, config: Mapping[str, Any]) -> List[Stream]: ...

Current Data Streams

Full-refresh streams providing current COVID-19 statistics snapshots for Germany overall, individual states, and age group breakdowns.

class Germany(RkiCovidStream): ...
class GermanyStates(RkiCovidStream): ...
class GermanyAgeGroups(RkiCovidStream): ...
class GermanyStatesAgeGroups(RkiCovidStream): ...
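
As a rough illustration of how the four snapshot streams could map onto the API, the sketch below joins the base URL given in this document with one endpoint path per stream. The endpoint paths and the snapshot_url helper are assumptions made for illustration only; they are not confirmed by this document and may differ from what the connector actually requests.

```python
from urllib.parse import urljoin

# Base URL as stated in this document.
API_BASE = "https://api.corona-zahlen.org/"

# ASSUMPTION: these endpoint paths are illustrative guesses,
# not taken from the connector's source.
SNAPSHOT_PATHS = {
    "Germany": "germany",
    "GermanyStates": "states",
    "GermanyAgeGroups": "germany/age-groups",
    "GermanyStatesAgeGroups": "states/age-groups",
}


def snapshot_url(stream_name: str) -> str:
    """Return the URL a snapshot stream might request (hypothetical helper)."""
    return urljoin(API_BASE, SNAPSHOT_PATHS[stream_name])
```

Because these streams are full-refresh, each sync would fetch the whole snapshot again instead of tracking a cursor.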

Historical Data Streams (Germany)

Incremental streams providing historical COVID-19 data for Germany with date-based cursor synchronization. Supports cases, deaths, recovered, incidence, frozen incidence, and hospitalization metrics.

class GermanyHistoryCases(IncrementalRkiCovidStream): ...
class GermanHistoryIncidence(IncrementalRkiCovidStream): ...
class GermanHistoryDeaths(IncrementalRkiCovidStream): ...
class GermanHistoryRecovered(IncrementalRkiCovidStream): ...
class GermanHistoryFrozenIncidence(IncrementalRkiCovidStream): ...
class GermanHistoryHospitalization(IncrementalRkiCovidStream): ...
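
To make "date-based cursor synchronization" concrete, here is a minimal sketch of how an incremental sync can skip records it has already seen and advance its saved state. It is a generic illustration, not the connector's implementation: the cursor field name "date", the ISO-formatted date strings, and both helper functions are assumptions.

```python
from datetime import date
from typing import Any, Dict, Iterable, Iterator, Mapping

# ASSUMPTION: each record carries an ISO date (or datetime) string under this key.
CURSOR_FIELD = "date"


def _day(value: str) -> date:
    """Parse the leading YYYY-MM-DD part of an ISO date or datetime string."""
    return date.fromisoformat(value[:10])


def filter_new_records(
    records: Iterable[Mapping[str, Any]], state: Mapping[str, str]
) -> Iterator[Mapping[str, Any]]:
    """Yield only records dated strictly after the cursor stored in state."""
    cursor = state.get(CURSOR_FIELD)
    for record in records:
        if cursor is None or _day(record[CURSOR_FIELD]) > _day(cursor):
            yield record


def updated_state(state: Mapping[str, str], record: Mapping[str, Any]) -> Dict[str, str]:
    """Return new state whose cursor is the later of the old cursor and the record date."""
    seen = record[CURSOR_FIELD][:10]
    current = state.get(CURSOR_FIELD, seen)[:10]
    # ISO dates sort correctly as plain strings.
    return {CURSOR_FIELD: max(current, seen)}
```

On a first sync the state is empty, so every record passes the filter; later syncs only emit records newer than the stored cursor.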

State Historical Data Streams

Full-refresh streams providing historical COVID-19 data for all German states. They cover the same metrics as the Germany historical streams, but at state-level granularity and with full-refresh rather than incremental sync.

class StatesHistoryCases(ByStateRkiCovidStream): ...
class StatesHistoryIncidence(ByStateRkiCovidStream): ...
class StatesHistoryFrozenIncidence(ByStateRkiCovidStream): ...
class StatesHistoryDeaths(ByStateRkiCovidStream): ...
class StatesHistoryRecovered(ByStateRkiCovidStream): ...
class StatesHistoryHospitalization(ByStateRkiCovidStream): ...
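
State-level history is naturally nested (one history list per state), so a by-state stream typically has to flatten it into one record per state and date. The sketch below shows one way to do that. The payload shape, the field names "abbreviation", "name", and "history", and the flatten_by_state helper are all assumptions for illustration, not the connector's documented behavior.

```python
from typing import Any, Dict, Iterator, Mapping


def flatten_by_state(payload: Mapping[str, Any]) -> Iterator[Dict[str, Any]]:
    """Yield one flat record per (state, history entry) pair.

    ASSUMPTION: the payload looks like
    {"data": {"<abbreviation>": {"name": ..., "history": [{...}, ...]}}}.
    """
    for abbreviation, state in payload.get("data", {}).items():
        for entry in state.get("history", []):
            # Copy the state identifiers onto every history entry.
            yield {"abbreviation": abbreviation, "name": state.get("name"), **entry}
```

Flattening this way keeps each emitted record self-describing, which matters for a full-refresh stream where downstream tables are rebuilt on every sync.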

Entry Point Functions

The run() function is the console-script entry point and can also be called programmatically to run the connector.

def run(): ...

Configuration Schema

{
    "start_date": {
        "type": "string",
        "title": "Start Date", 
        "description": "UTC date in the format 2017-01-25. Any data before this date will not be replicated.",
        "required": True
    }
}
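
Since start_date is the only required setting, it can be worth validating before handing the config to the connector. The following is a minimal sketch using only the standard library; parse_start_date is a hypothetical helper and not part of the connector's API.

```python
from datetime import date, datetime
from typing import Any, Mapping


def parse_start_date(config: Mapping[str, Any]) -> date:
    """Return config["start_date"] as a date.

    Raises ValueError if the key is missing, not a string,
    or not in YYYY-MM-DD format. (Hypothetical helper.)
    """
    raw = config.get("start_date")
    if not isinstance(raw, str):
        raise ValueError("start_date is required and must be a string")
    return datetime.strptime(raw, "%Y-%m-%d").date()
```

For example, parse_start_date({"start_date": "2023-01-01"}) returns date(2023, 1, 1), while a missing or malformed value raises ValueError.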

Types

from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
from abc import ABC
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream
import requests

Stream Overview

The connector provides 16 total streams organized into three categories:

  • 4 Current Data Streams: Full-refresh snapshots of current statistics
  • 6 Historical Germany Streams: Incremental sync of historical national data
  • 6 State Historical Streams: Full-refresh historical data for all states
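
The three categories above can be written down as a small lookup table, which is handy when selecting streams by sync mode. The class names come from the sections above; the dictionary layout, the category keys, and the total_streams helper are illustrative assumptions.

```python
from typing import Dict, List

# Stream class names as listed in this document, grouped by category.
STREAMS_BY_CATEGORY: Dict[str, List[str]] = {
    "current": [
        "Germany",
        "GermanyStates",
        "GermanyAgeGroups",
        "GermanyStatesAgeGroups",
    ],
    "germany_history": [
        "GermanyHistoryCases",
        "GermanHistoryIncidence",
        "GermanHistoryDeaths",
        "GermanHistoryRecovered",
        "GermanHistoryFrozenIncidence",
        "GermanHistoryHospitalization",
    ],
    "states_history": [
        "StatesHistoryCases",
        "StatesHistoryIncidence",
        "StatesHistoryFrozenIncidence",
        "StatesHistoryDeaths",
        "StatesHistoryRecovered",
        "StatesHistoryHospitalization",
    ],
}

# Sync behavior per category, as described in this document.
SYNC_MODE_BY_CATEGORY: Dict[str, str] = {
    "current": "full_refresh",
    "germany_history": "incremental",
    "states_history": "full_refresh",
}


def total_streams() -> int:
    """Count all stream names across categories (hypothetical helper)."""
    return sum(len(names) for names in STREAMS_BY_CATEGORY.values())
```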

All streams read from the RKI COVID API at https://api.corona-zahlen.org/. Together they cover national, state-level, and age-group COVID-19 data for Germany, suitable for analytics, reporting, and research.

Install with Tessl CLI

npx tessl i tessl/pypi-source-rki-covid

Describes: pkg:pypi/source-rki-covid@0.1.x