A framework for building Airbyte Source and Destination connectors with Python, supporting both programmatic and low-code declarative approaches.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
YAML-based framework for building connectors without writing Python code. The Declarative CDK supports most common connector patterns through declarative configuration with authentication, pagination, transformations, and incremental sync. This approach allows developers to create robust data connectors using only configuration files.
Core classes for creating low-code connectors using YAML manifests.
from typing import Any, Dict, List, Mapping, Optional

from airbyte_cdk import YamlDeclarativeSource, ManifestDeclarativeSource
class YamlDeclarativeSource(ManifestDeclarativeSource):
    """Declarative source defined by a YAML file."""

    def __init__(self, path_to_yaml: str, debug: bool = False) -> None:
        """Initialize the source from a YAML manifest file.

        Args:
            path_to_yaml: Path to the YAML manifest file.
            debug: Enable debug logging for manifest parsing.
        """
class ManifestDeclarativeSource:
    """Declarative source defined by a manifest of low-code components."""

    def __init__(
        self,
        source_config: Dict[str, Any],
        debug: bool = False,
        emit_connector_builder_messages: bool = False,
    ) -> None:
        """Initialize the source from a manifest configuration.

        Args:
            source_config: The manifest configuration dictionary.
            debug: Enable debug logging.
            emit_connector_builder_messages: Enable Connector Builder
                specific messages.
        """


# Declarative stream definitions using YAML configuration.
from airbyte_cdk import DeclarativeStream
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever
from airbyte_cdk.sources.declarative.requesters import HttpRequester
class DeclarativeStream:
    """Stream defined through declarative configuration."""

    def __init__(
        self,
        name: str,
        retriever: SimpleRetriever,
        config: Mapping[str, Any],
        parameters: Mapping[str, Any],
    ) -> None:
        """Initialize the declarative stream.

        Args:
            name: Stream name.
            retriever: Data retriever component.
            config: Stream configuration.
            parameters: Runtime parameters.
        """
class SimpleRetriever:
    """Retriever that extracts data from HTTP responses."""

    def __init__(
        self,
        requester: HttpRequester,
        record_selector,
        paginator=None,
        partition_router=None,
    ) -> None:
        """Initialize the simple retriever.

        Args:
            requester: HTTP requester for making API calls.
            record_selector: Component to extract records from responses.
            paginator: Optional pagination handler.
            partition_router: Optional partitioning strategy.
        """


# Components for making HTTP requests with authentication and error handling.
from airbyte_cdk import HttpRequester
from airbyte_cdk.sources.declarative.requesters.request_options import RequestOption
from airbyte_cdk.sources.declarative.auth import DeclarativeAuthenticator
class HttpRequester:
    """HTTP requester for declarative streams."""

    def __init__(
        self,
        name: str,
        url_base: str,
        path: str,
        http_method: str = "GET",
        request_options_provider=None,
        authenticator: Optional[DeclarativeAuthenticator] = None,
        error_handler=None,
        config: Optional[Mapping[str, Any]] = None,
        parameters: Optional[Mapping[str, Any]] = None,
    ) -> None:
        """Initialize the HTTP requester.

        Args:
            name: Name of the requester.
            url_base: Base URL for requests.
            path: Request path template.
            http_method: HTTP method (GET, POST, etc.).
            request_options_provider: Provider for request options.
            authenticator: Authentication handler.
            error_handler: Error handling strategy.
            config: Configuration mapping.
            parameters: Runtime parameters.
        """
class RequestOption:
    """Configuration for request options (headers, params, body)."""

    def __init__(
        self,
        field_name: str,
        inject_into: str,  # "request_parameter", "header", "body_data", "body_json"
        value: Any,
    ) -> None:
        """Initialize the request option.

        Args:
            field_name: Name of the option field.
            inject_into: Where to inject the option; one of
                "request_parameter", "header", "body_data", "body_json".
            value: Value or value template.
        """


# Declarative authentication handlers for various schemes.
from airbyte_cdk import (
NoAuth,
BasicHttpAuthenticator,
BearerAuthenticator,
ApiKeyAuthenticator,
DeclarativeOauth2Authenticator
)
class NoAuth:
    """No authentication required."""
class BasicHttpAuthenticator:
    """HTTP Basic authentication."""

    def __init__(self, username: str, password: str) -> None:
        """Initialize basic authentication.

        Args:
            username: Username for authentication.
            password: Password for authentication.
        """
class BearerAuthenticator:
    """Bearer token authentication."""

    def __init__(self, api_token: str) -> None:
        """Initialize bearer token authentication.

        Args:
            api_token: Bearer token for authentication.
        """
class ApiKeyAuthenticator:
    """API key authentication via header or query parameter."""

    def __init__(
        self,
        api_token: str,
        header: Optional[str] = None,
        request_param: Optional[str] = None,
    ) -> None:
        """Initialize API key authentication.

        Args:
            api_token: API key value.
            header: Header name for the API key (if using a header).
            request_param: Parameter name for the API key (if using a
                query parameter).
        """
