CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-airbyte-cdk

A framework for building Airbyte Source and Destination connectors with Python, supporting both programmatic and low-code declarative approaches.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/destination-connectors.md

Destination Connectors

Framework for building data loading connectors that write records to databases, data warehouses, files, and APIs. The Destination connector framework provides structured approaches to implementing data loading with batch processing, type mapping, error handling, and integration with Airbyte's standardized message protocol.

Capabilities

Base Destination Class

Core class for implementing destination connectors that load data into external systems.

from airbyte_cdk import Destination
from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, ConnectorSpecification
from typing import Any, Iterable, Mapping
import logging

class Destination:
    """
    Base class for Airbyte destination connectors.

    A concrete destination subclasses this and implements check(), write(),
    and spec(). The bodies shown here are documentation stubs: only the
    signatures and contracts are specified.
    """
    
    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        """
        Test connection validity with the given configuration.
        
        Args:
            logger: Logger instance for outputting messages
            config: Configuration dictionary containing connection parameters
            
        Returns:
            AirbyteConnectionStatus indicating success or failure with details
        """
    
    def write(
        self, 
        config: Mapping[str, Any], 
        configured_catalog: ConfiguredAirbyteCatalog, 
        input_messages: Iterable[AirbyteMessage]
    ) -> Iterable[AirbyteMessage]:
        """
        Write data records to the destination.
        
        Args:
            config: Configuration dictionary
            configured_catalog: Catalog specifying destination streams and sync modes
            input_messages: Stream of AirbyteMessage instances containing records to write
            
        Yields:
            AirbyteMessage instances for status updates, state, or errors
        """
    
    def spec(self) -> ConnectorSpecification:
        """
        Return the specification for this destination's configuration.

        NOTE(review): ConnectorSpecification is referenced here but not in
        this snippet's import block — confirm it is imported from
        airbyte_cdk.models.
        
        Returns:
            ConnectorSpecification defining required and optional configuration fields
        """

Message Processing

Classes and utilities for processing Airbyte protocol messages.

from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, AirbyteStateMessage, AirbyteLogMessage, AirbyteStream, SyncMode, DestinationSyncMode, Type
from typing import Any, Dict, List, Optional

class AirbyteMessage:
    """
    Airbyte protocol message containing data records, state, or metadata.

    NOTE(review): this snippet uses Optional and AirbyteLogMessage without
    importing them — confirm against the real airbyte_cdk.models definitions.
    """
    
    type: Type  # Discriminator: MESSAGE, RECORD, STATE, LOG, CATALOG, etc.
    record: Optional[AirbyteRecordMessage]  # Payload when type == RECORD
    state: Optional[AirbyteStateMessage]  # Payload when type == STATE
    log: Optional[AirbyteLogMessage]  # Payload when type == LOG

class AirbyteRecordMessage:
    """
    Data record message containing the actual data to be written.
    """
    
    stream: str  # Name of the stream this record belongs to
    data: Dict[str, Any]  # Record payload as key-value pairs
    emitted_at: int  # Epoch timestamp when the record was emitted (milliseconds)
    namespace: Optional[str]  # Optional namespace qualifying the stream

class ConfiguredAirbyteCatalog:
    """
    Catalog describing which streams to write and how.
    """
    
    streams: List[ConfiguredAirbyteStream]  # One configured entry per stream to sync

class ConfiguredAirbyteStream:
    """
    Configuration for a single stream in the destination.
    """
    
    stream: AirbyteStream  # Stream schema and metadata
    sync_mode: SyncMode  # Source-side mode: FULL_REFRESH or INCREMENTAL
    destination_sync_mode: DestinationSyncMode  # Destination-side mode: APPEND, OVERWRITE, APPEND_DEDUP
    cursor_field: Optional[List[str]]  # Field path used as the incremental cursor
    primary_key: Optional[List[List[str]]]  # Field paths used for deduplication

Configuration and Schema Handling

Utilities for handling destination configuration and data schemas.

from airbyte_cdk.models import ConnectorSpecification, AirbyteStream, DestinationSyncMode
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader
from typing import Any, Dict, List, Optional

