CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-airbyte-cdk

A framework for building Airbyte Source and Destination connectors with Python, supporting both programmatic and low-code declarative approaches.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/source-connectors.md

Source Connectors

Framework for building data extraction connectors with support for HTTP APIs, databases, and files. The Source connector framework provides a structured approach to implementing data ingestion with built-in stream management, incremental synchronization, authentication, error handling, and state management.

Capabilities

Base Source Classes

Core classes for implementing source connectors that extract data from external systems.

from airbyte_cdk import Source
from airbyte_cdk.models import AirbyteCatalog, AirbyteMessage, ConfiguredAirbyteCatalog
from typing import Any, Iterable, List, Mapping, Optional, Tuple
import logging

class Source:
    """
    Base class for Airbyte source connectors.

    Reference stub: only signatures and docstrings are listed here; the
    method bodies are not part of this listing.
    """
    def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
        """
        Test whether the given configuration yields a valid connection.

        Args:
            logger: Logger instance for outputting messages.
            config: Configuration dictionary containing connection parameters.

        Returns:
            A two-element tuple: a success boolean, followed by an error
            message or ``None``.
        """
    
    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
        """
        Discover the available streams and their schemas.

        Args:
            logger: Logger instance.
            config: Configuration dictionary.

        Returns:
            An ``AirbyteCatalog`` containing the available streams and their
            schemas.
        """
    
    def read(
        self, 
        logger: logging.Logger, 
        config: Mapping[str, Any], 
        catalog: ConfiguredAirbyteCatalog, 
        state: Optional[Mapping[str, Any]] = None
    ) -> Iterable[AirbyteMessage]:
        """
        Read data from the source.

        Args:
            logger: Logger instance.
            config: Configuration dictionary.
            catalog: Configured catalog specifying which streams to read.
            state: Optional state for incremental reads; defaults to ``None``.

        Yields:
            ``AirbyteMessage`` instances containing records, state, or logs.
        """
    
    # "Stream" is quoted as a forward reference: the name is not among the
    # imports listed for this class, so an unquoted annotation would be
    # evaluated (and fail) when the method is defined.
    def streams(self, config: Mapping[str, Any]) -> List["Stream"]:
        """
        Return the list of streams for this source.

        Args:
            config: Configuration dictionary.

        Returns:
            List of ``Stream`` instances available in this source.
        """

HTTP Stream Classes

Classes for building HTTP-based source connectors with built-in pagination, authentication, and error handling.

from airbyte_cdk import HttpStream, HttpSubStream
from airbyte_cdk.sources.streams.core import Stream
from requests.auth import AuthBase
from typing import Any, Iterable, Mapping, Optional

class HttpStream(Stream):
    """
    Base class for HTTP API data extraction streams.

    Reference stub: only signatures and docstrings are listed here; the
    method bodies are not part of this listing.
    """
    
    def __init__(self, authenticator: Optional[AuthBase] = None):
        """
        Initialize the HTTP stream.

        Args:
            authenticator: Authentication handler for HTTP requests;
                defaults to ``None``.
        """
    
    @property
    def url_base(self) -> str:
        """
        Base URL for the API endpoint.

        Example: "https://api.example.com/v1/"
        """
    
    def path(self, **kwargs) -> str:
        """
        Return the API endpoint path for this stream.

        Args:
            **kwargs: Extra keyword arguments; not described in this stub.

        Returns:
            Path component of the URL (e.g., "users", "posts").
        """
    
    def request_params(self, **kwargs) -> Mapping[str, Any]:
        """
        Return the query parameters for the request.

        Args:
            **kwargs: Extra keyword arguments; not described in this stub.

        Returns:
            Dictionary of query parameters.
        """
    
    def request_headers(self, **kwargs) -> Mapping[str, Any]:
        """
        Return the headers for the request.

        Args:
            **kwargs: Extra keyword arguments; not described in this stub.

        Returns:
            Dictionary of HTTP headers.
        """
    
    def parse_response(self, response, **kwargs) -> Iterable[Mapping]:
        """
        Parse an HTTP response into records.

        Args:
            response: HTTP response object.
            **kwargs: Extra keyword arguments; not described in this stub.

        Yields:
            Dictionary records extracted from the response.
        """
    
    def next_page_token(self, response) -> Optional[Mapping[str, Any]]:
        """
        Extract the next-page token used for pagination.

        Args:
            response: HTTP response object.

        Returns:
            Token for the next page, or ``None`` if there are no more pages.
        """

