A framework for building Airbyte Source and Destination connectors with Python, supporting both programmatic and low-code declarative approaches.
---
Quality: Pending — Does it follow best practices?
Impact: Pending — No eval scenarios have been run.
Framework for building data loading connectors that write records to databases, data warehouses, files, and APIs. The Destination connector framework provides structured approaches to implementing data loading with batch processing, type mapping, error handling, and integration with Airbyte's standardized message protocol.
Core class for implementing destination connectors that load data into external systems.
from airbyte_cdk import Destination
from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog
from typing import Any, Iterable, Mapping
import logging
class Destination:
    """
    Base class for Airbyte destination connectors.

    Subclasses implement ``check``, ``write``, and ``spec``; the Airbyte
    framework drives them through the standardized message protocol.
    """

    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        """
        Test connection validity with given configuration.

        Args:
            logger: Logger instance for outputting messages
            config: Configuration dictionary containing connection parameters

        Returns:
            AirbyteConnectionStatus indicating success or failure with details
        """

    def write(
        self,
        config: Mapping[str, Any],
        configured_catalog: ConfiguredAirbyteCatalog,
        input_messages: Iterable[AirbyteMessage]
    ) -> Iterable[AirbyteMessage]:
        """
        Write data records to the destination.

        Args:
            config: Configuration dictionary
            configured_catalog: Catalog specifying destination streams and sync modes
            input_messages: Stream of AirbyteMessage instances containing records to write

        Yields:
            AirbyteMessage instances for status updates, state, or errors
        """

    def spec(self) -> ConnectorSpecification:
        """
        Return the specification for this destination's configuration.

        Returns:
            ConnectorSpecification defining required and optional configuration fields
        """

Classes and utilities for processing Airbyte protocol messages.
from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, AirbyteStateMessage, Type
from typing import Any, Dict, List
class AirbyteMessage:
    """
    Airbyte protocol message containing data records, state, or metadata.

    NOTE(review): ``Optional`` and ``AirbyteLogMessage`` are referenced below
    but not imported in this snippet — confirm against ``airbyte_cdk.models``.
    """
    type: Type  # Discriminator: MESSAGE, RECORD, STATE, LOG, CATALOG, etc.
    record: Optional[AirbyteRecordMessage]  # presumably set when type == RECORD
    state: Optional[AirbyteStateMessage]  # presumably set when type == STATE
    log: Optional[AirbyteLogMessage]  # presumably set when type == LOG
class AirbyteRecordMessage:
    """
    Data record message containing actual data to be written.
    """
    stream: str  # Name of the stream this record belongs to
    data: Dict[str, Any]  # Record data as key-value pairs
    emitted_at: int  # Timestamp when record was emitted (epoch milliseconds)
    namespace: Optional[str]  # Optional namespace for the stream
class ConfiguredAirbyteCatalog:
    """
    Catalog describing which streams to write and how.
    """
    streams: List[ConfiguredAirbyteStream]  # One entry per configured stream
class ConfiguredAirbyteStream:
    """
    Configuration for a single stream in the destination.
    """
    stream: AirbyteStream  # Stream schema and metadata
    sync_mode: SyncMode  # FULL_REFRESH or INCREMENTAL
    destination_sync_mode: DestinationSyncMode  # APPEND, OVERWRITE, APPEND_DEDUP
    cursor_field: Optional[List[str]]  # Fields used for incremental sync
    primary_key: Optional[List[List[str]]]  # Fields used for deduplication

Utilities for handling destination configuration and data schemas.
from airbyte_cdk.models import ConnectorSpecification, AirbyteStream
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader
from typing import Any, Dict, List, Optional
class ConnectorSpecification:
    """
    Specification defining the configuration schema for a destination.
    """
    connectionSpecification: Dict[str, Any]  # JSONSchema describing valid configuration
    supportsIncremental: Optional[bool]  # Whether destination supports incremental sync
    supportsNormalization: Optional[bool]  # Whether destination supports normalization
    supportsDBT: Optional[bool]  # Whether destination supports DBT transformations
    supported_destination_sync_modes: List[DestinationSyncMode]  # Supported sync modes
def check_config_against_spec_or_exit(config: Dict[str, Any], spec: ConnectorSpecification) -> None:
    """
    Validate configuration against the connector specification.

    Args:
        config: Configuration dictionary to validate
        spec: Connector specification with schema

    Raises:
        SystemExit: If configuration is invalid
    """
class ResourceSchemaLoader:
    """
    Load JSON schemas from package resources.
    """

    def __init__(self, package_name: str):
        """
        Initialize schema loader.

        Args:
            package_name: Python package containing schema files
        """

    def get_schema(self, name: str) -> Dict[str, Any]:
        """
        Load schema by name.

        Args:
            name: Schema file name (without .json extension)

        Returns:
            Schema dictionary
        """