class ConnectorSpecification:
    """
    Specification defining the configuration schema for a destination.
    """
    
    connectionSpecification: Dict[str, Any]  # JSONSchema describing valid configuration
    supportsIncremental: Optional[bool]  # Whether destination supports incremental sync
    supportsNormalization: Optional[bool]  # Whether destination supports normalization
    supportsDBT: Optional[bool]  # Whether destination supports DBT transformations
    supported_destination_sync_modes: List[DestinationSyncMode]  # Sync modes this destination accepts

def check_config_against_spec_or_exit(config: Dict[str, Any], spec: ConnectorSpecification) -> None:
    """
    Validate a configuration against the connector specification's JSONSchema.
    
    Args:
        config: Configuration dictionary to validate
        spec: Connector specification carrying the schema to validate against
        
    Raises:
        SystemExit: If the configuration does not satisfy the schema
    """

class ResourceSchemaLoader:
    """
    Load JSON schemas bundled as package resources.
    """
    
    def __init__(self, package_name: str):
        """
        Initialize the schema loader.
        
        Args:
            package_name: Python package containing the schema files
        """
    
    def get_schema(self, name: str) -> Dict[str, Any]:
        """
        Load a schema by name from the configured package.
        
        Args:
            name: Schema file name (without .json extension)
            
        Returns:
            Schema dictionary
        """

Type Mapping and Transformation

Classes for handling data type conversion and transformation.

from airbyte_cdk.sources.utils.transform import TypeTransformer, TransformConfig
from typing import Any, Dict, Mapping

class TypeTransformer:
    """
    Transform record data types between Airbyte and destination formats.
    """
    
    def __init__(self, config: TransformConfig):
        """
        Initialize the type transformer.
        
        Args:
            config: Configuration controlling which transformations to apply
        """
    
    def transform(self, data: Dict[str, Any], schema: Dict[str, Any]) -> Dict[str, Any]:
        """
        Transform record data according to the schema and configuration.
        
        Args:
            data: Record data dictionary
            schema: JSONSchema for the record
            
        Returns:
            Transformed data dictionary
        """

class TransformConfig:
    """
    Configuration for data transformations.

    NOTE(review): parameters annotated ``str`` / ``Mapping[str, str]`` default
    to None — the annotations should be Optional[...] (requires importing
    Optional in this snippet).
    """
    
    def __init__(
        self,
        date_format: str = None,
        datetime_format: str = None,
        time_format: str = None,
        normalization: Mapping[str, str] = None
    ):
        """
        Initialize the transformation configuration.
        
        Args:
            date_format: Format string for date fields (None = leave unchanged)
            datetime_format: Format string for datetime fields (None = leave unchanged)
            time_format: Format string for time fields (None = leave unchanged)
            normalization: Field name normalization mappings (None = no renaming)
        """

Usage Examples

Basic Database Destination

from airbyte_cdk import Destination
from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, Status, Type
import logging
import sqlite3
from typing import Any, Iterable, Mapping

class SqliteDestination(Destination):
    """Example destination that writes records into a local SQLite database.

    Config keys:
        database_path: filesystem path to the SQLite database file.
    """

    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        """Verify that the configured database is reachable and queryable."""
        try:
            conn = sqlite3.connect(config["database_path"])
            try:
                conn.execute("SELECT 1")
            finally:
                # Close even if the probe query fails, to avoid leaking the handle.
                conn.close()
            return AirbyteConnectionStatus(status=Status.SUCCEEDED)
        except Exception as e:
            return AirbyteConnectionStatus(status=Status.FAILED, message=str(e))

    def write(
        self,
        config: Mapping[str, Any],
        configured_catalog: ConfiguredAirbyteCatalog,
        input_messages: Iterable[AirbyteMessage]
    ) -> Iterable[AirbyteMessage]:
        """Write RECORD messages to SQLite; pass every message through once.

        Each input message is yielded exactly once. (Yielding STATE messages
        twice — once in a STATE branch and again unconditionally — would
        corrupt checkpointing downstream.)
        """
        conn = sqlite3.connect(config["database_path"])

        try:
            for message in input_messages:
                if message.type == Type.RECORD:
                    self._write_record(conn, message.record)
                # Yield every message (RECORD, STATE, ...) exactly once so the
                # platform can checkpoint state after records are persisted.
                yield message
        finally:
            conn.close()

    def _write_record(self, conn: sqlite3.Connection, record):
        """Insert one record, creating its table on first sight."""
        table_name = record.stream
        data = record.data

        self._ensure_table_exists(conn, table_name, data)

        # Values are bound as parameters; identifiers cannot be bound, so they
        # are quoted to survive reserved words and odd stream/field names.
        columns = ", ".join(self._quote_ident(c) for c in data)
        placeholders = ", ".join("?" for _ in data)
        query = f"INSERT INTO {self._quote_ident(table_name)} ({columns}) VALUES ({placeholders})"
        conn.execute(query, list(data.values()))
        conn.commit()

    def _ensure_table_exists(self, conn: sqlite3.Connection, table_name: str, sample_data: dict):
        """Create the table from the sample record's shape if it is missing.

        NOTE: the schema is inferred from a single record; later records
        carrying extra keys will fail to insert.
        """
        columns = []
        for key, value in sample_data.items():
            if isinstance(value, int):
                columns.append(f"{self._quote_ident(key)} INTEGER")
            elif isinstance(value, float):
                columns.append(f"{self._quote_ident(key)} REAL")
            else:
                columns.append(f"{self._quote_ident(key)} TEXT")

        create_sql = f"CREATE TABLE IF NOT EXISTS {self._quote_ident(table_name)} ({', '.join(columns)})"
        conn.execute(create_sql)
        conn.commit()

    @staticmethod
    def _quote_ident(name: str) -> str:
        """Quote a SQL identifier, escaping embedded double quotes."""
        return '"' + name.replace('"', '""') + '"'