class HttpSubStream(HttpStream):
    """
    HTTP stream that depends on data from a parent stream.

    Reference stub: only the constructor signature is listed here.
    """
    
    def __init__(self, parent: HttpStream, **kwargs):
        """
        Initialize the sub-stream with its parent dependency.

        Args:
            parent: Parent stream that provides data for this sub-stream.
            **kwargs: Extra keyword arguments; not described in this stub
                (presumably passed on to ``HttpStream.__init__`` - TODO
                confirm against the CDK source).
        """

Authentication

Authentication handlers for various HTTP authentication schemes.

from airbyte_cdk import TokenAuthenticator, Oauth2Authenticator, BasicHttpAuthenticator
from requests.auth import AuthBase

class TokenAuthenticator(AuthBase):
    """
    Authentication using API tokens in headers.

    Reference stub: only the constructor signature is listed here.
    """
    
    def __init__(self, token: str, auth_method: str = "Bearer", auth_header: str = "Authorization"):
        """
        Initialize the token authenticator.

        Args:
            token: API token value.
            auth_method: Authentication method (e.g., "Bearer", "Token");
                defaults to "Bearer".
            auth_header: Header name for authentication; defaults to
                "Authorization".
        """

class Oauth2Authenticator(AuthBase):
    """
    OAuth 2.0 authentication with automatic token refresh.

    Reference stub: only the constructor signature is listed here.
    """
    
    def __init__(
        self,
        token_refresh_endpoint: str,
        client_id: str,
        client_secret: str,
        refresh_token: str,
        scopes: Optional[List[str]] = None,
        token_expiry_date: Optional[str] = None,
        access_token: Optional[str] = None
    ):
        """
        Initialize the OAuth2 authenticator.

        Args:
            token_refresh_endpoint: URL for token refresh.
            client_id: OAuth client ID.
            client_secret: OAuth client secret.
            refresh_token: Refresh token for obtaining access tokens.
            scopes: Optional list of OAuth scopes; defaults to ``None``.
            token_expiry_date: When the current access token expires;
                defaults to ``None``.
            access_token: Current access token; defaults to ``None``.
        """

class BasicHttpAuthenticator(AuthBase):
    """
    HTTP Basic authentication.

    Reference stub: only the constructor signature is listed here.
    """
    
    def __init__(self, username: str, password: str):
        """
        Initialize basic authentication.

        Args:
            username: Username for basic auth.
            password: Password for basic auth.
        """

Stream State Management

Classes for managing incremental synchronization state.

from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.streams.core import IncrementalMixin
from typing import Any, Mapping, Optional

class IncrementalMixin:
    """
    Mixin for streams that support incremental synchronization.

    Reference stub: only signatures and docstrings are listed here; the
    method bodies are not part of this listing.
    """
    
    @property
    def cursor_field(self) -> str:
        """
        Name of the field used as the cursor for incremental sync.

        Returns:
            Name of the cursor field (e.g., "updated_at", "id").
        """
    
    def get_updated_state(
        self, 
        current_stream_state: Mapping[str, Any], 
        latest_record: Mapping[str, Any]
    ) -> Mapping[str, Any]:
        """
        Update the stream state based on the latest record.

        Args:
            current_stream_state: Current state for this stream.
            latest_record: Latest record processed.

        Returns:
            Updated state dictionary.
        """

class ConnectorStateManager:
    """
    Manages state across all streams in a connector.

    Reference stub: only signatures and docstrings are listed here; the
    method bodies are not part of this listing.
    """
    
    def get_stream_state(self, stream_name: str) -> Mapping[str, Any]:
        """
        Get the state for a specific stream.

        Args:
            stream_name: Name of the stream.

        Returns:
            State dictionary for the stream.
        """
    
    def update_state_for_stream(
        self, 
        stream_name: str, 
        state: Mapping[str, Any]
    ) -> None:
        """
        Update the state for a specific stream.

        Args:
            stream_name: Name of the stream.
            state: New state dictionary.
        """

Usage Examples

Basic HTTP Source

from airbyte_cdk import Source, HttpStream
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
import logging
from typing import Any, Mapping

