Airbyte source connector for RKI COVID-19 data from the German Robert Koch-Institut API
SourceRkiCovid is the connector's primary interface and main entry point. It validates connectivity to the API and discovers and manages all available data streams.

The main source connector class coordinates access to all RKI COVID-19 data streams.

class SourceRkiCovid(AbstractSource):
    """
    Main source connector for RKI COVID-19 data.
    Inherits from airbyte_cdk.sources.AbstractSource and provides
    standardized Airbyte connector functionality.
    """

Validates connectivity to the RKI COVID-19 API before attempting data synchronization.

def check_connection(self, logger, config) -> Tuple[bool, any]:
    """
    Test connection availability for the connector.

    Parameters:
    - logger: Logger object for recording connection test results
    - config: User-input configuration object conforming to spec.json

    Returns:
    Tuple[bool, any]: (True, None) if connection successful,
    (False, error_message) if connection fails

    Implementation:
    Tests connectivity by making a GET request to the Germany endpoint
    at https://api.corona-zahlen.org/germany and checking for 200 status.
    """

Returns the complete list of all 16 available data streams configured with the provided settings.

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
    """
    Discover and return all available data streams.

    Parameters:
    - config: User configuration mapping containing start_date

    Returns:
    List[Stream]: List of 16 configured stream objects including:
    - 4 current data streams (full-refresh)
    - 6 historical Germany streams (incremental)
    - 6 state historical streams (full-refresh)

    Stream Categories:
    - Germany current statistics
    - Germany age groups
    - Germany states current statistics
    - Germany states age groups
    - Historical Germany data (cases, deaths, recovered, incidence, frozen incidence, hospitalization)
    - Historical states data (same metrics as Germany historical)
    """

from source_rki_covid import SourceRkiCovid

# Create source instance
source = SourceRkiCovid()

# Minimal configuration
config = {
    "start_date": "2023-01-01"
}

# Test connection before running sync
is_valid, error = source.check_connection(logger=None, config=config)
if is_valid:
    print("Connection successful!")
else:
    print(f"Connection failed: {error}")

# Get all available streams
streams = source.streams(config)
print(f"Total streams: {len(streams)}")  # 16 streams

# Examine stream types
for stream in streams:
    print(f"Stream: {stream.__class__.__name__}")
    print(f"Primary key: {stream.primary_key}")
    # Check if incremental: the CDK's base Stream defaults cursor_field
    # to an empty list, so test its value rather than its existence
    if stream.cursor_field:
        print(f"Cursor field: {stream.cursor_field}")

Install with Tessl CLI
npx tessl i tessl/pypi-source-rki-covid
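
The connectivity check documented for check_connection can be illustrated without the Airbyte CDK. The following is a minimal sketch, not the connector's actual code. It assumes the check amounts to a single GET request against the documented Germany endpoint and that only a 200 status counts as success. The names check_connection_sketch, fetch_status, and get_status are hypothetical and exist only for this illustration; the status lookup is injectable so the logic can be exercised without network access.

```python
from typing import Any, Callable, Mapping, Optional, Tuple
from urllib.request import urlopen

# Endpoint named in the check_connection documentation.
GERMANY_URL = "https://api.corona-zahlen.org/germany"


def fetch_status(url: str, timeout: float = 10.0) -> int:
    """Hypothetical helper: return the HTTP status of a GET request (stdlib only)."""
    with urlopen(url, timeout=timeout) as response:
        return response.status


def check_connection_sketch(
    config: Mapping[str, Any],
    get_status: Callable[[str], int] = fetch_status,
) -> Tuple[bool, Optional[str]]:
    """Follow the documented contract: (True, None) or (False, error_message).

    config is accepted to mirror the documented signature; this sketch
    does not read it, because the documented check needs no settings.
    """
    try:
        status = get_status(GERMANY_URL)
    except OSError as exc:
        # urllib's URLError and HTTPError, and socket timeouts, all derive from OSError.
        return False, f"Request to {GERMANY_URL} failed: {exc}"
    if status == 200:
        return True, None
    return False, f"Unexpected status code {status} from {GERMANY_URL}"
```

Passing a stub for get_status, for example `check_connection_sketch(config, get_status=lambda url: 200)`, exercises the success path offline; omitting it performs a real request.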