tessl/pypi-airbyte-cdk

A framework for building Airbyte Source and Destination connectors with Python, supporting both programmatic and low-code declarative approaches.

Declarative Low-Code CDK

A YAML-based framework for building connectors with little or no Python code. The Declarative CDK covers the most common connector patterns (authentication, pagination, record transformations, and incremental sync) through declarative configuration, so a connector can be defined almost entirely in a manifest file, with only a thin Python wrapper class to load it.

Capabilities

Declarative Source Classes

Core classes for creating low-code connectors using YAML manifests.

from airbyte_cdk import YamlDeclarativeSource, ManifestDeclarativeSource
from typing import Any, Dict, List, Mapping

class YamlDeclarativeSource(ManifestDeclarativeSource):
    """
    Declarative source defined by a YAML file.
    """
    
    def __init__(self, path_to_yaml: str, debug: bool = False):
        """
        Initialize source from YAML manifest file.
        
        Args:
            path_to_yaml: Path to the YAML manifest file
            debug: Enable debug logging for manifest parsing
        """

class ManifestDeclarativeSource:
    """
    Declarative source defined by a manifest of low-code components.
    """
    
    def __init__(
        self,
        source_config: Dict[str, Any],
        debug: bool = False,
        emit_connector_builder_messages: bool = False
    ):
        """
        Initialize source from manifest configuration.
        
        Args:
            source_config: The manifest configuration dictionary
            debug: Enable debug logging
            emit_connector_builder_messages: Enable Connector Builder specific messages
        """
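
As a rough illustration of what the `source_config` dictionary might contain, the sketch below builds a small manifest as a plain Python dict and runs a simple sanity check on it. The helper names `stream_names` and `undefined_check_streams` are hypothetical and not part of the CDK; the CDK performs its own, far more thorough validation.

```python
from typing import Any, Dict, List

# Hypothetical manifest expressed as a dict (the same shape a YAML manifest parses to).
manifest: Dict[str, Any] = {
    "version": "0.29.0",
    "type": "DeclarativeSource",
    "check": {"type": "CheckStream", "stream_names": ["users"]},
    "streams": [
        {
            "type": "DeclarativeStream",
            "name": "users",
            "primary_key": ["id"],
            "retriever": {
                "type": "SimpleRetriever",
                "requester": {
                    "type": "HttpRequester",
                    "url_base": "https://api.example.com/v1/",
                    "path": "users",
                },
                "record_selector": {
                    "type": "RecordSelector",
                    "extractor": {"type": "DpathExtractor", "field_path": ["data"]},
                },
            },
        }
    ],
}


def stream_names(source_config: Dict[str, Any]) -> List[str]:
    """Return the names of all streams declared in the manifest dict."""
    return [stream["name"] for stream in source_config.get("streams", [])]


def undefined_check_streams(source_config: Dict[str, Any]) -> List[str]:
    """Return stream names listed under `check` that no stream defines."""
    defined = set(stream_names(source_config))
    checked = source_config.get("check", {}).get("stream_names", [])
    return [name for name in checked if name not in defined]
```

With the CDK installed, a dict of this shape is what would be passed as `source_config` in the constructor signature shown above.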

Stream Configuration

Declarative stream definitions using YAML configuration.

from typing import Any, Mapping

from airbyte_cdk import DeclarativeStream
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever
from airbyte_cdk.sources.declarative.requesters import HttpRequester

class DeclarativeStream:
    """
    Stream defined through declarative configuration.
    """
    
    def __init__(
        self,
        name: str,
        retriever: SimpleRetriever,
        config: Mapping[str, Any],
        parameters: Mapping[str, Any]
    ):
        """
        Initialize declarative stream.
        
        Args:
            name: Stream name
            retriever: Data retriever component
            config: Stream configuration
            parameters: Runtime parameters
        """

class SimpleRetriever:
    """
    Retriever that extracts data from HTTP responses.
    """
    
    def __init__(
        self,
        requester: HttpRequester,
        record_selector,
        paginator = None,
        partition_router = None
    ):
        """
        Initialize simple retriever.
        
        Args:
            requester: HTTP requester for making API calls
            record_selector: Component to extract records from responses
            paginator: Optional pagination handler
            partition_router: Optional partitioning strategy
        """

HTTP Requester Components

Components for making HTTP requests with authentication and error handling.

from typing import Any, Mapping

from airbyte_cdk import HttpRequester
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption
from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator

class HttpRequester:
    """
    HTTP requester for declarative streams.
    """
    
    def __init__(
        self,
        name: str,
        url_base: str,
        path: str,
        http_method: str = "GET",
        request_options_provider = None,
        authenticator: DeclarativeAuthenticator = None,
        error_handler = None,
        config: Mapping[str, Any] = None,
        parameters: Mapping[str, Any] = None
    ):
        """
        Initialize HTTP requester.
        
        Args:
            name: Name of the requester
            url_base: Base URL for requests
            path: Request path template
            http_method: HTTP method (GET, POST, etc.)
            request_options_provider: Provider for request options
            authenticator: Authentication handler
            error_handler: Error handling strategy
            config: Configuration mapping
            parameters: Runtime parameters
        """

class RequestOption:
    """
    Configuration for request options (headers, params, body).
    """
    
    def __init__(
        self,
        field_name: str,
        inject_into: str,  # "request_parameter", "header", "body_data", "body_json"
        value: Any
    ):
        """
        Initialize request option.
        
        Args:
            field_name: Name of the option field
            inject_into: Where to inject the option
            value: Value or value template
        """
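
To make the `inject_into` targets concrete, here is a minimal sketch of how a request option could be routed into one of the four locations listed above. `PreparedRequest` and `inject` are hypothetical names used only for illustration; this is not the CDK's implementation.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class PreparedRequest:
    """Hypothetical container for the parts of an outgoing request."""
    params: Dict[str, Any] = field(default_factory=dict)
    headers: Dict[str, Any] = field(default_factory=dict)
    body_data: Dict[str, Any] = field(default_factory=dict)
    body_json: Dict[str, Any] = field(default_factory=dict)


# Maps each inject_into value to the attribute that receives the option.
_TARGETS = {
    "request_parameter": "params",
    "header": "headers",
    "body_data": "body_data",
    "body_json": "body_json",
}


def inject(request: PreparedRequest, field_name: str, inject_into: str, value: Any) -> PreparedRequest:
    """Place `value` under `field_name` in the location named by `inject_into`."""
    if inject_into not in _TARGETS:
        raise ValueError(f"Unsupported inject_into value: {inject_into!r}")
    getattr(request, _TARGETS[inject_into])[field_name] = value
    return request
```

For example, `inject(PreparedRequest(), "page_token", "request_parameter", "abc")` leaves the token in `params` and the other locations empty.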

Authentication Components

Declarative authentication handlers for various schemes.

from typing import Any, Dict, List

from airbyte_cdk import (
    NoAuth,
    BasicHttpAuthenticator,
    BearerAuthenticator,
    ApiKeyAuthenticator,
    DeclarativeOauth2Authenticator,
)

class NoAuth:
    """
    No authentication required.
    """
    pass

class BasicHttpAuthenticator:
    """
    HTTP Basic authentication.
    """
    
    def __init__(self, username: str, password: str):
        """
        Initialize basic authentication.
        
        Args:
            username: Username for authentication
            password: Password for authentication
        """

class BearerAuthenticator:
    """
    Bearer token authentication.
    """
    
    def __init__(self, api_token: str):
        """
        Initialize bearer token auth.
        
        Args:
            api_token: Bearer token for authentication
        """

class ApiKeyAuthenticator:
    """
    API key authentication via header or query parameter.
    """
    
    def __init__(
        self, 
        api_token: str, 
        header: str = None, 
        request_param: str = None
    ):
        """
        Initialize API key authentication.
        
        Args:
            api_token: API key value
            header: Header name for API key (if using header)
            request_param: Parameter name for API key (if using query param)
        """

class DeclarativeOauth2Authenticator:
    """
    OAuth 2.0 authentication with token refresh.
    """
    
    def __init__(
        self,
        token_refresh_endpoint: str,
        client_id: str,
        client_secret: str,
        refresh_token: str,
        scopes: List[str] = None,
        token_expiry_date: str = None,
        access_token: str = None,
        refresh_request_body: Dict[str, Any] = None
    ):
        """
        Initialize OAuth2 authentication.
        
        Args:
            token_refresh_endpoint: URL for refreshing tokens
            client_id: OAuth client ID
            client_secret: OAuth client secret
            refresh_token: Refresh token
            scopes: OAuth scopes
            token_expiry_date: When current token expires
            access_token: Current access token
            refresh_request_body: Additional refresh request parameters
        """
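
The schemes above ultimately come down to attaching credentials to each request. The sketch below shows the conventional header shapes for Basic, Bearer, and header-based API key authentication, using only the standard library. The function names are hypothetical, and the CDK's authenticators may build these headers differently.

```python
import base64
from typing import Dict


def basic_auth_header(username: str, password: str) -> Dict[str, str]:
    """HTTP Basic: base64-encode 'username:password' into the Authorization header."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return {"Authorization": f"Basic {token}"}


def bearer_auth_header(api_token: str) -> Dict[str, str]:
    """Bearer: send the token as-is after the 'Bearer' scheme name."""
    return {"Authorization": f"Bearer {api_token}"}


def api_key_header(api_token: str, header: str) -> Dict[str, str]:
    """API key in a header: the header name is chosen by the API being called."""
    return {header: api_token}
```

Note that Basic credentials are only encoded, not encrypted, so they should be sent over HTTPS.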

Pagination Strategies

Components for handling API pagination patterns.

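from typing import Any, Mapping

from airbyte_cdk import DefaultPaginator
from airbyte_cdk.sources.declarative.requesters.paginators.strategies import (
    OffsetIncrement,
    PageIncrement,
    CursorPaginationStrategy,
)
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption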

class DefaultPaginator:
    """
    Default paginator with configurable pagination strategy.
    """
    
    def __init__(
        self,
        pagination_strategy,
        page_size_option: RequestOption = None,
        page_token_option: RequestOption = None,
        config: Mapping[str, Any] = None,
        parameters: Mapping[str, Any] = None
    ):
        """
        Initialize paginator.
        
        Args:
            pagination_strategy: Strategy for extracting pagination info
            page_size_option: How to specify page size in requests
            page_token_option: How to specify page token in requests
            config: Configuration mapping
            parameters: Runtime parameters
        """

class OffsetIncrement:
    """
    Pagination using offset-based increments.
    """
    
    def __init__(self, page_size: int, offset_param: str = "offset"):
        """
        Initialize offset pagination.
        
        Args:
            page_size: Number of records per page
            offset_param: Parameter name for offset
        """

class PageIncrement:
    """
    Pagination using page number increments.
    """
    
    def __init__(self, page_size: int = None, start_from_page: int = 1):
        """
        Initialize page-based pagination.
        
        Args:
            page_size: Number of records per page
            start_from_page: Starting page number
        """

class CursorPaginationStrategy:
    """
    Pagination using cursor tokens.
    """
    
    def __init__(
        self, 
        cursor_value: str, 
        stop_condition: str = None,
        page_size: int = None
    ):
        """
        Initialize cursor pagination.
        
        Args:
            cursor_value: Interpolated template that evaluates to the next cursor value
            stop_condition: Interpolated condition; pagination stops when it evaluates to true
            page_size: Number of records per page
        """
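
The difference between offset-based and cursor-based pagination is easiest to see as two read loops. The following is a self-contained sketch against an in-memory fake API; `fake_offset_api`, `fake_cursor_api`, and the `next_page_token` field are assumptions made for illustration, and the loops are not the CDK's paginator code.

```python
from typing import Any, Callable, Dict, Iterator, List, Optional

DATA: List[Dict[str, int]] = [{"id": i} for i in range(1, 8)]  # 7 fake records


def fake_offset_api(offset: int, limit: int) -> List[Dict[str, int]]:
    """Pretend endpoint that accepts offset and limit parameters."""
    return DATA[offset:offset + limit]


def read_with_offset(fetch: Callable[[int, int], List[Dict[str, int]]], page_size: int) -> Iterator[Dict[str, int]]:
    """Offset strategy: advance by page_size until a short page is returned."""
    offset = 0
    while True:
        page = fetch(offset, page_size)
        yield from page
        if len(page) < page_size:
            break
        offset += page_size


def fake_cursor_api(token: Optional[str]) -> Dict[str, Any]:
    """Pretend endpoint that returns 3 records and a token for the next page."""
    start = int(token) if token else 0
    end = start + 3
    next_token = str(end) if end < len(DATA) else None
    return {"data": DATA[start:end], "next_page_token": next_token}


def read_with_cursor(fetch: Callable[[Optional[str]], Dict[str, Any]]) -> Iterator[Dict[str, int]]:
    """Cursor strategy: follow the token from each response until none is returned."""
    token: Optional[str] = None
    while True:
        response = fetch(token)
        yield from response["data"]
        token = response.get("next_page_token")
        if not token:  # plays the role of a stop condition
            break
```

Both loops return the same seven records here; they differ only in how the position of the next page is determined.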

Record Processing

Components for extracting and transforming records from API responses.

from typing import Any, List, Mapping

from airbyte_cdk import RecordSelector, DpathExtractor
from airbyte_cdk.sources.declarative.transformations import AddFields, RecordTransformation

class RecordSelector:
    """
    Selects records from API responses.
    """
    
    def __init__(
        self,
        extractor: DpathExtractor,
        record_filter = None,
        transformations: List[RecordTransformation] = None
    ):
        """
        Initialize record selector.
        
        Args:
            extractor: Component to extract records from responses
            record_filter: Optional filter for records
            transformations: List of transformations to apply
        """

class DpathExtractor:
    """
    Extracts records from a response by following a dpath-style field path.
    """
    
    def __init__(self, field_path: List[str], config: Mapping[str, Any] = None):
        """
        Initialize dpath extractor.
        
        Args:
            field_path: Path segments leading to the records in the response, e.g. ["data"]
            config: Configuration mapping
        """

class AddFields:
    """
    Transformation that adds fields to records.
    """
    
    def __init__(self, fields: List[dict]):
        """
        Initialize add fields transformation.
        
        Args:
            fields: List of field definitions to add
        """
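
A `field_path` such as `["data", "items"]` can be read as a list of keys to follow into the decoded response body. The sketch below shows that idea with a hypothetical `extract_records` helper. It deliberately ignores features a real path library offers (wildcards, for instance) and is only meant to show the basic traversal.

```python
from typing import Any, Dict, List


def extract_records(response_body: Any, field_path: List[str]) -> List[Dict[str, Any]]:
    """Follow field_path key by key and return the records found at the end."""
    node = response_body
    for key in field_path:
        if isinstance(node, dict) and key in node:
            node = node[key]
        else:
            return []  # path does not exist in this response
    if isinstance(node, list):
        return [item for item in node if isinstance(item, dict)]
    if isinstance(node, dict):
        return [node]  # a single object is treated as one record
    return []
```

An empty `field_path` in this sketch means the response body itself is the list of records.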

Usage Examples

Basic YAML Manifest

version: "0.29.0"
type: DeclarativeSource

check:
  type: CheckStream
  stream_names: ["users"]

streams:
  - type: DeclarativeStream
    name: users
    primary_key: ["id"]
    retriever:
      type: SimpleRetriever
      requester:
        type: HttpRequester
        url_base: "https://api.example.com/v1/"
        path: "users"
        http_method: "GET"
        authenticator:
          type: BearerAuthenticator
          api_token: "{{ config['api_token'] }}"
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path: ["data"]

spec:
  type: Spec
  connection_specification:
    type: object
    properties:
      api_token:
        type: string
        title: API Token
        airbyte_secret: true
    required:
      - api_token

Python Implementation

from airbyte_cdk import YamlDeclarativeSource

class ExampleSource(YamlDeclarativeSource):
    def __init__(self):
        super().__init__(path_to_yaml="manifest.yaml")

# The source is now fully functional with the YAML manifest

Advanced Manifest with Pagination and Incremental Sync

version: "0.29.0"
type: DeclarativeSource

definitions:
  base_requester:
    type: HttpRequester
    url_base: "https://api.example.com/v1/"
    authenticator:
      type: BearerAuthenticator
      api_token: "{{ config['api_token'] }}"
  
  base_retriever:
    type: SimpleRetriever
    requester:
      $ref: "#/definitions/base_requester"
    record_selector:
      type: RecordSelector
      extractor:
        type: DpathExtractor
        field_path: ["data"]
    paginator:
      type: DefaultPaginator
      pagination_strategy:
        type: CursorPaginationStrategy
        cursor_value: "{{ response.get('next_page_token') }}"
        stop_condition: "{{ not response.get('next_page_token') }}"
      page_token_option:
        type: RequestOption
        field_name: "page_token"
        inject_into: "request_parameter"

streams:
  - type: DeclarativeStream
    name: orders
    primary_key: ["id"]
    retriever:
      $ref: "#/definitions/base_retriever"
      requester:
        $ref: "#/definitions/base_requester"
        path: "orders"
        request_parameters:
          updated_since: "{{ stream_state.get('updated_at', config.get('start_date')) }}"
    incremental_sync:
      type: DatetimeBasedCursor
      cursor_field: "updated_at"
      datetime_format: "%Y-%m-%dT%H:%M:%SZ"
      cursor_granularity: "PT1S"
      step: "P30D"  # assumed window size; step and cursor_granularity are configured together
      start_datetime:
        type: MinMaxDatetime
        datetime: "{{ config['start_date'] }}"
        datetime_format: "%Y-%m-%d"

check:
  type: CheckStream
  stream_names: ["orders"]

spec:
  type: Spec
  connection_specification:
    type: object
    properties:
      api_token:
        type: string
        title: API Token
        airbyte_secret: true
      start_date:
        type: string
        title: Start Date
        format: date
        pattern: "^[0-9]{4}-[0-9]{2}-[0-9]{2}$"
    required:
      - api_token
      - start_date
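
The manifest above relies on `$ref` to reuse definitions and then overrides parts of them (the `orders` stream reuses `base_retriever` but supplies its own `requester` with a `path`). A minimal sketch of that resolution step follows, assuming that sibling keys next to `$ref` shallowly override keys of the referenced definition. `lookup` and `resolve` are hypothetical helpers, there is no cycle detection, and references are assumed to point at mappings; the CDK's own resolver may behave differently.

```python
from typing import Any, Dict


def lookup(manifest: Dict[str, Any], ref: str) -> Any:
    """Resolve a '#/a/b/0' style pointer against the manifest root."""
    if not ref.startswith("#/"):
        raise ValueError(f"Unsupported reference: {ref!r}")
    node: Any = manifest
    for part in ref[2:].split("/"):
        node = node[int(part)] if isinstance(node, list) else node[part]
    return node


def resolve(node: Any, manifest: Dict[str, Any]) -> Any:
    """Recursively expand $ref entries; sibling keys override the referenced keys."""
    if isinstance(node, dict):
        resolved: Dict[str, Any] = {}
        if "$ref" in node:
            resolved.update(resolve(lookup(manifest, node["$ref"]), manifest))
        for key, value in node.items():
            if key != "$ref":
                resolved[key] = resolve(value, manifest)
        return resolved
    if isinstance(node, list):
        return [resolve(item, manifest) for item in node]
    return node


# Trimmed-down version of the manifest structure shown above.
example = {
    "definitions": {
        "base_requester": {"type": "HttpRequester", "url_base": "https://api.example.com/v1/"},
        "base_retriever": {
            "type": "SimpleRetriever",
            "requester": {"$ref": "#/definitions/base_requester"},
        },
    },
    "streams": [
        {
            "name": "orders",
            "retriever": {
                "$ref": "#/definitions/base_retriever",
                "requester": {"$ref": "#/definitions/base_requester", "path": "orders"},
            },
        }
    ],
}

resolved_streams = resolve(example["streams"], example)
```

After resolution, the `orders` retriever keeps the `SimpleRetriever` type from the shared definition, while its requester carries both the shared `url_base` and the stream-specific `path`.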

OAuth2 Authentication Manifest

version: "0.29.0"
type: DeclarativeSource

definitions:
  oauth_authenticator:
    type: DeclarativeOauth2Authenticator
    token_refresh_endpoint: "https://api.example.com/oauth/token"
    client_id: "{{ config['client_id'] }}"
    client_secret: "{{ config['client_secret'] }}"
    refresh_token: "{{ config['refresh_token'] }}"
    scopes: ["read:users", "read:data"]

streams:
  - type: DeclarativeStream
    name: protected_data
    retriever:
      type: SimpleRetriever
      requester:
        type: HttpRequester
        url_base: "https://api.example.com/v1/"
        path: "protected"
        authenticator:
          $ref: "#/definitions/oauth_authenticator"
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path: ["items"]

spec:
  type: Spec
  connection_specification:
    type: object
    properties:
      client_id:
        type: string
        title: Client ID
      client_secret:
        type: string
        title: Client Secret
        airbyte_secret: true
      refresh_token:
        type: string
        title: Refresh Token
        airbyte_secret: true
    required:
      - client_id
      - client_secret
      - refresh_token
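
Behind a configuration like this, an OAuth 2.0 refresh typically means posting a `grant_type=refresh_token` request to the token endpoint once the current access token has expired. The sketch below builds such a request body and checks expiry. The function names are hypothetical, and sending the client credentials in the body (rather than in a Basic header) is an assumption that varies between providers.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional


def build_refresh_body(
    client_id: str,
    client_secret: str,
    refresh_token: str,
    scopes: Optional[List[str]] = None,
    extra: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Assemble the form fields for a refresh-token grant."""
    body: Dict[str, Any] = {
        "grant_type": "refresh_token",
        "client_id": client_id,
        "client_secret": client_secret,
        "refresh_token": refresh_token,
    }
    if scopes:
        body["scope"] = " ".join(scopes)  # scopes are space-delimited
    if extra:
        body.update(extra)  # e.g. additional provider-specific parameters
    return body


def token_is_expired(
    expiry: datetime,
    now: Optional[datetime] = None,
    leeway: timedelta = timedelta(seconds=0),
) -> bool:
    """Return True when the token should be refreshed before the next request."""
    current = now or datetime.now(timezone.utc)
    return current + leeway >= expiry
```

A small `leeway` lets the caller refresh slightly early, which avoids sending a token that expires while the request is in flight.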

Record Transformations

streams:
  - type: DeclarativeStream
    name: transformed_users
    retriever:
      type: SimpleRetriever
      requester:
        type: HttpRequester
        url_base: "https://api.example.com/v1/"
        path: "users"
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path: ["users"]
        transformations:
          - type: AddFields
            fields:
              - path: ["source"]
                value: "api"
              - path: ["extracted_at"]
                value: "{{ now_utc() }}"
              - path: ["full_name"]
                value: "{{ record['first_name'] }} {{ record['last_name'] }}"
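
Each `AddFields` entry pairs a `path` with a `value`. As a rough sketch of the effect on a single record, the hypothetical `add_fields` helper below writes each value at its path, creating intermediate objects when the path is nested. Template strings such as `{{ record['first_name'] }}` are stood in for by plain Python callables here, since template rendering is outside the scope of this sketch.

```python
import copy
from typing import Any, Dict, List


def add_fields(record: Dict[str, Any], fields: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Return a copy of record with each field definition applied in order."""
    result = copy.deepcopy(record)
    for definition in fields:
        path: List[str] = definition["path"]
        value = definition["value"]
        # Callables stand in for interpolated templates; in this sketch they
        # see the record as transformed so far.
        resolved = value(result) if callable(value) else value
        target = result
        for key in path[:-1]:
            target = target.setdefault(key, {})
        target[path[-1]] = resolved
    return result


user = {"first_name": "Ada", "last_name": "Lovelace"}
transformed = add_fields(
    user,
    [
        {"path": ["source"], "value": "api"},
        {"path": ["full_name"], "value": lambda r: f"{r['first_name']} {r['last_name']}"},
        {"path": ["meta", "origin"], "value": "users-endpoint"},
    ],
)
```

The input record is left untouched, which keeps the transformation easy to test in isolation.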

Sub-streams (Parent-Child Relationships)

streams:
  - type: DeclarativeStream
    name: users
    primary_key: ["id"]
    retriever:
      type: SimpleRetriever
      requester:
        type: HttpRequester
        url_base: "https://api.example.com/v1/"
        path: "users"
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path: ["users"]

  - type: DeclarativeStream
    name: user_posts
    primary_key: ["id"]
    retriever:
      type: SimpleRetriever
      requester:
        type: HttpRequester
        url_base: "https://api.example.com/v1/"
        path: "users/{{ stream_slice.user_id }}/posts"
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path: ["posts"]
      partition_router:
        type: SubstreamPartitionRouter
        parent_stream_configs:
          - stream: "#/streams/0"  # Reference to users stream
            parent_key: "id"
            partition_field: "user_id"
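
The parent-child relationship above can be thought of as: read each parent record, take its `parent_key` value, and expose it to the child stream under `partition_field`. The following sketch shows that flow with hypothetical helpers `partition_slices` and `child_path`; it is an illustration of the idea rather than the router's actual code.

```python
from typing import Any, Dict, Iterable, Iterator


def partition_slices(
    parent_records: Iterable[Dict[str, Any]],
    parent_key: str,
    partition_field: str,
) -> Iterator[Dict[str, Any]]:
    """Yield one slice per parent record that has the parent key."""
    for record in parent_records:
        if parent_key in record:
            yield {partition_field: record[parent_key]}


def child_path(stream_slice: Dict[str, Any]) -> str:
    """Build the child request path from a slice, mirroring the path template above."""
    return f"users/{stream_slice['user_id']}/posts"


parents = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}, {"name": "no-id"}]
slices = list(partition_slices(parents, parent_key="id", partition_field="user_id"))
paths = [child_path(s) for s in slices]
```

Each slice results in one or more child requests, so the number of API calls grows with the number of parent records.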