class UsersStream(HttpStream):
    """
    Stream of user records read from the ``users`` endpoint.

    Requests are authenticated with a ``TokenAuthenticator`` built from the
    ``api_token`` entry of the connector configuration.
    """

    url_base = "https://api.example.com/v1/"
    primary_key = "id"

    def __init__(self, config: Mapping[str, Any]):
        """
        Build the stream and its token authenticator.

        Args:
            config: Connector configuration; must contain ``api_token``.

        Raises:
            KeyError: If ``api_token`` is missing from ``config``.
        """
        authenticator = TokenAuthenticator(token=config["api_token"])
        super().__init__(authenticator=authenticator)
        self._config = config

    def path(self, **kwargs) -> str:
        """Return the path component of the URL for this stream."""
        return "users"

    def next_page_token(self, response) -> None:
        """
        Report that there is no further page to fetch.

        The Airbyte CDK requires concrete ``HttpStream`` subclasses to
        provide this method, so it is spelled out even though this example
        treats the endpoint as unpaginated.

        Args:
            response: HTTP response object (unused).

        Returns:
            Always ``None``, meaning "no more pages".
        """
        return None

    def parse_response(self, response, **kwargs):
        """
        Yield one record per entry of the ``users`` array in the JSON body.

        Args:
            response: HTTP response object exposing ``.json()``.

        Yields:
            User records; nothing when the ``users`` key is absent.
        """
        data = response.json()
        yield from data.get("users", [])

class ExampleSource(Source):
    """Example source that exposes a single ``UsersStream``."""

    def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]):
        """
        Verify the configuration by actually reading from the API.

        Args:
            logger: Logger instance (not used by this example).
            config: Connector configuration handed to ``UsersStream``.

        Returns:
            ``(True, None)`` when a read succeeds, otherwise
            ``(False, <error message>)``.
        """
        # Local import keeps this example self-contained; SyncMode lives in
        # the CDK's models package.
        from airbyte_cdk.models import SyncMode

        try:
            stream = UsersStream(config)
            # Pull at most one record: enough to exercise the URL and the
            # credentials. The None default keeps an empty result from
            # raising StopIteration.
            next(iter(stream.read_records(sync_mode=SyncMode.full_refresh)), None)
        except Exception as e:
            # Deliberately broad: any failure at this boundary is reported
            # back as a failed check rather than raised.
            return False, str(e)
        else:
            return True, None

    def streams(self, config: Mapping[str, Any]):
        """
        Return the streams offered by this source.

        Args:
            config: Connector configuration handed to each stream.

        Returns:
            A list containing one ``UsersStream``.
        """
        return [UsersStream(config)]

# Usage: build the source, then run the connection check with a config that
# supplies the "api_token" key read by UsersStream.
source = ExampleSource()
config = {"api_token": "your_token_here"}  # placeholder token value
# check_connection returns a (success, error) pair; the root logger is passed
# as the logger argument.
success, error = source.check_connection(logging.getLogger(), config)

Incremental Stream with Pagination

from airbyte_cdk import HttpStream
from airbyte_cdk.sources.streams.core import IncrementalMixin
from datetime import datetime
from typing import Any, Mapping, Optional