Classes for handling data type conversion and transformation.
from airbyte_cdk.sources.utils.transform import TypeTransformer, TransformConfig
from typing import Any, Dict, Mapping
class TypeTransformer:
    """
    Transform data types between Airbyte and destination formats.
    """

    def __init__(self, config: TransformConfig):
        """
        Initialize type transformer.

        Args:
            config: Configuration for type transformations
        """

    def transform(self, data: Dict[str, Any], schema: Dict[str, Any]) -> Dict[str, Any]:
        """
        Transform record data according to schema and configuration.

        Args:
            data: Record data dictionary
            schema: JSONSchema for the record

        Returns:
            Transformed data dictionary
        """
class TransformConfig:
    """
    Configuration for data transformations.
    """

    def __init__(
        self,
        date_format: str = None,  # NOTE(review): defaults are None, so these are effectively Optional[str]
        datetime_format: str = None,
        time_format: str = None,
        normalization: Mapping[str, str] = None
    ):
        """
        Initialize transformation configuration.

        Args:
            date_format: Format string for date fields
            datetime_format: Format string for datetime fields
            time_format: Format string for time fields
            normalization: Field name normalization mappings
        """

from airbyte_cdk import Destination
from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, Status, Type
import logging
import sqlite3
from typing import Any, Iterable, Mapping
class SqliteDestination(Destination):
    """Example destination that writes records into a local SQLite database."""

    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        """
        Verify the configured database file can be opened and queried.

        Returns:
            SUCCEEDED status on a working connection, FAILED with the error
            message otherwise.
        """
        try:
            conn = sqlite3.connect(config["database_path"])
            try:
                conn.execute("SELECT 1")
            finally:
                # Close even if the probe query fails, so check() never leaks a handle.
                conn.close()
            return AirbyteConnectionStatus(status=Status.SUCCEEDED)
        except Exception as e:
            return AirbyteConnectionStatus(status=Status.FAILED, message=str(e))

    def write(
        self,
        config: Mapping[str, Any],
        configured_catalog: ConfiguredAirbyteCatalog,
        input_messages: Iterable[AirbyteMessage]
    ) -> Iterable[AirbyteMessage]:
        """
        Write RECORD messages to SQLite and echo every input message downstream.

        Fix: the original yielded STATE messages twice (once in the STATE
        branch and once unconditionally); each message is now yielded
        exactly once, which keeps state checkpoints unduplicated.
        """
        conn = sqlite3.connect(config["database_path"])
        try:
            for message in input_messages:
                if message.type == Type.RECORD:
                    self._write_record(conn, message.record)
                # Yield each message exactly once to maintain the protocol;
                # STATE (and all other) messages pass through unchanged.
                yield message
        finally:
            conn.close()

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Quote a SQLite identifier (identifiers cannot be bound as ? parameters)."""
        # Doubling embedded quotes guards against injection via stream/field names.
        return '"' + name.replace('"', '""') + '"'

    def _write_record(self, conn: sqlite3.Connection, record):
        """Insert a single record, creating its table on first sight."""
        table_name = record.stream
        data = record.data
        self._ensure_table_exists(conn, table_name, data)
        columns = ",".join(self._quote_identifier(c) for c in data)
        placeholders = ",".join("?" for _ in data)
        query = f"INSERT INTO {self._quote_identifier(table_name)} ({columns}) VALUES ({placeholders})"
        conn.execute(query, list(data.values()))
        conn.commit()  # NOTE: per-record commit is simple but slow for large syncs

    def _ensure_table_exists(self, conn: sqlite3.Connection, table_name: str, sample_data: dict):
        """Create the table with column types inferred from one sample record."""

        def sqlite_type(value) -> str:
            # bool is a subclass of int, so it also maps to INTEGER.
            if isinstance(value, int):
                return "INTEGER"
            if isinstance(value, float):
                return "REAL"
            return "TEXT"

        columns = ", ".join(
            f"{self._quote_identifier(key)} {sqlite_type(value)}"
            for key, value in sample_data.items()
        )
        create_sql = f"CREATE TABLE IF NOT EXISTS {self._quote_identifier(table_name)} ({columns})"
        conn.execute(create_sql)
        conn.commit()
# Usage: check() returns an AirbyteConnectionStatus describing whether
# the configured SQLite file is reachable.
destination = SqliteDestination()
config = {"database_path": "/path/to/database.db"}
status = destination.check(logging.getLogger(), config)

from airbyte_cdk import Destination
from airbyte_cdk.models import AirbyteMessage, Type
import json
from typing import Any, Dict, Iterable, List, Mapping
class BatchFileDestination(Destination):
    """Buffer records per stream and append them to JSONL files in batches."""

    def __init__(self):
        self._buffer = {}  # stream_name -> list of buffered record dicts
        self._batch_size = 1000  # records held per stream before a flush

    def write(
        self,
        config: Mapping[str, Any],
        configured_catalog,
        input_messages: Iterable[AirbyteMessage]
    ) -> Iterable[AirbyteMessage]:
        """
        Buffer RECORD messages and flush them to disk in batches.

        Fix: STATE messages are checkpoints — all buffers are flushed
        *before* a STATE message is yielded, so a checkpoint is never
        emitted for records that are still only in memory (the original
        yielded state first, risking data loss on crash).
        """
        for message in input_messages:
            if message.type == Type.RECORD:
                self._buffer_record(message.record)
                # Flush this stream as soon as its batch is full.
                if len(self._buffer[message.record.stream]) >= self._batch_size:
                    self._flush_batch(config, message.record.stream)
            elif message.type == Type.STATE:
                # Persist everything buffered so far before acknowledging state.
                self._flush_all(config)
            yield message
        # Flush whatever remains at end of input.
        self._flush_all(config)

    def _flush_all(self, config: Mapping[str, Any]):
        """Flush every stream that currently has buffered records."""
        for stream_name in self._buffer:
            if self._buffer[stream_name]:
                self._flush_batch(config, stream_name)

    def _buffer_record(self, record):
        """Append one record (data + emitted_at) to its stream's buffer."""
        self._buffer.setdefault(record.stream, []).append({
            "data": record.data,
            "emitted_at": record.emitted_at
        })

    def _flush_batch(self, config: Mapping[str, Any], stream_name: str):
        """Append the stream's buffered records to <output_dir>/<stream>.jsonl."""
        if not self._buffer[stream_name]:
            return
        output_path = f"{config['output_dir']}/{stream_name}.jsonl"
        with open(output_path, "a") as f:
            for record in self._buffer[stream_name]:
                f.write(json.dumps(record) + "\n")
        self._buffer[stream_name] = []
        print(f"Flushed batch for stream {stream_name}")