# Usage: instantiate the connector and verify connectivity before syncing.
destination = SqliteDestination()
config = {"database_path": "/path/to/database.db"}
# Returns AirbyteConnectionStatus with SUCCEEDED or FAILED (plus message).
status = destination.check(logging.getLogger(), config)

Batch Processing Destination

from airbyte_cdk import Destination
from airbyte_cdk.models import AirbyteMessage, Type
import json
from typing import Any, Dict, Iterable, List, Mapping

class BatchFileDestination(Destination):
    """Buffer records per stream in memory and append them to JSONL files,
    flushing whenever a stream's buffer reaches the batch size and once more
    after the input is exhausted."""

    def __init__(self):
        # Maps stream name -> list of buffered record dicts.
        self._buffer = {}
        self._batch_size = 1000

    def write(
        self, 
        config: Mapping[str, Any], 
        configured_catalog, 
        input_messages: Iterable[AirbyteMessage]
    ) -> Iterable[AirbyteMessage]:
        """Buffer RECORD messages, flushing full batches; pass all messages through."""
        for message in input_messages:
            if message.type == Type.RECORD:
                stream = message.record.stream
                self._buffer_record(message.record)

                # Flush as soon as this stream's buffer fills up.
                if len(self._buffer[stream]) >= self._batch_size:
                    self._flush_batch(config, stream)

            yield message

        # Input exhausted: write out whatever is still buffered.
        for stream in self._buffer:
            if self._buffer[stream]:
                self._flush_batch(config, stream)

    def _buffer_record(self, record):
        """Append one record's payload to its stream's in-memory buffer."""
        entry = {
            "data": record.data,
            "emitted_at": record.emitted_at
        }
        self._buffer.setdefault(record.stream, []).append(entry)

    def _flush_batch(self, config: Mapping[str, Any], stream_name: str):
        """Append the buffered records for *stream_name* to its JSONL file."""
        pending = self._buffer[stream_name]
        if not pending:
            return

        output_path = f"{config['output_dir']}/{stream_name}.jsonl"

        with open(output_path, "a") as sink:
            sink.writelines(json.dumps(entry) + "\n" for entry in pending)

        self._buffer[stream_name] = []
        print(f"Flushed batch for stream {stream_name}")

Type-Safe Destination with Schema Validation

from airbyte_cdk import Destination
from airbyte_cdk.sources.utils.transform import TypeTransformer, TransformConfig
from airbyte_cdk.models import AirbyteMessage, Type
import jsonschema
from typing import Any, Dict, Iterable, Mapping