class DeclarativeOauth2Authenticator:
    """OAuth 2.0 authentication with token refresh."""

    def __init__(
        self,
        token_refresh_endpoint: str,
        client_id: str,
        client_secret: str,
        refresh_token: str,
        scopes: Optional[List[str]] = None,
        token_expiry_date: Optional[str] = None,
        access_token: Optional[str] = None,
        refresh_request_body: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Initialize OAuth2 authentication.

        Args:
            token_refresh_endpoint: URL for refreshing tokens.
            client_id: OAuth client ID.
            client_secret: OAuth client secret.
            refresh_token: Refresh token.
            scopes: OAuth scopes.
            token_expiry_date: When the current token expires.
            access_token: Current access token.
            refresh_request_body: Additional refresh request parameters.
        """


# Components for handling API pagination patterns.
from airbyte_cdk import DefaultPaginator
from airbyte_cdk.sources.declarative.requesters.paginators.strategies import (
OffsetIncrement,
PageIncrement,
CursorPaginationStrategy
)
class DefaultPaginator:
    """Default paginator with a configurable pagination strategy."""

    def __init__(
        self,
        pagination_strategy,
        page_size_option: Optional[RequestOption] = None,
        page_token_option: Optional[RequestOption] = None,
        config: Optional[Mapping[str, Any]] = None,
        parameters: Optional[Mapping[str, Any]] = None,
    ) -> None:
        """Initialize the paginator.

        Args:
            pagination_strategy: Strategy for extracting pagination info.
            page_size_option: How to specify the page size in requests.
            page_token_option: How to specify the page token in requests.
            config: Configuration mapping.
            parameters: Runtime parameters.
        """
class OffsetIncrement:
    """Pagination using offset-based increments."""

    def __init__(self, page_size: int, offset_param: str = "offset") -> None:
        """Initialize offset pagination.

        Args:
            page_size: Number of records per page.
            offset_param: Parameter name for the offset.
        """
class PageIncrement:
    """Pagination using page number increments."""

    def __init__(
        self,
        page_size: Optional[int] = None,
        start_from_page: int = 1,
    ) -> None:
        """Initialize page-based pagination.

        Args:
            page_size: Number of records per page.
            start_from_page: Starting page number.
        """
class CursorPaginationStrategy:
    """Pagination using cursor tokens."""

    def __init__(
        self,
        cursor_value: str,
        stop_condition: Optional[str] = None,
        page_size: Optional[int] = None,
    ) -> None:
        """Initialize cursor pagination.

        Args:
            cursor_value: Expression that yields the next cursor from the
                response, e.g. "{{ response.get('next_page_token') }}".
            stop_condition: Condition to stop pagination.
            page_size: Number of records per page.
        """


# Components for extracting and transforming records from API responses.
from airbyte_cdk import RecordSelector, DpathExtractor
from airbyte_cdk.sources.declarative.transformations import AddFields, RecordTransformation
class RecordSelector:
    """Selects records from API responses."""

    def __init__(
        self,
        extractor: DpathExtractor,
        record_filter=None,
        transformations: Optional[List[RecordTransformation]] = None,
    ) -> None:
        """Initialize the record selector.

        Args:
            extractor: Component to extract records from responses.
            record_filter: Optional filter for records.
            transformations: List of transformations to apply.
        """
class DpathExtractor:
    """Extracts records from a response by following a path of field names."""

    def __init__(
        self,
        field_path: List[str],
        config: Optional[Mapping[str, Any]] = None,
    ) -> None:
        """Initialize the dpath extractor.

        Args:
            field_path: Path to the records in the response, given as a
                list of keys (e.g. ["data"]).
            config: Configuration mapping.
        """
class AddFields:
    """Transformation that adds fields to records."""

    def __init__(self, fields: List[dict]) -> None:
        """Initialize the add-fields transformation.

        Args:
            fields: List of field definitions to add.
        """


version: "0.29.0"
# Minimal manifest: one "users" stream read with a bearer token.
type: DeclarativeSource
check:
  type: CheckStream
  stream_names: ["users"]
streams:
  - type: DeclarativeStream
    name: users
    primary_key: ["id"]
    retriever:
      type: SimpleRetriever
      requester:
        type: HttpRequester
        url_base: "https://api.example.com/v1/"
        path: "users"
        http_method: "GET"
        authenticator:
          type: BearerAuthenticator
          api_token: "{{ config['api_token'] }}"
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path: ["data"]
spec:
  type: Spec
  connection_specification:
    type: object
    properties:
      api_token:
        type: string
        title: API Token
        airbyte_secret: true
    required:
      - api_token

from airbyte_cdk import YamlDeclarativeSource
class ExampleSource(YamlDeclarativeSource):
    """Example source whose manifest is loaded from ``manifest.yaml``."""

    def __init__(self) -> None:
        """Initialize the source with the ``manifest.yaml`` manifest path."""
        super().__init__(path_to_yaml="manifest.yaml")
# The source is now fully functional with the YAML manifest

version: "0.29.0"
type: DeclarativeSource
# Reusable components, referenced below via $ref.
definitions:
  base_requester:
    type: HttpRequester
    url_base: "https://api.example.com/v1/"
    authenticator:
      type: BearerAuthenticator
      api_token: "{{ config['api_token'] }}"
  base_retriever:
    type: SimpleRetriever
    requester:
      $ref: "#/definitions/base_requester"
    record_selector:
      type: RecordSelector
      extractor:
        type: DpathExtractor
        field_path: ["data"]
    paginator:
      type: DefaultPaginator
      pagination_strategy:
        type: CursorPaginationStrategy
        cursor_value: "{{ response.get('next_page_token') }}"
        stop_condition: "{{ not response.get('next_page_token') }}"
      page_token_option:
        type: RequestOption
        field_name: "page_token"
        inject_into: "request_parameter"
streams:
  - type: DeclarativeStream
    name: orders
    primary_key: ["id"]
    retriever:
      $ref: "#/definitions/base_retriever"
      # Override the shared requester with the stream-specific path and params.
      requester:
        $ref: "#/definitions/base_requester"
        path: "orders"
        request_parameters:
          updated_since: "{{ stream_state.get('updated_at', config.get('start_date')) }}"
    incremental_sync:
      type: DatetimeBasedCursor
      cursor_field: "updated_at"
      datetime_format: "%Y-%m-%dT%H:%M:%SZ"
      cursor_granularity: "PT1S"
      start_datetime:
        type: MinMaxDatetime
        datetime: "{{ config['start_date'] }}"
        datetime_format: "%Y-%m-%d"
check:
  type: CheckStream
  stream_names: ["orders"]
spec:
  type: Spec
  connection_specification:
    type: object
    properties:
      api_token:
        type: string
        title: API Token
        airbyte_secret: true
      start_date:
        type: string
        title: Start Date
        format: date
        pattern: "^[0-9]{4}-[0-9]{2}-[0-9]{2}$"
    required:
      - api_token
      - start_date

version: "0.29.0"
# OAuth 2.0 example: the authenticator is defined once and referenced by the stream.
type: DeclarativeSource
definitions:
  oauth_authenticator:
    type: DeclarativeOauth2Authenticator
    token_refresh_endpoint: "https://api.example.com/oauth/token"
    client_id: "{{ config['client_id'] }}"
    client_secret: "{{ config['client_secret'] }}"
    refresh_token: "{{ config['refresh_token'] }}"
    scopes: ["read:users", "read:data"]
streams:
  - type: DeclarativeStream
    name: protected_data
    retriever:
      type: SimpleRetriever
      requester:
        type: HttpRequester
        url_base: "https://api.example.com/v1/"
        path: "protected"
        authenticator:
          $ref: "#/definitions/oauth_authenticator"
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path: ["items"]
spec:
  type: Spec
  connection_specification:
    type: object
    properties:
      client_id:
        type: string
        title: Client ID
      client_secret:
        type: string
        title: Client Secret
        airbyte_secret: true
      refresh_token:
        type: string
        title: Refresh Token
        airbyte_secret: true
    required:
      - client_id
      - client_secret
      - refresh_token

streams:
- type: DeclarativeStream
  name: transformed_users
  retriever:
    type: SimpleRetriever
    requester:
      type: HttpRequester
      url_base: "https://api.example.com/v1/"
      path: "users"
    record_selector:
      type: RecordSelector
      extractor:
        type: DpathExtractor
        field_path: ["users"]
  # NOTE(review): placed at stream level, where the low-code schema declares
  # `transformations`; the flattened original does not show the nesting - confirm.
  transformations:
    - type: AddFields
      fields:
        - path: ["source"]
          value: "api"
        - path: ["extracted_at"]
          value: "{{ now_utc() }}"
        - path: ["full_name"]
          value: "{{ record['first_name'] }} {{ record['last_name'] }}"

streams:
# Parent stream: its records supply the partitions for user_posts below.
- type: DeclarativeStream
  name: users
  primary_key: ["id"]
  retriever:
    type: SimpleRetriever
    requester:
      type: HttpRequester
      url_base: "https://api.example.com/v1/"
      path: "users"
    record_selector:
      type: RecordSelector
      extractor:
        type: DpathExtractor
        field_path: ["users"]
# Child stream: the path is templated on the user_id of the current slice.
- type: DeclarativeStream
  name: user_posts
  primary_key: ["id"]
  retriever:
    type: SimpleRetriever
    requester:
      type: HttpRequester
      url_base: "https://api.example.com/v1/"
      path: "users/{{ stream_slice.user_id }}/posts"
    record_selector:
      type: RecordSelector
      extractor:
        type: DpathExtractor
        field_path: ["posts"]
    partition_router:
      type: SubstreamPartitionRouter
      parent_stream_configs:
        - stream: "#/streams/0" # Reference to users stream
          parent_key: "id"
          partition_field: "user_id"

Install with Tessl CLI
npx tessl i tessl/pypi-airbyte-cdk