# Airbyte CDK

The Airbyte CDK (Connector Development Kit) is a Python framework for building Airbyte Source and Destination connectors. It supports both programmatic (Python class-based) and low-code declarative (YAML manifest) development. Developers can use it to build reliable extraction and loading connectors for APIs, databases, and other data sources.

The CDK covers three main workflows: programmatic source development with Python classes, low-code declarative sources defined in YAML manifests, and destination connectors that load data into target systems.

## Package Information

- **Package Name**: airbyte-cdk
- **Version**: 3.1.0
- **Language**: Python
- **Installation**: `pip install airbyte-cdk`
- **Python Version**: 3.9+
- **Framework Type**: Connector Development Kit

## Core Imports

Main connector classes:

```python
from airbyte_cdk import Source, Destination
from airbyte_cdk.entrypoint import launch
from airbyte_cdk.sources import AbstractSource  # implements Source via check_connection() and streams()
```

Low-code declarative sources:

```python
from airbyte_cdk import YamlDeclarativeSource, ManifestDeclarativeSource
```

HTTP streams and utilities:

```python
from airbyte_cdk import HttpStream, HttpSubStream
from airbyte_cdk import TokenAuthenticator, Oauth2Authenticator
```

Protocol models:

```python
from airbyte_cdk import AirbyteMessage, ConfiguredAirbyteCatalog, AirbyteStream
```
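
## Basic Usage

### Creating a Source Connector

Most sources subclass `AbstractSource`, which implements the `check`, `discover`, and `read` operations of the base `Source` interface using two methods you provide: `check_connection()` and `streams()`.

```python
import logging
import sys
from typing import Any, Iterable, List, Mapping, Optional, Tuple

from airbyte_cdk.entrypoint import launch
from airbyte_cdk.models import AirbyteCatalog, SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream


class MyStream(Stream):
    # Minimal placeholder stream; replace with your own Stream or HttpStream subclass
    primary_key = "id"

    def __init__(self, config: Mapping[str, Any]):
        super().__init__()
        self._config = config

    def get_json_schema(self) -> Mapping[str, Any]:
        # Inline schema, so no schemas/<stream_name>.json file is needed
        return {"type": "object", "properties": {"id": {"type": "integer"}}}

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any]]:
        # Fetch and yield records from your data source here
        yield {"id": 1}


class MySource(AbstractSource):
    def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
        """
        Test connection to the source.
        Returns (True, None) if successful, (False, error_message) otherwise.
        """
        try:
            # Test connection logic here, e.g. an authenticated request to the API
            return True, None
        except Exception as e:
            return False, str(e)

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        Return list of streams for this source.
        """
        return [MyStream(config=config)]

    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
        """
        Discover available streams and their schemas.
        Optional: AbstractSource already provides this default implementation.
        """
        return AirbyteCatalog(streams=[stream.as_airbyte_stream() for stream in self.streams(config)])


# Launch the connector
if __name__ == "__main__":
    launch(MySource(), sys.argv[1:])
```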

### Using Low-Code Declarative Sources

```python
import sys

from airbyte_cdk import YamlDeclarativeSource
from airbyte_cdk.entrypoint import launch


class MyDeclarativeSource(YamlDeclarativeSource):
    def __init__(self):
        # The path is resolved relative to this connector's top-level Python package
        super().__init__(path_to_yaml="manifest.yaml")


# The manifest.yaml file defines streams, authentication, and data transformation
if __name__ == "__main__":
    launch(MyDeclarativeSource(), sys.argv[1:])
```
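
### Creating a Destination Connector

A destination implements `check()` to validate its configuration and `write()` to load records. `write()` should yield each STATE message only after every record that preceded it has been persisted, because emitted state is treated as a committed checkpoint.

```python
import logging
from typing import Any, Iterable, Mapping

from airbyte_cdk import Destination
from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, AirbyteRecordMessage
from airbyte_cdk.models import ConfiguredAirbyteCatalog, Status, Type


class MyDestination(Destination):
    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        # Required by the base class: verify the destination is reachable
        return AirbyteConnectionStatus(status=Status.SUCCEEDED)

    def write(
        self,
        config: Mapping[str, Any],
        configured_catalog: ConfiguredAirbyteCatalog,
        input_messages: Iterable[AirbyteMessage]
    ) -> Iterable[AirbyteMessage]:
        """
        Write data records to the destination.
        """
        for message in input_messages:
            if message.type == Type.RECORD:
                # Process and write the record
                self._write_record(message.record)
            elif message.type == Type.STATE:
                # Acknowledge state only once all records before it are persisted
                yield message

    def _write_record(self, record: AirbyteRecordMessage) -> None:
        # Placeholder: persist record.data for record.stream
        pass
```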

## Architecture

The Airbyte CDK follows a layered architecture designed for extensibility and reusability:

### Core Components

1. **Base Connectors**: Abstract `Source` and `Destination` classes providing the foundational interface
2. **Stream Framework**: `HttpStream` and `Stream` classes for data extraction with built-in state management
3. **Authentication Layer**: Various authenticators for OAuth2, API tokens, and custom authentication schemes
4. **Declarative Framework**: YAML-based low-code approach for building connectors without Python code
5. **Protocol Models**: Pydantic models implementing the Airbyte Protocol for message exchange
6. **Utilities**: Helper functions for configuration, schema handling, and data transformation
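
Connectors exchange Airbyte Protocol messages as newline-delimited JSON on stdout. The sketch below builds a RECORD message and a per-stream STATE message with only the standard library, to show roughly the wire shape the protocol models serialize to. The stream name `users`, the `updated_at` field, and the helper names are illustrative assumptions; real connectors should use the CDK's models rather than hand-built dicts.

```python
import json
import time
from typing import Any, Dict, Mapping


def record_message(stream: str, data: Mapping[str, Any]) -> Dict[str, Any]:
    """Build a RECORD message; emitted_at is milliseconds since the epoch."""
    return {
        "type": "RECORD",
        "record": {
            "stream": stream,
            "data": dict(data),
            "emitted_at": int(time.time() * 1000),
        },
    }


def stream_state_message(stream: str, state: Mapping[str, Any]) -> Dict[str, Any]:
    """Build a per-stream STATE message that checkpoints progress for one stream."""
    return {
        "type": "STATE",
        "state": {
            "type": "STREAM",
            "stream": {
                "stream_descriptor": {"name": stream},
                "stream_state": dict(state),
            },
        },
    }


def serialize(message: Mapping[str, Any]) -> str:
    """Render one message as a single JSON line."""
    return json.dumps(message, separators=(",", ":"))


if __name__ == "__main__":
    print(serialize(record_message("users", {"id": 1, "updated_at": "2024-01-01"})))
    print(serialize(stream_state_message("users", {"updated_at": "2024-01-01"})))
```

Keeping each message on one line is what lets the platform read a connector's output as a stream without buffering the whole sync.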

### Stream Types

- **HTTP Streams**: For REST API data sources with pagination, authentication, and error handling
- **Incremental Streams**: Support for state-based incremental synchronization
- **Sub-streams**: Nested data extraction from parent-child relationships
- **Concurrent Streams**: High-throughput data extraction with parallel processing
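
Incremental streams keep a cursor (for example an `updated_at` timestamp) in state, read only records at or after the saved value, and advance the state as they go. The plain-Python sketch below illustrates that idea without the CDK. The function name `read_incremental`, the `updated_at` cursor, and the use of ISO-8601 string comparison are assumptions for illustration, not the CDK's incremental API.

```python
from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple


def read_incremental(
    records: Iterable[Mapping[str, Any]],
    state: Optional[Mapping[str, Any]],
    cursor_field: str = "updated_at",
) -> Tuple[List[Mapping[str, Any]], Dict[str, Any]]:
    """Return records at or after the saved cursor, plus the advanced state.

    Cursor values are assumed to be ISO-8601 strings in one format and
    timezone, so plain string comparison orders them chronologically.
    """
    last_seen = (state or {}).get(cursor_field)
    max_cursor = last_seen
    selected: List[Mapping[str, Any]] = []
    for record in records:
        value = record[cursor_field]
        # Inclusive comparison: records equal to the saved cursor are re-read,
        # trading possible duplicates for not missing late-arriving rows
        if last_seen is not None and value < last_seen:
            continue
        selected.append(record)
        if max_cursor is None or value > max_cursor:
            max_cursor = value
    new_state = {cursor_field: max_cursor} if max_cursor is not None else {}
    return selected, new_state
```

Passing the returned state into the next run skips everything older than the saved cursor. Because the comparison is inclusive, a record sitting exactly on the cursor is emitted again, which is why deduplicating on the stream's primary key downstream is a common companion to this pattern.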

## Capabilities

### [Source Connectors](./source-connectors.md)

Framework for building data extraction connectors with support for HTTP APIs, databases, and files. Includes stream management, incremental sync, authentication, error handling, and state management.
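
HTTP-based sources typically request a page, parse its records, and ask for a next-page token until none is returned; `HttpStream` expresses this through `next_page_token()` and `parse_response()`. The sketch below reproduces only that control flow against an in-memory fake API, so it runs without network access or the CDK. `FAKE_PAGES`, `fetch_page`, `read_all`, and the `next_cursor` response field are hypothetical.

```python
from typing import Any, Dict, Iterator, Mapping, Optional

# Hypothetical API: each page holds records and a cursor for the next page (None on the last page)
FAKE_PAGES: Dict[Optional[str], Dict[str, Any]] = {
    None: {"users": [{"id": 1}, {"id": 2}], "next_cursor": "p2"},
    "p2": {"users": [{"id": 3}], "next_cursor": None},
}


def fetch_page(cursor: Optional[str]) -> Mapping[str, Any]:
    """Stand-in for an HTTP GET; a real stream would call the API here."""
    return FAKE_PAGES[cursor]


def next_page_token(response: Mapping[str, Any]) -> Optional[Mapping[str, Any]]:
    """Return the token for the next request, or None to stop paginating."""
    cursor = response.get("next_cursor")
    return {"cursor": cursor} if cursor else None


def parse_response(response: Mapping[str, Any]) -> Iterator[Mapping[str, Any]]:
    """Yield one record per element of the response's "users" array."""
    yield from response.get("users", [])


def read_all() -> Iterator[Mapping[str, Any]]:
    """Loop until next_page_token() signals the last page."""
    token: Optional[Mapping[str, Any]] = None
    while True:
        response = fetch_page(token["cursor"] if token else None)
        yield from parse_response(response)
        token = next_page_token(response)
        if token is None:
            break
```

Separating "how to find the next page" from "how to extract records" keeps each concern small and lets pagination change without touching record parsing.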

### [Destination Connectors](./destination-connectors.md)

Framework for building data loading connectors that write records to databases, data warehouses, files, and APIs. Provides batch processing, type mapping, and error handling capabilities.
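
A common destination pattern is to buffer records, flush them in batches, and emit a STATE message only after every record that preceded it has been written, so a failed sync can resume from the last committed checkpoint. The sketch below shows that ordering using plain dicts shaped like protocol messages. The `BatchWriter` class, its `batch_size` default, and the in-memory list standing in for the target system are assumptions, not CDK APIs.

```python
from typing import Any, Iterable, Iterator, List, Mapping


class BatchWriter:
    """Buffers RECORD messages and flushes them before acknowledging STATE."""

    def __init__(self, batch_size: int = 2) -> None:
        self.batch_size = batch_size
        self.buffer: List[Mapping[str, Any]] = []
        self.written: List[Mapping[str, Any]] = []  # stands in for the target table
        self.flush_count = 0

    def flush(self) -> None:
        if self.buffer:
            # A real destination would issue one bulk insert or upload here
            self.written.extend(self.buffer)
            self.buffer.clear()
            self.flush_count += 1

    def write(self, messages: Iterable[Mapping[str, Any]]) -> Iterator[Mapping[str, Any]]:
        for message in messages:
            if message["type"] == "RECORD":
                self.buffer.append(message["record"]["data"])
                if len(self.buffer) >= self.batch_size:
                    self.flush()
            elif message["type"] == "STATE":
                # Commit everything received so far, then echo the state back
                self.flush()
                yield message
        # Flush the remaining records once the input is exhausted
        self.flush()
```

Echoing state only after the flush means any checkpoint the platform sees refers to data that is already durable in the destination.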

### [Declarative Low-Code CDK](./declarative-cdk.md)

YAML-based framework for building connectors without writing Python code. Supports most common connector patterns through declarative configuration with authentication, pagination, transformations, and incremental sync.

## Quick Start Example

A complete example of building and running an HTTP source connector:

```python
import sys
from typing import Any, Iterable, List, Mapping, Optional, Tuple

import requests
from airbyte_cdk.entrypoint import launch
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator


class UsersStream(HttpStream):
    url_base = "https://api.example.com/"
    primary_key = "id"

    def path(self, **kwargs) -> str:
        return "users"

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        # Required by HttpStream; returning None means there is only one page
        return None

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping[str, Any]]:
        # Each element of the "users" array becomes one record
        yield from response.json().get("users", [])


class ExampleSource(AbstractSource):
    def check_connection(self, logger, config) -> Tuple[bool, Optional[Any]]:
        return True, None

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        # TokenAuthenticator sends "Authorization: Bearer <api_token>" on every request
        auth = TokenAuthenticator(token=config["api_token"])
        return [UsersStream(authenticator=auth)]


if __name__ == "__main__":
    launch(ExampleSource(), sys.argv[1:])
```

This example demonstrates a basic HTTP source connector that extracts user data from an API with token authentication.
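
`launch()` hands `sys.argv[1:]` to the CDK entrypoint, which dispatches on the Airbyte Protocol commands `spec`, `check`, `discover`, and `read`. The `argparse` sketch below mirrors that command-line contract to show which files each command expects. It is an illustration, not the entrypoint's actual parser; the program name `source-example` and the file paths in the tests are placeholders.

```python
import argparse
from typing import List


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="source-example")
    commands = parser.add_subparsers(dest="command", required=True)

    # spec: print the connector's configuration schema; takes no files
    commands.add_parser("spec")

    # check and discover need only the user-supplied config
    for name in ("check", "discover"):
        sub = commands.add_parser(name)
        sub.add_argument("--config", required=True)

    # read also needs the configured catalog and, optionally, saved state
    read = commands.add_parser("read")
    read.add_argument("--config", required=True)
    read.add_argument("--catalog", required=True)
    read.add_argument("--state", required=False)
    return parser


def parse(argv: List[str]) -> argparse.Namespace:
    return build_parser().parse_args(argv)
```

Under this contract the quick-start connector would be run as, for example, `python main.py check --config secrets/config.json`, where `main.py` and the config path are placeholders for your own files.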