class ValidatingDestination(Destination):
    """Validate each record against its stream's JSON schema and apply type
    transformations before writing; invalid records are reported and skipped."""

    def __init__(self):
        self._transformer = TypeTransformer(TransformConfig(
            date_format="%Y-%m-%d",
            datetime_format="%Y-%m-%dT%H:%M:%S",
        ))
        # Maps stream name -> JSON schema, populated from the catalog in write().
        self._schemas = {}

    def write(
        self, 
        config: Mapping[str, Any], 
        configured_catalog, 
        input_messages: Iterable[AirbyteMessage]
    ) -> Iterable[AirbyteMessage]:
        """Validate, transform and write RECORD messages; pass all messages through."""
        # Cache each configured stream's schema for per-record lookup.
        for stream_config in configured_catalog.streams:
            stream = stream_config.stream
            self._schemas[stream.name] = stream.json_schema

        for message in input_messages:
            if message.type == Type.RECORD:
                try:
                    prepared = self._validate_and_transform(message.record)
                    self._write_validated_record(config, prepared)
                except Exception as e:
                    # A bad record must not abort the sync; report and continue.
                    print(f"Validation error for record in {message.record.stream}: {e}")

            yield message

    def _validate_and_transform(self, record) -> Dict[str, Any]:
        """Return a dict payload for *record*: validated and transformed when a
        (non-empty) schema is known for its stream, raw otherwise."""
        schema = self._schemas.get(record.stream)

        if schema:
            # Raises on schema violations; caught and logged by the caller.
            jsonschema.validate(record.data, schema)
            payload = self._transformer.transform(record.data, schema)
        else:
            payload = record.data

        return {
            "stream": record.stream,
            "data": payload,
            "emitted_at": record.emitted_at
        }

    def _write_validated_record(self, config: Mapping[str, Any], record: Dict[str, Any]):
        """Persist the validated record (left to concrete implementations)."""
        pass

API Destination with Error Handling

from airbyte_cdk import Destination
from airbyte_cdk.models import AirbyteMessage, Type
import requests
import time
from typing import Any, Iterable, Mapping

class ApiDestination(Destination):
    """Example destination that POSTs each record to an HTTP API with retries.

    Config keys:
        api_token: bearer token for the Authorization header.
        api_base_url: base URL; records go to {api_base_url}/data/{stream}.
    """

    def write(
        self, 
        config: Mapping[str, Any], 
        configured_catalog, 
        input_messages: Iterable[AirbyteMessage]
    ) -> Iterable[AirbyteMessage]:
        """Write RECORD messages to the API; pass every message through."""
        session = requests.Session()
        session.headers.update({
            "Authorization": f"Bearer {config['api_token']}",
            "Content-Type": "application/json"
        })

        for message in input_messages:
            if message.type == Type.RECORD:
                success = self._write_record_with_retry(
                    session, 
                    config, 
                    message.record
                )

                if not success:
                    print(f"Failed to write record to API: {message.record.stream}")

            yield message

    def _write_record_with_retry(self, session: requests.Session, config: Mapping[str, Any], record, max_retries: int = 3) -> bool:
        """POST one record, retrying on rate limits, 5xx and network errors.

        Returns:
            True on any 2xx response (APIs commonly answer 201/202/204 for
            writes, so checking only for 200 would mis-report successes);
            False once retries are exhausted or a non-retryable 4xx is seen.
        """
        url = f"{config['api_base_url']}/data/{record.stream}"
        payload = {
            "data": record.data,
            "timestamp": record.emitted_at
        }

        for attempt in range(max_retries + 1):
            try:
                response = session.post(url, json=payload, timeout=30)
            except requests.exceptions.RequestException as e:
                print(f"Request failed (attempt {attempt + 1}): {e}")
                if attempt < max_retries:
                    time.sleep(2 ** attempt)  # Exponential backoff
                continue

            if 200 <= response.status_code < 300:
                return True
            if response.status_code == 429:  # Rate limited
                time.sleep(self._retry_after_seconds(response))
                continue
            if response.status_code >= 500:  # Server error, retry
                time.sleep(2 ** attempt)  # Exponential backoff
                continue

            # Remaining 4xx responses are not retryable.
            print(f"API error {response.status_code}: {response.text}")
            return False

        return False

    @staticmethod
    def _retry_after_seconds(response, default: int = 60) -> int:
        """Parse the Retry-After header as an integer, falling back to *default*.

        Per RFC 9110 the header may also be an HTTP-date, which a bare int()
        would reject with ValueError and crash the sync.
        """
        try:
            return int(response.headers.get("Retry-After", default))
        except (TypeError, ValueError):
            return default

Install with Tessl CLI

npx tessl i tessl/pypi-airbyte-cdk

docs

declarative-cdk.md

destination-connectors.md

index.md

source-connectors.md

tile.json