A framework for building Airbyte Source and Destination connectors with Python, supporting both programmatic and low-code declarative approaches.
npx @tessl/cli install tessl/pypi-airbyte-cdk@3.1.0

The Airbyte CDK (Connector Development Kit) is a Python framework for building Airbyte Source and Destination connectors. It provides both programmatic and low-code declarative approaches to creating data integration connectors, enabling developers to build reliable data extraction and loading solutions for various APIs, databases, and data sources.
The CDK offers three main approaches: traditional programmatic connector development using Python classes, low-code declarative manifests using YAML configuration, and destination connector development for data loading scenarios.
pip install airbyte-cdk

Main connector classes:
from airbyte_cdk import Source, Destination
from airbyte_cdk.entrypoint import launch

Low-code declarative sources:
from airbyte_cdk import YamlDeclarativeSource, ManifestDeclarativeSource

HTTP streams and utilities:
from airbyte_cdk import HttpStream, HttpSubStream
from airbyte_cdk import TokenAuthenticator, Oauth2Authenticator

Protocol models:
from airbyte_cdk import AirbyteMessage, ConfiguredAirbyteCatalog, AirbyteStream

Programmatic source connector:
from typing import Any, Iterable, List, Mapping, Optional, Tuple
import logging
import sys

from airbyte_cdk.models import AirbyteCatalog, AirbyteMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream


# AbstractSource is the base class that defines check_connection() and streams();
# the bare Source class only declares the lower-level check/discover/read interface.
class MySource(AbstractSource):
    def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
        """
        Test connection to the source.
        Returns (True, None) if successful, (False, error_message) otherwise.
        """
        try:
            # Test connection logic here
            return True, None
        except Exception as e:
            return False, str(e)

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        Return list of streams for this source.
        """
        # MyStream is a Stream subclass that you define elsewhere.
        return [MyStream(config=config)]

    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
        """
        Discover available streams and their schemas.
        """
        return AirbyteCatalog(streams=[stream.as_airbyte_stream() for stream in self.streams(config)])


# Launch the connector
if __name__ == "__main__":
    from airbyte_cdk.entrypoint import launch

    launch(MySource(), sys.argv[1:])

Low-code declarative source:
from airbyte_cdk import YamlDeclarativeSource


class MyDeclarativeSource(YamlDeclarativeSource):
    def __init__(self):
        super().__init__(path_to_yaml="manifest.yaml")


# The manifest.yaml file defines streams, authentication, and data transformation.

Destination connector:
from typing import Any, Iterable, Mapping
from airbyte_cdk import Destination
from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, Type


# A complete destination must also implement check().
class MyDestination(Destination):
    def write(
        self,
        config: Mapping[str, Any],
        configured_catalog: ConfiguredAirbyteCatalog,
        input_messages: Iterable[AirbyteMessage],
    ) -> Iterable[AirbyteMessage]:
        """
        Write data records to the destination.
        """
        for message in input_messages:
            if message.type == Type.RECORD:
                # Process and write the record.
                # _write_record is a helper you implement yourself; it is not part of the CDK.
                self._write_record(message.record)
            elif message.type == Type.STATE:
                # Echo state messages back so the platform can checkpoint the sync.
                yield message

The Airbyte CDK follows a layered architecture designed for extensibility and reusability:
- Source and Destination classes providing the foundational interface
- HttpStream and Stream classes for data extraction with built-in state management

Source connectors: a framework for building data extraction connectors with support for HTTP APIs, databases, and files. It includes stream management, incremental sync, authentication, error handling, and state management.
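
The incremental sync and state management mentioned above can be pictured with a small sketch that does not depend on the CDK. It uses only the standard library; the `updated_at` cursor field, the record shape, and the `read_incremental` helper are illustrative assumptions rather than CDK APIs. The idea is that a stream remembers the highest cursor value it has emitted and skips anything older on the next run.

```python
from typing import Any, Dict, Iterable, List, Tuple

Record = Dict[str, Any]
State = Dict[str, str]


def read_incremental(
    records: Iterable[Record], state: State, cursor_field: str = "updated_at"
) -> Tuple[List[Record], State]:
    """Return records newer than the saved cursor, plus the updated state.

    Hypothetical helper for illustration only. ISO-8601 date strings compare
    correctly as plain strings, so no date parsing is needed here.
    """
    last_seen = state.get(cursor_field, "")
    emitted = [r for r in records if r[cursor_field] > last_seen]
    # The new cursor is the largest value seen so far, including the old one.
    new_cursor = max([last_seen] + [r[cursor_field] for r in emitted])
    return emitted, {cursor_field: new_cursor}


rows = [
    {"id": 1, "updated_at": "2024-01-01"},
    {"id": 2, "updated_at": "2024-01-05"},
    {"id": 3, "updated_at": "2024-01-09"},
]

# First run starts with empty state; second run reuses the saved cursor.
first_batch, state = read_incremental(rows, state={})
second_batch, state = read_incremental(rows, state=state)
print(len(first_batch), len(second_batch), state)
```

Running the same data twice emits every record on the first sync and nothing on the second, which is the behaviour an incremental stream aims for.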
Destination connectors: a framework for building data loading connectors that write records to databases, data warehouses, files, and APIs. It provides batch processing, type mapping, and error handling.
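
As a rough illustration of the batch processing idea, the sketch below buffers records and flushes them in groups. It is not CDK code: plain dictionaries stand in for Airbyte messages, and `write_in_batches`, `flush`, and `batch_size` are hypothetical names chosen for this example.

```python
from typing import Any, Callable, Dict, Iterable, Iterator, List

Message = Dict[str, Any]


def write_in_batches(
    messages: Iterable[Message],
    flush: Callable[[List[Dict[str, Any]]], None],
    batch_size: int = 2,
) -> Iterator[Message]:
    """Buffer RECORD messages, flush them in batches, and echo STATE messages."""
    buffer: List[Dict[str, Any]] = []
    for message in messages:
        if message["type"] == "RECORD":
            buffer.append(message["record"])
            if len(buffer) >= batch_size:
                flush(buffer)
                buffer = []
        elif message["type"] == "STATE":
            # Flush before acknowledging state so the checkpoint never
            # gets ahead of the data that was actually written.
            if buffer:
                flush(buffer)
                buffer = []
            yield message
    # Write whatever is left once the input is exhausted.
    if buffer:
        flush(buffer)


written: List[List[Dict[str, Any]]] = []
stream = [
    {"type": "RECORD", "record": {"id": 1}},
    {"type": "RECORD", "record": {"id": 2}},
    {"type": "RECORD", "record": {"id": 3}},
    {"type": "STATE", "state": {"cursor": 3}},
]
acknowledged = list(write_in_batches(stream, flush=written.append))
print(written)
print(acknowledged)
```

Flushing before each state message is a design choice in this sketch: it trades slightly smaller batches for the guarantee that an acknowledged checkpoint always reflects written data.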
Declarative connectors: a YAML-based framework for building connectors without writing Python code. It covers the most common connector patterns through declarative configuration of authentication, pagination, transformations, and incremental sync.
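
To give a feel for what such a declarative configuration might contain, here is a minimal sketch written as a Python dictionary that mirrors the YAML layout. The component type names are given as best understood and should be checked against the declarative component schema for the CDK version in use. The URL, stream name, and field names are placeholders, and this is not a complete, validated manifest.

```python
import json

# Illustrative manifest structure only; verify every component name and
# required field against the CDK's declarative schema before using it.
manifest = {
    "version": "3.1.0",
    "type": "DeclarativeSource",
    "check": {"type": "CheckStream", "stream_names": ["users"]},
    "streams": [
        {
            "type": "DeclarativeStream",
            "name": "users",  # placeholder stream name
            "primary_key": ["id"],
            "retriever": {
                "type": "SimpleRetriever",
                "requester": {
                    "type": "HttpRequester",
                    "url_base": "https://api.example.com",  # placeholder URL
                    "path": "/users",
                    "http_method": "GET",
                    "authenticator": {
                        "type": "BearerAuthenticator",
                        # Value is interpolated from the connector config at runtime.
                        "api_token": "{{ config['api_token'] }}",
                    },
                },
                "record_selector": {
                    "type": "RecordSelector",
                    # Pull records out of the "users" key of the response body.
                    "extractor": {"type": "DpathExtractor", "field_path": ["users"]},
                },
            },
        }
    ],
}

print(json.dumps(manifest, indent=2))
```

The same structure would normally live in manifest.yaml; expressing it as a dictionary here just keeps the example runnable with the standard library.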
Complete example of building and running a connector:
import sys
from typing import Any, Iterable, Mapping, Optional

import requests
from airbyte_cdk import HttpStream, launch
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator


class UsersStream(HttpStream):
    url_base = "https://api.example.com/"
    primary_key = "id"

    def __init__(self, config: Mapping[str, Any]):
        # Sends "Authorization: Bearer <api_token>" with every request.
        super().__init__(authenticator=TokenAuthenticator(token=config["api_token"]))

    def path(self, **kwargs) -> str:
        return "users"

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        # HttpStream requires this method; returning None means no pagination.
        return None

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping[str, Any]]:
        data = response.json()
        yield from data.get("users", [])


# AbstractSource provides the check_connection()/streams() interface.
class ExampleSource(AbstractSource):
    def check_connection(self, logger, config):
        return True, None

    def streams(self, config):
        return [UsersStream(config)]


if __name__ == "__main__":
    launch(ExampleSource(), sys.argv[1:])

This example demonstrates a basic HTTP source connector that extracts user data from an API with token authentication.