class OrdersStream(HttpStream, IncrementalMixin):
    """
    Incremental, paginated stream of order records from the ``orders`` endpoint.

    The ``updated_at`` field of each record is used as the cursor.
    """

    url_base = "https://api.example.com/v1/"
    primary_key = "id"
    cursor_field = "updated_at"

    def path(self, **kwargs) -> str:
        """Return the path component of the URL for this stream."""
        return "orders"

    def request_params(self, stream_state: Optional[Mapping[str, Any]] = None, **kwargs) -> Mapping[str, Any]:
        """
        Build the query parameters for one page request.

        Args:
            stream_state: State saved by a previous sync, or ``None``.
            **kwargs: May carry ``next_page_token``, the value returned by
                ``next_page_token`` for the previous page.

        Returns:
            Always ``limit``; ``updated_since`` when the state holds a
            cursor value; ``page_token`` when a next-page token was given.
        """
        params = {"limit": 100}

        # Resume from the saved cursor on incremental syncs.
        if stream_state and self.cursor_field in stream_state:
            params["updated_since"] = stream_state[self.cursor_field]

        # Continue from the previous page when a token was handed in.
        page_token = kwargs.get("next_page_token")
        if page_token:
            params["page_token"] = page_token

        return params

    def next_page_token(self, response) -> Optional[str]:
        """
        Read the pagination token from the response body.

        Args:
            response: HTTP response object exposing ``.json()``.

        Returns:
            The body's ``next_page_token`` value, or ``None`` when absent.
        """
        data = response.json()
        return data.get("next_page_token")

    def parse_response(self, response, **kwargs):
        """
        Yield one record per entry of the ``orders`` array in the JSON body.

        Args:
            response: HTTP response object exposing ``.json()``.

        Yields:
            Order records; nothing when the ``orders`` key is absent.
        """
        data = response.json()
        yield from data.get("orders", [])

    def get_updated_state(self, current_stream_state: Mapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Advance the cursor to the larger of the saved and latest values.

        A missing state (``None``) and missing or null cursor values are
        tolerated: they are ignored instead of being compared, which would
        raise. When neither side has a cursor the state keeps the empty
        string.

        Args:
            current_stream_state: State accumulated so far; may be empty.
            latest_record: The record that was just read.

        Returns:
            A one-key state mapping ``cursor_field`` to the new cursor value.
        """
        saved_cursor = (current_stream_state or {}).get(self.cursor_field)
        record_cursor = latest_record.get(self.cursor_field)

        # Only real values take part in the comparison.
        candidates = [value for value in (saved_cursor, record_cursor) if value is not None]
        return {self.cursor_field: max(candidates, default="")}

OAuth2 Authentication

from airbyte_cdk import HttpStream
from airbyte_cdk.sources.streams.http.requests_native_auth import Oauth2Authenticator

class AuthenticatedStream(HttpStream):
    """HTTP stream whose requests are authenticated through OAuth 2.0."""

    url_base = "https://api.example.com/v1/"

    def __init__(self, config: Mapping[str, Any]):
        """
        Create the stream with an OAuth2 authenticator built from ``config``.

        Args:
            config: Connector configuration; must contain ``client_id``,
                ``client_secret`` and ``refresh_token``.
        """
        super().__init__(
            authenticator=Oauth2Authenticator(
                token_refresh_endpoint="https://api.example.com/oauth/token",
                client_id=config["client_id"],
                client_secret=config["client_secret"],
                refresh_token=config["refresh_token"],
            )
        )

Sub-stream Implementation

from airbyte_cdk import HttpSubStream

class UserPostsStream(HttpSubStream):
    """
    Sub-stream that reads the posts of every user emitted by ``UsersStream``.

    Each slice carries one parent user record under the ``parent`` key.
    """

    # Declared like the sibling example streams so the class is complete.
    url_base = "https://api.example.com/v1/"
    primary_key = "id"

    def __init__(self, parent: UsersStream, **kwargs):
        """
        Args:
            parent: The users stream whose records drive this sub-stream.
            **kwargs: Passed through to ``HttpSubStream.__init__``.
        """
        super().__init__(parent=parent, **kwargs)

    def path(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> str:
        """
        Return the posts path for the user held in ``stream_slice``.

        Args:
            stream_slice: Slice produced by ``stream_slices``; its
                ``parent`` entry is the user record.

        Returns:
            ``users/<id>/posts`` for the slice's user.

        Raises:
            ValueError: If no slice is supplied, since the path cannot be
                built without a parent user.
        """
        if not stream_slice:
            raise ValueError("UserPostsStream.path requires a stream_slice with a 'parent' record")
        user_id = stream_slice["parent"]["id"]
        return f"users/{user_id}/posts"

    def stream_slices(self, **kwargs):
        """
        Yield one slice per record of the parent stream.

        Args:
            **kwargs: Forwarded unchanged to the parent's ``read_records``.

        Yields:
            ``{"parent": <user record>}`` dictionaries.
        """
        # Use parent stream records as slices
        for user in self.parent.read_records(**kwargs):
            yield {"parent": user}

    def next_page_token(self, response) -> None:
        """
        Report that there is no further page to fetch.

        The Airbyte CDK requires concrete ``HttpStream`` subclasses to
        provide this method; this example treats the endpoint as
        unpaginated.

        Args:
            response: HTTP response object (unused).

        Returns:
            Always ``None``, meaning "no more pages".
        """
        return None

    def parse_response(self, response, **kwargs):
        """
        Yield one record per entry of the ``posts`` array in the JSON body.

        Args:
            response: HTTP response object exposing ``.json()``.

        Yields:
            Post records; nothing when the ``posts`` key is absent.
        """
        data = response.json()
        yield from data.get("posts", [])

Install with Tessl CLI

npx tessl i tessl/pypi-airbyte-cdk

docs

declarative-cdk.md

destination-connectors.md

index.md

source-connectors.md

tile.json