tessl/pypi-airbyte-cdk

A framework for building Airbyte Source and Destination connectors with Python, supporting both programmatic and low-code declarative approaches.

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/airbyte-cdk@3.1.x

To install, run

npx @tessl/cli install tessl/pypi-airbyte-cdk@3.1.0


Airbyte CDK

The Airbyte CDK (Connector Development Kit) is a Python framework for building Airbyte Source and Destination connectors. It provides both programmatic and low-code declarative approaches to creating data integration connectors, enabling developers to build reliable data extraction and loading solutions for various APIs, databases, and data sources.

The CDK supports three development paths: programmatic sources written as Python classes, low-code declarative sources defined in YAML manifests, and destinations written as Python classes that load records into a target system.

Package Information

  • Package Name: airbyte-cdk
  • Version: 3.1.0
  • Language: Python
  • Installation: pip install airbyte-cdk
  • Python Version: 3.9+
  • Framework Type: Connector Development Kit

Core Imports

Main connector classes:

from airbyte_cdk import Source, Destination
from airbyte_cdk.entrypoint import launch

Low-code declarative sources:

from airbyte_cdk import YamlDeclarativeSource, ManifestDeclarativeSource

HTTP streams and utilities:

from airbyte_cdk import HttpStream, HttpSubStream
from airbyte_cdk import TokenAuthenticator, Oauth2Authenticator

Protocol models:

from airbyte_cdk import AirbyteMessage, ConfiguredAirbyteCatalog, AirbyteStream

Basic Usage

Creating a Source Connector

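import logging
import sys
from typing import Any, Iterable, List, Mapping, Optional, Tuple

from airbyte_cdk.models import AirbyteCatalog, SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream

class MyStream(Stream):
    primary_key = "id"

    def __init__(self, config: Mapping[str, Any]):
        self._config = config

    def get_json_schema(self) -> Mapping[str, Any]:
        # Inline schema instead of a schemas/my_stream.json resource file
        return {"type": "object", "properties": {"id": {"type": "integer"}}}

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any]]:
        # Replace with real extraction logic
        yield {"id": 1}

# AbstractSource (not the bare Source interface) provides check_connection() and streams()
class MySource(AbstractSource):
    def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
        """
        Test connection to the source.
        Returns (True, None) if successful, (False, error_message) otherwise.
        """
        try:
            # Test connection logic here
            return True, None
        except Exception as e:
            return False, str(e)

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        Return list of streams for this source.
        """
        return [MyStream(config=config)]

    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
        """
        Discover available streams and their schemas.
        AbstractSource already implements discover() this way, so this override is optional.
        """
        return AirbyteCatalog(streams=[stream.as_airbyte_stream() for stream in self.streams(config)])

# Launch the connector
if __name__ == "__main__":
    from airbyte_cdk.entrypoint import launch
    launch(MySource(), sys.argv[1:])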
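`launch` hands the command-line arguments to the CDK entrypoint, which runs one of the Airbyte Protocol commands: `spec`, `check`, `discover`, or `read`. The following is a rough, stdlib-only sketch of that argument handling and is not the CDK's actual `AirbyteEntrypoint` code. The function `parse_connector_args` is hypothetical. It accepts the same command shapes a connector is invoked with.

```python
import argparse
from typing import List


def parse_connector_args(argv: List[str]) -> argparse.Namespace:
    """Hypothetical simplification of the argument parsing behind launch()."""
    parser = argparse.ArgumentParser(prog="source-example")
    commands = parser.add_subparsers(dest="command", required=True)

    # spec: print the connector specification, no config needed
    commands.add_parser("spec")

    # check and discover both need only the user's config file
    for name in ("check", "discover"):
        sub = commands.add_parser(name)
        sub.add_argument("--config", required=True)

    # read also needs the configured catalog and, optionally, saved state
    read = commands.add_parser("read")
    read.add_argument("--config", required=True)
    read.add_argument("--catalog", required=True)
    read.add_argument("--state", required=False)
    return parser.parse_args(argv)


args = parse_connector_args(["read", "--config", "config.json", "--catalog", "catalog.json"])
print(args.command, args.config, args.catalog, args.state)  # read config.json catalog.json None
```

In a real connector the same argument list comes from `sys.argv[1:]`, for example `check --config secrets/config.json`.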

Using Low-Code Declarative Sources

from airbyte_cdk import YamlDeclarativeSource

class MyDeclarativeSource(YamlDeclarativeSource):
    def __init__(self):
        super().__init__(path_to_yaml="manifest.yaml")

# The manifest.yaml file defines streams, authentication, and data transformation
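A manifest also declares where records live in each API response. Declarative record selectors are typically configured with an extractor and a `field_path`, such as `DpathExtractor`. The following stdlib-only sketch only mimics that selection step so the behavior is concrete. `extract_records` is a hypothetical helper, not part of the CDK.

```python
from typing import Any, List, Mapping, Sequence


def extract_records(body: Mapping[str, Any], field_path: Sequence[str]) -> List[Mapping[str, Any]]:
    """Hypothetical stand-in for a declarative extractor configured with field_path."""
    node: Any = body
    for key in field_path:
        # Walk one level down; a missing key means the response holds no records
        if not isinstance(node, Mapping) or key not in node:
            return []
        node = node[key]
    # A list becomes many records, a single object becomes one record
    if isinstance(node, list):
        return list(node)
    return [node] if isinstance(node, Mapping) else []


response_body = {"data": {"users": [{"id": 1}, {"id": 2}]}, "meta": {"next": None}}
print(extract_records(response_body, ["data", "users"]))  # [{'id': 1}, {'id': 2}]
```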

Creating a Destination Connector

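import logging
from typing import Any, Iterable, Mapping
from airbyte_cdk import Destination
from airbyte_cdk.models import (AirbyteConnectionStatus, AirbyteMessage, AirbyteRecordMessage,
                                ConfiguredAirbyteCatalog, Status, Type)

class MyDestination(Destination):
    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        """Verify that the destination accepts connections with this config."""
        # Test connection logic here
        return AirbyteConnectionStatus(status=Status.SUCCEEDED)

    def write(
        self,
        config: Mapping[str, Any],
        configured_catalog: ConfiguredAirbyteCatalog,
        input_messages: Iterable[AirbyteMessage]
    ) -> Iterable[AirbyteMessage]:
        """
        Write data records to the destination.
        """
        for message in input_messages:
            if message.type == Type.RECORD:
                # Process and write the record
                self._write_record(message.record)
            elif message.type == Type.STATE:
                # Emit a state message only once every record before it has been persisted
                yield message

    def _write_record(self, record: AirbyteRecordMessage) -> None:
        """Placeholder: write a single record to the target system."""
        raise NotImplementedError("write the record to the target system")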
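In practice, the `write()` contract comes down to buffering and checkpointing. The following stdlib-only sketch shows one common way to structure it. It uses plain dicts in a simplified Airbyte Protocol message shape instead of the CDK's models. `write_with_checkpoints`, `flush`, and `batch_size` are hypothetical names. Records are written in batches, and each STATE message is passed through only after every record before it has been flushed.

```python
from typing import Any, Callable, Dict, Iterable, Iterator, List

Message = Dict[str, Any]


def write_with_checkpoints(
    messages: Iterable[Message],
    flush: Callable[[List[Dict[str, Any]]], None],
    batch_size: int = 1000,
) -> Iterator[Message]:
    """Buffer RECORD data, flush in batches, and emit STATE only after a flush."""
    buffer: List[Dict[str, Any]] = []
    for message in messages:
        if message["type"] == "RECORD":
            buffer.append(message["record"]["data"])
            if len(buffer) >= batch_size:
                flush(buffer)
                buffer = []
        elif message["type"] == "STATE":
            # Everything that arrived before this state message must be persisted first
            if buffer:
                flush(buffer)
                buffer = []
            yield message
    if buffer:
        flush(buffer)  # write any trailing records when the input ends


written: List[List[Dict[str, Any]]] = []
msgs = [
    {"type": "RECORD", "record": {"stream": "users", "data": {"id": 1}}},
    {"type": "RECORD", "record": {"stream": "users", "data": {"id": 2}}},
    {"type": "STATE", "state": {"data": {"cursor": 2}}},
    {"type": "RECORD", "record": {"stream": "users", "data": {"id": 3}}},
]
emitted = list(write_with_checkpoints(msgs, written.append, batch_size=10))
print(written)  # [[{'id': 1}, {'id': 2}], [{'id': 3}]]
```

If a sync fails after a checkpoint, the platform can resume from the last emitted state without losing the records that state covers.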

Architecture

The Airbyte CDK follows a layered architecture designed for extensibility and reusability:

Core Components

  1. Base Connectors: Abstract Source and Destination classes providing the foundational interface
  2. Stream Framework: HttpStream and Stream classes for data extraction with built-in state management
  3. Authentication Layer: Various authenticators for OAuth2, API tokens, and custom authentication schemes
  4. Declarative Framework: YAML-based low-code approach for building connectors without Python code
  5. Protocol Models: Pydantic models implementing the Airbyte Protocol for message exchange
  6. Utilities: Helper functions for configuration, schema handling, and data transformation
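On the wire, connectors exchange Airbyte Protocol messages as newline-delimited JSON. The Pydantic protocol models serialize to this shape. The following stdlib-only sketch builds a RECORD message by hand so the format is visible. It is not how the CDK emits messages. `record_message` is a hypothetical helper, and `emitted_at` is in epoch milliseconds.

```python
import json
import time
from typing import Any, Mapping, Optional


def record_message(stream: str, data: Mapping[str, Any], emitted_at: Optional[int] = None) -> str:
    """Serialize one RECORD message as a single JSON line (hypothetical helper)."""
    if emitted_at is None:
        emitted_at = int(time.time() * 1000)  # epoch milliseconds
    message = {
        "type": "RECORD",
        "record": {"stream": stream, "data": dict(data), "emitted_at": emitted_at},
    }
    return json.dumps(message)  # json.dumps emits no newlines, so one message per line


line = record_message("users", {"id": 1, "name": "Ada"}, emitted_at=1700000000000)
print(line)
print(json.loads(line)["record"]["stream"])  # users
```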

Stream Types

  • HTTP Streams: For REST API data sources with pagination, authentication, and error handling
  • Incremental Streams: Support for state-based incremental synchronization
  • Sub-streams: Nested data extraction from parent-child relationships
  • Concurrent Streams: High-throughput data extraction with parallel processing
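Incremental streams keep a cursor, such as an `updated_at` timestamp, in state. Each sync reads only records past that cursor and then advances it. The following stdlib-only sketch shows that bookkeeping in isolation. `read_incremental` and the `updated_at` field are hypothetical, and the CDK's own cursor classes handle this differently in detail.

```python
from typing import Any, Dict, Iterable, List, Mapping, Tuple


def read_incremental(
    records: Iterable[Mapping[str, Any]],
    state: Mapping[str, Any],
    cursor_field: str = "updated_at",
) -> Tuple[List[Mapping[str, Any]], Dict[str, Any]]:
    """Return records newer than the saved cursor plus the updated state (hypothetical helper)."""
    last_seen = state.get(cursor_field)
    emitted: List[Mapping[str, Any]] = []
    max_cursor = last_seen
    for record in records:
        value = record[cursor_field]
        # Skip anything at or before the checkpoint from the previous sync
        if last_seen is not None and value <= last_seen:
            continue
        emitted.append(record)
        if max_cursor is None or value > max_cursor:
            max_cursor = value
    new_state = {cursor_field: max_cursor} if max_cursor is not None else {}
    return emitted, new_state


rows = [
    {"id": 1, "updated_at": "2024-01-01"},
    {"id": 2, "updated_at": "2024-01-05"},
    {"id": 3, "updated_at": "2024-01-09"},
]
new_rows, state = read_incremental(rows, {"updated_at": "2024-01-05"})
print([r["id"] for r in new_rows], state)  # [3] {'updated_at': '2024-01-09'}
```

ISO-8601 date strings compare correctly as plain strings, which keeps the sketch simple. A strict comparison avoids re-emitting the checkpointed record. However, it can miss records that share the checkpoint's cursor value, so connectors often re-read the boundary and rely on deduplication instead.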

Capabilities

Source Connectors

Framework for building data extraction connectors with support for HTTP APIs, databases, and files. Includes stream management, incremental sync, authentication, error handling, and state management.
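For HTTP sources, error handling mostly means retrying transient failures. `HttpStream` retries rate-limit (429) and server (5xx) responses with backoff by default. The following stdlib-only sketch reproduces only the exponential-backoff idea. `TransientError` and `with_backoff` are hypothetical, and `sleep` is injectable so the delays can be checked without waiting.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical error type for retryable failures (e.g. HTTP 429 or 5xx)."""


def with_backoff(
    call: Callable[[], T],
    max_retries: int = 5,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Retry `call` on TransientError, doubling the delay after each failed attempt."""
    for attempt in range(max_retries + 1):
        try:
            return call()
        except TransientError:
            if attempt == max_retries:
                raise  # retries exhausted: surface the error to the sync
            sleep(base_delay * (2 ** attempt))
    raise AssertionError("unreachable")  # keeps type checkers satisfied


attempts = {"n": 0}


def flaky() -> str:
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise TransientError("HTTP 503")
    return "ok"


delays: list = []
print(with_backoff(flaky, sleep=delays.append))  # ok
print(delays)  # [1.0, 2.0]
```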

Destination Connectors

Framework for building data loading connectors that write records to databases, data warehouses, files, and APIs. Provides batch processing, type mapping, and error handling capabilities.
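Type mapping turns the JSON Schema that each stream declares into column types in the target system. The following is a minimal sketch for an illustrative SQL warehouse. The type table is a hypothetical choice, because every real destination defines its own mapping. `sql_type` is not a CDK function.

```python
from typing import Any, Mapping

# Hypothetical mapping for an illustrative SQL warehouse; real destinations differ.
_SQL_TYPES = {
    "string": "VARCHAR",
    "integer": "BIGINT",
    "number": "DOUBLE PRECISION",
    "boolean": "BOOLEAN",
    "object": "JSON",
    "array": "JSON",
}


def sql_type(property_schema: Mapping[str, Any]) -> str:
    """Map one JSON Schema property to a SQL column type."""
    declared = property_schema.get("type", "string")
    # Nullable properties are often written as ["null", "string"]; drop "null"
    types = [declared] if isinstance(declared, str) else [t for t in declared if t != "null"]
    json_type = types[0] if types else "string"  # first non-null type wins
    if json_type == "string" and property_schema.get("format") == "date-time":
        return "TIMESTAMP"
    return _SQL_TYPES.get(json_type, "VARCHAR")


schema = {
    "properties": {
        "id": {"type": "integer"},
        "email": {"type": ["null", "string"]},
        "created_at": {"type": "string", "format": "date-time"},
    }
}
columns = {name: sql_type(prop) for name, prop in schema["properties"].items()}
print(columns)  # {'id': 'BIGINT', 'email': 'VARCHAR', 'created_at': 'TIMESTAMP'}
```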

Declarative Low-Code CDK

YAML-based framework for building connectors without writing Python code. Supports most common connector patterns through declarative configuration with authentication, pagination, transformations, and incremental sync.
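Pagination in a manifest is declared rather than coded. For example, a paginator with a cursor-based strategy reads the next-page token from each response and injects it into the next request. The following stdlib-only sketch shows the loop such a declaration implies. `paginate`, the `data`/`next` response keys, and the `cursor` request parameter are hypothetical.

```python
from typing import Any, Callable, Dict, Iterator, Mapping, Optional


def paginate(
    fetch: Callable[[Dict[str, Any]], Mapping[str, Any]],
    records_key: str = "data",
    next_key: str = "next",
    token_param: str = "cursor",
) -> Iterator[Mapping[str, Any]]:
    """Follow a cursor from each response body until the API stops returning one."""
    params: Dict[str, Any] = {}
    while True:
        body = fetch(params)
        yield from body.get(records_key, [])
        next_token: Optional[str] = body.get(next_key)
        if not next_token:
            return  # no cursor: last page reached
        # Inject the token as a request parameter for the next call
        params = {token_param: next_token}


pages = {
    None: {"data": [{"id": 1}, {"id": 2}], "next": "p2"},
    "p2": {"data": [{"id": 3}], "next": None},
}
ids = [r["id"] for r in paginate(lambda params: pages[params.get("cursor")])]
print(ids)  # [1, 2, 3]
```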

Quick Start Example

Complete example of building and running a connector:

import sys
from typing import Any, Iterable, Mapping, Optional

import requests
from airbyte_cdk.entrypoint import launch
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator

class UsersStream(HttpStream):
    url_base = "https://api.example.com/"
    primary_key = "id"

    def __init__(self, config: Mapping[str, Any]):
        # TokenAuthenticator sends "Authorization: Bearer <api_token>" with every request
        super().__init__(authenticator=TokenAuthenticator(token=config["api_token"]))
        self._config = config

    def path(self, **kwargs) -> str:
        return "users"

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        # Required by HttpStream; returning None means there is only one page
        return None

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        # Emit each object in the "users" array as one record
        yield from response.json().get("users", [])

class ExampleSource(AbstractSource):
    def check_connection(self, logger, config):
        return True, None

    def streams(self, config):
        return [UsersStream(config)]

if __name__ == "__main__":
    launch(ExampleSource(), sys.argv[1:])

This example demonstrates a basic HTTP source connector that extracts user data from an API with token authentication.
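The example reads one top-level endpoint. Sub-streams instead read a child endpoint once per parent record. In the CDK, `HttpSubStream` provides this by deriving stream slices from a parent stream. The following stdlib-only sketch shows the slicing idea itself. The `users/{id}/orders` endpoint, the `user_id` field, and both function names are hypothetical.

```python
from typing import Any, Callable, Iterable, Iterator, List, Mapping


def parent_slices(parents: Iterable[Mapping[str, Any]], key: str = "id") -> Iterator[Mapping[str, Any]]:
    """Yield one slice per parent record, carrying the parent's identifier."""
    for parent in parents:
        yield {"parent_id": parent[key]}


def read_child_stream(
    parents: Iterable[Mapping[str, Any]],
    fetch: Callable[[str], List[Mapping[str, Any]]],
) -> Iterator[Mapping[str, Any]]:
    """Read the child endpoint once per parent slice and tag each record with its parent."""
    for stream_slice in parent_slices(parents):
        path = f"users/{stream_slice['parent_id']}/orders"  # hypothetical child endpoint
        for record in fetch(path):
            yield {**record, "user_id": stream_slice["parent_id"]}


orders = {"users/1/orders": [{"order": "a"}], "users/2/orders": [{"order": "b"}, {"order": "c"}]}
rows = list(read_child_stream([{"id": 1}, {"id": 2}], lambda path: orders.get(path, [])))
print(rows)  # [{'order': 'a', 'user_id': 1}, {'order': 'b', 'user_id': 2}, {'order': 'c', 'user_id': 2}]
```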