from airbyte_cdk import Destination
from airbyte_cdk.sources.utils.transform import TypeTransformer, TransformConfig
from airbyte_cdk.models import AirbyteMessage, Type
import jsonschema
from typing import Any, Dict, Iterable, Mapping
class ValidatingDestination(Destination):
    """Destination that validates and type-transforms records before writing."""

    def __init__(self):
        # Transformer normalizes date/datetime string formats per the schema.
        self._transformer = TypeTransformer(TransformConfig(
            date_format="%Y-%m-%d",
            datetime_format="%Y-%m-%dT%H:%M:%S",
        ))
        self._schemas = {}  # stream_name -> JSON schema taken from the catalog

    def write(
        self,
        config: Mapping[str, Any],
        configured_catalog,
        input_messages: Iterable[AirbyteMessage]
    ) -> Iterable[AirbyteMessage]:
        """Validate, transform, and write RECORD messages; echo all messages."""
        # Cache each stream's schema up front so per-record lookup is cheap.
        for cfg_stream in configured_catalog.streams:
            self._schemas[cfg_stream.stream.name] = cfg_stream.stream.json_schema

        for msg in input_messages:
            if msg.type == Type.RECORD:
                try:
                    cleaned = self._validate_and_transform(msg.record)
                    self._write_validated_record(config, cleaned)
                except Exception as err:
                    # Best-effort: report the bad record but keep processing.
                    print(f"Validation error for record in {msg.record.stream}: {err}")
            yield msg

    def _validate_and_transform(self, record) -> Dict[str, Any]:
        """Return a plain dict for the record, validated and transformed when a schema is known."""
        schema = self._schemas.get(record.stream)
        payload = record.data
        if schema:
            jsonschema.validate(payload, schema)  # raises on invalid records
            payload = self._transformer.transform(payload, schema)
        return {
            "stream": record.stream,
            "data": payload,
            "emitted_at": record.emitted_at,
        }

    def _write_validated_record(self, config: Mapping[str, Any], record: Dict[str, Any]):
        # Placeholder: concrete destinations implement the actual write here.
        pass

from airbyte_cdk import Destination
from airbyte_cdk.models import AirbyteMessage, Type
import requests
import time
from typing import Any, Iterable, Mapping
class ApiDestination(Destination):
def write(
self,
config: Mapping[str, Any],
configured_catalog,
input_messages: Iterable[AirbyteMessage]
) -> Iterable[AirbyteMessage]:
session = requests.Session()
session.headers.update({
"Authorization": f"Bearer {config['api_token']}",
"Content-Type": "application/json"
})
for message in input_messages:
if message.type == Type.RECORD:
success = self._write_record_with_retry(
session,
config,
message.record
)
if not success:
print(f"Failed to write record to API: {message.record.stream}")
yield message
def _write_record_with_retry(self, session: requests.Session, config: Mapping[str, Any], record, max_retries: int = 3) -> bool:
url = f"{config['api_base_url']}/data/{record.stream}"
payload = {
"data": record.data,
"timestamp": record.emitted_at
}
for attempt in range(max_retries + 1):
try:
response = session.post(url, json=payload, timeout=30)
if response.status_code == 200:
return True
elif response.status_code == 429: # Rate limited
wait_time = int(response.headers.get("Retry-After", 60))
time.sleep(wait_time)
continue
elif response.status_code >= 500: # Server error, retry
time.sleep(2 ** attempt) # Exponential backoff
continue
else:
print(f"API error {response.status_code}: {response.text}")
return False
except requests.exceptions.RequestException as e:
print(f"Request failed (attempt {attempt + 1}): {e}")
if attempt < max_retries:
time.sleep(2 ** attempt)
return FalseInstall with Tessl CLI
npx tessl i tessl/pypi-airbyte-cdk