A framework for building Airbyte Source and Destination connectors with Python, supporting both programmatic and low-code declarative approaches.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Framework for building data extraction connectors with support for HTTP APIs, databases, and files. The Source connector framework provides a structured approach to implementing data ingestion with built-in stream management, incremental synchronization, authentication, error handling, and state management.
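Core classes for implementing source connectors that extract data from external systems. Note that in the CDK the `check_connection`/`streams` pair documented below is the contract of `AbstractSource`, a subclass of `Source` that implements `check`, `discover`, and `read` on top of those two methods. A connector that subclasses `Source` directly must implement the lower-level methods itself.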
from airbyte_cdk import Source
from airbyte_cdk.models import AirbyteCatalog, AirbyteMessage, ConfiguredAirbyteCatalog
from typing import Any, Iterable, List, Mapping, Optional, Tuple
import logging
class Source:
    """
    Base class for Airbyte source connectors.

    Declares the four operations of a source: checking a configuration,
    discovering streams, reading data, and building the stream objects.
    """

    def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
        """
        Test connection validity with given configuration.

        Args:
            logger: Logger instance for outputting messages
            config: Configuration dictionary containing connection parameters

        Returns:
            Tuple of (success_boolean, error_message_or_none)
        """

    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
        """
        Discover available streams and their schemas.

        Args:
            logger: Logger instance
            config: Configuration dictionary

        Returns:
            AirbyteCatalog containing available streams and their schemas
        """

    def read(
        self,
        logger: logging.Logger,
        config: Mapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[AirbyteMessage]:
        """
        Read data from the source.

        Args:
            logger: Logger instance
            config: Configuration dictionary
            catalog: Configured catalog specifying which streams to read
            state: Optional state for incremental reads

        Yields:
            AirbyteMessage instances containing records, state, or logs
        """

    # ``Stream`` is written as a string (forward reference) because this
    # snippet does not import it: a bare ``List[Stream]`` is evaluated when
    # the ``def`` executes and would raise NameError while defining the class.
    def streams(self, config: Mapping[str, Any]) -> List["Stream"]:
        """
        Return list of streams for this source.

        Args:
            config: Configuration dictionary

        Returns:
            List of Stream instances available in this source
        """


# Classes for building HTTP-based source connectors with built-in pagination, authentication, and error handling.
from airbyte_cdk import HttpStream, HttpSubStream
from airbyte_cdk.sources.streams.core import Stream
from requests.auth import AuthBase
from typing import Any, Iterable, Mapping, Optional
class HttpStream(Stream):
    """Stream whose records are fetched from an HTTP API.

    A concrete stream fills in the hooks below:
    - where the request goes (``url_base`` plus ``path``);
    - what it carries (``request_params``, ``request_headers``);
    - how records are pulled out of a reply (``parse_response``);
    - whether another page follows (``next_page_token``).
    """

    def __init__(self, authenticator: Optional[AuthBase] = None):
        """Set up the stream.

        Args:
            authenticator: ``requests`` auth handler for the stream's HTTP
                requests. Defaults to ``None``.
        """

    @property
    def url_base(self) -> str:
        """Root URL of the API endpoint.

        For instance ``"https://api.example.com/v1/"``.
        """

    def path(self, **kwargs) -> str:
        """Endpoint path of this stream, relative to ``url_base``.

        Returns:
            The path part of the URL, such as ``"users"`` or ``"posts"``.
        """

    def request_params(self, **kwargs) -> Mapping[str, Any]:
        """Query-string parameters to send with the request.

        Returns:
            Mapping of parameter name to value.
        """

    def request_headers(self, **kwargs) -> Mapping[str, Any]:
        """Extra HTTP headers to send with the request.

        Returns:
            Mapping of header name to value.
        """

    def parse_response(self, response, **kwargs) -> Iterable[Mapping]:
        """Turn one HTTP response into records.

        Args:
            response: The HTTP response object.

        Yields:
            One mapping per record found in the response.
        """

    def next_page_token(self, response) -> Optional[Mapping[str, Any]]:
        """Work out how to request the following page.

        Args:
            response: The HTTP response object.

        Returns:
            A token describing the next page, or ``None`` once the last page
            has been reached.
        """
class HttpSubStream(HttpStream):
    """HTTP stream that relies on the records of a parent stream."""

    def __init__(self, parent: HttpStream, **kwargs):
        """Create the sub-stream.

        Args:
            parent: The stream whose records feed this sub-stream.
            **kwargs: Further keyword arguments (not described in this
                reference).
        """


# Authentication handlers for various HTTP authentication schemes.
from airbyte_cdk import TokenAuthenticator, Oauth2Authenticator, BasicHttpAuthenticator
from requests.auth import AuthBase
class TokenAuthenticator(AuthBase):
    """Authenticates requests by placing an API token in a header."""

    def __init__(self, token: str, auth_method: str = "Bearer", auth_header: str = "Authorization"):
        """Configure the token authenticator.

        Args:
            token: The API token value.
            auth_method: Scheme word used with the token, such as
                ``"Bearer"`` (the default) or ``"Token"``.
            auth_header: Name of the header carrying the credentials;
                ``"Authorization"`` unless overridden.
        """
# This snippet's own imports do not include ``typing``. The signature below is
# evaluated when the class is defined, so without this import the snippet
# raises NameError on ``Optional``/``List``.
from typing import List, Optional


class Oauth2Authenticator(AuthBase):
    """
    OAuth 2.0 authentication with automatic token refresh.
    """

    def __init__(
        self,
        token_refresh_endpoint: str,
        client_id: str,
        client_secret: str,
        refresh_token: str,
        scopes: Optional[List[str]] = None,
        token_expiry_date: Optional[str] = None,
        access_token: Optional[str] = None,
    ):
        """
        Initialize OAuth2 authenticator.

        Args:
            token_refresh_endpoint: URL for token refresh
            client_id: OAuth client ID
            client_secret: OAuth client secret
            refresh_token: Refresh token for obtaining access tokens
            scopes: Optional list of OAuth scopes
            token_expiry_date: When current access token expires
            access_token: Current access token
        """
class BasicHttpAuthenticator(AuthBase):
    """Authenticates requests with HTTP Basic credentials."""

    def __init__(self, username: str, password: str):
        """Store the Basic-auth credentials.

        Args:
            username: The account name.
            password: The account password.
        """


# Classes for managing incremental synchronization state.
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.streams.core import IncrementalMixin
from typing import Any, Mapping, Optional
class IncrementalMixin:
    """Adds incremental-sync hooks to a stream."""

    @property
    def cursor_field(self) -> str:
        """Record field whose value tracks sync progress.

        Returns:
            The cursor field name, such as ``"updated_at"`` or ``"id"``.
        """

    def get_updated_state(
        self,
        current_stream_state: Mapping[str, Any],
        latest_record: Mapping[str, Any],
    ) -> Mapping[str, Any]:
        """Fold the newest record into the stream's state.

        Args:
            current_stream_state: State of this stream so far.
            latest_record: The most recently processed record.

        Returns:
            The resulting state mapping.
        """
class ConnectorStateManager:
    """Keeps the state of every stream of a connector."""

    def get_stream_state(self, stream_name: str) -> Mapping[str, Any]:
        """Look up one stream's state.

        Args:
            stream_name: The stream to look up.

        Returns:
            That stream's state mapping.
        """

    def update_state_for_stream(
        self,
        stream_name: str,
        state: Mapping[str, Any],
    ) -> None:
        """Replace one stream's state.

        Args:
            stream_name: The stream to update.
            state: The new state mapping.
        """


from airbyte_cdk import Source, HttpStream
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
import logging
from typing import Any, Mapping
class UsersStream(HttpStream):
    """Full-refresh stream of users from the ``users`` endpoint.

    Requests are authenticated with ``config["api_token"]`` through a
    ``TokenAuthenticator``.
    """

    url_base = "https://api.example.com/v1/"
    primary_key = "id"

    def __init__(self, config: Mapping[str, Any]):
        authenticator = TokenAuthenticator(token=config["api_token"])
        super().__init__(authenticator=authenticator)
        self._config = config

    def path(self, **kwargs) -> str:
        return "users"

    def next_page_token(self, response) -> None:
        """Report that there is no further page.

        The CDK's ``HttpStream`` declares ``next_page_token`` abstract. Without
        this override the class cannot be instantiated. This example treats
        the ``users`` endpoint as a single page.
        """
        return None

    def parse_response(self, response, **kwargs):
        """Yield each entry of the response's ``users`` list (empty if absent)."""
        yield from response.json().get("users", [])
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource


class ExampleSource(AbstractSource):
    """Example source exposing a single ``users`` stream.

    This class subclasses ``AbstractSource`` rather than the low-level
    ``Source``. ``AbstractSource`` (itself a ``Source``, so ``isinstance``
    checks still hold) implements ``check``/``discover``/``read`` on top of
    the two methods defined here. Subclassing ``Source`` directly leaves those
    methods abstract, so the class could not be instantiated.
    """

    def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]):
        """Verify the config by actually fetching from the API.

        Args:
            logger: Logger supplied by the framework.
            config: Connector configuration; must contain ``api_token``.

        Returns:
            ``(True, None)`` when the request succeeds, otherwise
            ``(False, <error message>)``.
        """
        try:
            stream = UsersStream(config)
            # Pull at most one record: that forces a real HTTP request, which
            # is enough to prove the URL and token work.
            next(iter(stream.read_records(sync_mode=SyncMode.full_refresh)), None)
        except Exception as e:  # connector boundary: report, don't crash
            return False, str(e)
        return True, None

    def streams(self, config: Mapping[str, Any]):
        """Return the streams offered by this source."""
        return [UsersStream(config)]
# Usage: run the connection check with a placeholder token.
config = {"api_token": "your_token_here"}
source = ExampleSource()
success, error = source.check_connection(logging.getLogger(), config)


from airbyte_cdk import HttpStream
from airbyte_cdk.sources.streams.core import IncrementalMixin
from datetime import datetime
from typing import Any, Mapping, Optional
class OrdersStream(HttpStream, IncrementalMixin):
    """Incrementally synced, paginated stream of orders.

    Only orders whose ``updated_at`` is later than the saved cursor are
    requested (via the ``updated_since`` query parameter). Pages are followed
    using the ``next_page_token`` field of each response.
    """

    url_base = "https://api.example.com/v1/"
    primary_key = "id"
    cursor_field = "updated_at"
    # Sent as the ``limit`` query parameter; override in a subclass to tune.
    page_size = 100

    # Greatest cursor value seen so far (restored from state or advanced while
    # reading). None means "no state yet". This is a class-level default so no
    # __init__ override is needed.
    _cursor_value: Optional[str] = None

    # IncrementalMixin declares ``state`` as an abstract property/setter pair.
    # Without both, the CDK cannot instantiate this class.
    @property
    def state(self) -> Mapping[str, Any]:
        """Current checkpoint, e.g. ``{"updated_at": "<latest>"}``; ``{}`` before any."""
        if self._cursor_value is None:
            return {}
        return {self.cursor_field: self._cursor_value}

    @state.setter
    def state(self, value: Mapping[str, Any]) -> None:
        """Restore the cursor from a previously emitted state (None/empty -> no cursor)."""
        self._cursor_value = (value or {}).get(self.cursor_field) or None

    @staticmethod
    def _latest_cursor(*values: Optional[str]) -> Optional[str]:
        """Return the greatest non-empty cursor value, or None if there is none.

        NOTE(review): values are compared as strings, which matches
        chronological order only if the API always returns one fixed
        ISO-8601 format. Confirm against the real API.
        """
        present = [value for value in values if value]
        return max(present) if present else None

    def path(self, **kwargs) -> str:
        return "orders"

    def request_params(self, stream_state: Optional[Mapping[str, Any]] = None, **kwargs) -> Mapping[str, Any]:
        """Build the query string: page size, incremental cursor, and page token."""
        params = {"limit": self.page_size}
        cursor = (stream_state or {}).get(self.cursor_field)
        if cursor:
            params["updated_since"] = cursor
        page_token = kwargs.get("next_page_token")
        if page_token:
            params["page_token"] = page_token
        return params

    def next_page_token(self, response) -> Optional[str]:
        """Token for the next page, or None when the response carries none."""
        return response.json().get("next_page_token")

    def parse_response(self, response, **kwargs):
        """Yield each order, advancing the cursor so ``state`` reflects progress."""
        for order in response.json().get("orders", []):
            self._cursor_value = self._latest_cursor(self._cursor_value, order.get(self.cursor_field))
            yield order

    def get_updated_state(self, current_stream_state: Mapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
        """Legacy state hook: merge ``latest_record``'s cursor into the state.

        A None state and records whose cursor is missing or null are
        tolerated. The previous ``max(...)`` raised TypeError on a null
        cursor. When no cursor is known at all, the value is ``""``, as before.
        """
        current = (current_stream_state or {}).get(self.cursor_field)
        latest = latest_record.get(self.cursor_field)
        return {self.cursor_field: self._latest_cursor(current, latest) or ""}


from airbyte_cdk import HttpStream
from airbyte_cdk.sources.streams.http.requests_native_auth import Oauth2Authenticator
# This snippet's imports do not include ``typing``. The ``__init__`` annotation
# is evaluated at class-definition time and would otherwise raise NameError.
from typing import Any, Mapping


class AuthenticatedStream(HttpStream):
    """Base for api.example.com streams that authenticate with OAuth 2.0.

    The ``Oauth2Authenticator`` is built from ``client_id``, ``client_secret``
    and ``refresh_token`` in the connector config. Concrete subclasses are
    still expected to add ``path``, ``parse_response``, ``next_page_token``
    and ``primary_key``, as ``UsersStream`` does.
    """

    url_base = "https://api.example.com/v1/"
    # Token endpoint as a class attribute so subclasses or other deployments
    # can override it without rewriting __init__.
    token_refresh_endpoint = "https://api.example.com/oauth/token"

    def __init__(self, config: Mapping[str, Any]):
        authenticator = Oauth2Authenticator(
            token_refresh_endpoint=self.token_refresh_endpoint,
            client_id=config["client_id"],
            client_secret=config["client_secret"],
            refresh_token=config["refresh_token"],
        )
        super().__init__(authenticator=authenticator)


from airbyte_cdk import HttpSubStream
from typing import Any, Iterable, Mapping, Optional

from airbyte_cdk.models import SyncMode


class UserPostsStream(HttpSubStream):
    """Posts of each user, fetched from ``users/{user_id}/posts``.

    Every slice carries one parent user record under the ``"parent"`` key.
    """

    # Quoted so this snippet can be defined without UsersStream in scope.
    def __init__(self, parent: "UsersStream", **kwargs):
        super().__init__(parent=parent, **kwargs)

    def path(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> str:
        """Build the posts path for the user in ``stream_slice["parent"]``.

        Raises:
            ValueError: if no slice (or no parent record) is given, instead of
                the opaque TypeError from subscripting ``None``.
        """
        if not stream_slice or "parent" not in stream_slice:
            raise ValueError("UserPostsStream.path requires a stream_slice with a 'parent' record")
        user_id = stream_slice["parent"]["id"]
        return f"users/{user_id}/posts"

    def stream_slices(self, **kwargs) -> Iterable[Mapping[str, Any]]:
        """Yield one ``{"parent": user}`` slice per user record.

        The parent is read in full-refresh mode over its own slices, so every
        user is visited even when this stream syncs incrementally. This
        stream's ``sync_mode``/``cursor_field``/``stream_state`` (in ``kwargs``)
        describe the posts stream, so they are deliberately not forwarded
        to the users stream.
        """
        for parent_slice in self.parent.stream_slices(sync_mode=SyncMode.full_refresh):
            for user in self.parent.read_records(sync_mode=SyncMode.full_refresh, stream_slice=parent_slice):
                yield {"parent": user}

    def parse_response(self, response, **kwargs):
        """Yield each entry of the response's ``posts`` list (empty if absent)."""
        yield from response.json().get("posts", [])


# Install with Tessl CLI
npx tessl i tessl/pypi-airbyte-cdk