# Source Connectors

Framework for building data extraction connectors with support for HTTP APIs, databases, and files. The Source connector framework provides a structured approach to implementing data ingestion, with built-in stream management, incremental synchronization, authentication, error handling, and state management.

## Capabilities

### Base Source Classes

Core classes for implementing source connectors that extract data from external systems.

```python { .api }
import logging
from typing import Any, Iterable, List, Mapping, Optional, Tuple

from airbyte_cdk import Source
from airbyte_cdk.models import AirbyteCatalog, AirbyteMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.sources.streams.core import Stream  # needed by the streams() annotation


class Source:
    """
    Base class for Airbyte source connectors.

    Declares the connector operations: connection check, schema discovery,
    reading records, and enumerating the streams the source exposes.

    NOTE(review): in airbyte_cdk, ``check_connection`` and ``streams`` are the
    hooks declared on ``AbstractSource`` (a ``Source`` subclass); confirm which
    class a connector should subclass against the installed CDK version.
    """

    def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
        """
        Test connection validity with the given configuration.

        Args:
            logger: Logger instance for outputting messages
            config: Configuration dictionary containing connection parameters

        Returns:
            Tuple of (success, error): ``(True, None)`` when the connection
            works, otherwise ``(False, error)`` describing the failure.
        """

    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
        """
        Discover available streams and their schemas.

        Args:
            logger: Logger instance
            config: Configuration dictionary

        Returns:
            AirbyteCatalog containing the available streams and their schemas
        """

    def read(
        self,
        logger: logging.Logger,
        config: Mapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[AirbyteMessage]:
        """
        Read data from the source.

        Args:
            logger: Logger instance
            config: Configuration dictionary
            catalog: Configured catalog specifying which streams to read
            state: Optional state saved by a previous sync, for incremental reads

        Yields:
            AirbyteMessage instances containing records, state, or logs
        """

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        Return the list of streams for this source.

        Args:
            config: Configuration dictionary

        Returns:
            List of Stream instances available in this source
        """
```

### HTTP Stream Classes

Classes for building HTTP-based source connectors with built-in pagination, authentication, and error handling.

```python { .api }
from typing import Any, Iterable, Mapping, Optional

import requests
from requests.auth import AuthBase

from airbyte_cdk import HttpStream, HttpSubStream
from airbyte_cdk.sources.streams.core import Stream


class HttpStream(Stream):
    """
    Base class for HTTP API data extraction streams.

    A subclass describes one endpoint: where it lives (``url_base`` + ``path``),
    how to build a request (``request_params`` / ``request_headers``), how to
    turn a response into records (``parse_response``) and how to page through
    results (``next_page_token``).
    """

    def __init__(self, authenticator: Optional[AuthBase] = None):
        """
        Initialize HTTP stream.

        Args:
            authenticator: Optional authentication handler for HTTP requests
        """

    @property
    def url_base(self) -> str:
        """
        Base URL for the API endpoint.

        Example: "https://api.example.com/v1/"
        """

    def path(self, **kwargs) -> str:
        """
        Return the API endpoint path for this stream, relative to ``url_base``.

        Returns:
            Path component of the URL (e.g., "users", "posts")
        """

    def request_params(self, **kwargs) -> Mapping[str, Any]:
        """
        Return query parameters for the request.

        Returns:
            Dictionary of query parameters
        """

    def request_headers(self, **kwargs) -> Mapping[str, Any]:
        """
        Return headers for the request.

        Returns:
            Dictionary of HTTP headers
        """

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """
        Parse an HTTP response into records.

        Args:
            response: HTTP response object

        Yields:
            Dictionary records extracted from the response
        """

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """
        Extract the token for the next page of results.

        Args:
            response: HTTP response object

        Returns:
            Token for the next page, or None when there are no more pages
        """


class HttpSubStream(HttpStream):
    """
    HTTP stream that depends on data from a parent stream.

    For example, each parent record can be emitted as a slice
    ``{"parent": record}`` and read back in ``path()`` to build a
    per-parent URL.
    """

    def __init__(self, parent: HttpStream, **kwargs):
        """
        Initialize a sub-stream with a parent dependency.

        Args:
            parent: Parent stream that provides data for this sub-stream;
                subclasses access it as ``self.parent``.
            **kwargs: Additional keyword arguments for the underlying
                HttpStream (e.g. ``authenticator``).
        """
```

### Authentication

Authenticators for common HTTP authentication schemes: API token, OAuth 2.0, and HTTP Basic.

```python { .api }
from typing import List, Optional  # used by the Oauth2Authenticator signature

from requests.auth import AuthBase

from airbyte_cdk import BasicHttpAuthenticator, Oauth2Authenticator, TokenAuthenticator


class TokenAuthenticator(AuthBase):
    """
    Authentication using API tokens in request headers.
    """

    def __init__(self, token: str, auth_method: str = "Bearer", auth_header: str = "Authorization"):
        """
        Initialize the token authenticator.

        Args:
            token: API token value
            auth_method: Authentication method (e.g., "Bearer", "Token")
            auth_header: Name of the header that carries the token
        """


class Oauth2Authenticator(AuthBase):
    """
    OAuth 2.0 authentication with automatic token refresh.
    """

    def __init__(
        self,
        token_refresh_endpoint: str,
        client_id: str,
        client_secret: str,
        refresh_token: str,
        scopes: Optional[List[str]] = None,
        token_expiry_date: Optional[str] = None,
        access_token: Optional[str] = None,
    ):
        """
        Initialize the OAuth2 authenticator.

        Args:
            token_refresh_endpoint: URL used to refresh the access token
            client_id: OAuth client ID
            client_secret: OAuth client secret
            refresh_token: Refresh token used to obtain access tokens
            scopes: Optional list of OAuth scopes
            token_expiry_date: When the current access token expires
            access_token: Current access token, if one is already known
        """


class BasicHttpAuthenticator(AuthBase):
    """
    HTTP Basic authentication.
    """

    def __init__(self, username: str, password: str):
        """
        Initialize Basic authentication.

        Args:
            username: Username for Basic auth
            password: Password for Basic auth
        """
```

### Stream State Management

Classes for managing incremental synchronization state.

```python { .api }
from typing import Any, Mapping, MutableMapping, Optional

from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.streams.core import IncrementalMixin


class IncrementalMixin:
    """
    Mixin for streams that support incremental synchronization.

    The stream keeps its progress (the highest cursor value seen) in
    ``state``, which is saved after a sync and assigned back on the next one.
    """

    @property
    def state(self) -> MutableMapping[str, Any]:
        """
        Current stream state, e.g. ``{"updated_at": "2024-01-01T00:00:00Z"}``.

        NOTE(review): in airbyte_cdk this getter and its setter are abstract
        on IncrementalMixin, so every incremental stream must implement
        them -- confirm against the installed CDK version.
        """

    @state.setter
    def state(self, value: MutableMapping[str, Any]) -> None:
        """
        Restore stream state saved by a previous sync.

        Args:
            value: State mapping in the form returned by the getter
        """

    @property
    def cursor_field(self) -> str:
        """
        Field name used as the cursor for incremental sync.

        Returns:
            Name of the cursor field (e.g., "updated_at", "id")
        """

    def get_updated_state(
        self,
        current_stream_state: Mapping[str, Any],
        latest_record: Mapping[str, Any],
    ) -> Mapping[str, Any]:
        """
        Compute updated stream state from the latest record.

        NOTE(review): newer CDK versions deprecate this hook in favour of the
        ``state`` property -- confirm before relying on it.

        Args:
            current_stream_state: Current state for this stream
            latest_record: Latest record processed

        Returns:
            Updated state dictionary
        """


class ConnectorStateManager:
    """
    Manages state across all streams in a connector.
    """

    def get_stream_state(self, stream_name: str) -> Mapping[str, Any]:
        """
        Get the state for a specific stream.

        NOTE(review): recent airbyte_cdk versions also take a ``namespace``
        argument here -- confirm against the installed CDK.

        Args:
            stream_name: Name of the stream

        Returns:
            State dictionary for the stream
        """

    def update_state_for_stream(
        self,
        stream_name: str,
        state: Mapping[str, Any],
    ) -> None:
        """
        Update the state for a specific stream.

        NOTE(review): recent airbyte_cdk versions also take a ``namespace``
        argument here -- confirm against the installed CDK.

        Args:
            stream_name: Name of the stream
            state: New state dictionary
        """
```

## Usage Examples

### Basic HTTP Source

```python
import logging
from typing import Any, Iterable, List, Mapping, Optional, Tuple

from airbyte_cdk import AbstractSource, HttpStream
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.core import Stream
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator


class UsersStream(HttpStream):
    """Full-refresh stream over the ``users`` endpoint, authenticated with an API token."""

    url_base = "https://api.example.com/v1/"
    primary_key = "id"

    def __init__(self, config: Mapping[str, Any]):
        # Raises KeyError when "api_token" is missing from the config.
        authenticator = TokenAuthenticator(token=config["api_token"])
        super().__init__(authenticator=authenticator)
        self._config = config

    def path(self, **kwargs) -> str:
        return "users"

    def next_page_token(self, response) -> Optional[Mapping[str, Any]]:
        # This example treats the endpoint as a single page; None ends pagination.
        return None

    def parse_response(self, response, **kwargs) -> Iterable[Mapping[str, Any]]:
        # Records sit under the top-level "users" key; a missing key yields no records.
        yield from response.json().get("users", [])


class ExampleSource(AbstractSource):
    """
    Source exposing a single ``UsersStream``.

    Subclasses ``AbstractSource``, the CDK base class built around
    ``check_connection()`` and ``streams()``; a bare ``Source`` subclass would
    also have to implement the check/discover/read protocol methods itself.
    """

    def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
        """Return ``(True, None)`` if reading from the users endpoint succeeds, else ``(False, reason)``."""
        try:
            stream = UsersStream(config)
            # Pull at most one record: enough to prove credentials and connectivity.
            next(iter(stream.read_records(sync_mode=SyncMode.full_refresh)), None)
            return True, None
        except Exception as e:
            # The connection check reports failures as data instead of raising.
            return False, str(e)

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        return [UsersStream(config)]


# Usage
source = ExampleSource()
config = {"api_token": "your_token_here"}
success, error = source.check_connection(logging.getLogger(), config)
```

### Incremental Stream with Pagination

```python
from datetime import datetime
from typing import Any, Iterable, Mapping, MutableMapping, Optional

from airbyte_cdk import HttpStream
from airbyte_cdk.sources.streams.core import IncrementalMixin


class OrdersStream(HttpStream, IncrementalMixin):
    """
    Incremental, token-paginated stream over the ``orders`` endpoint.

    The high-water mark of the ``updated_at`` cursor is kept in
    ``self._cursor_value`` and exposed through the ``state`` property. On the
    next sync it is sent to the API as ``updated_since``.

    Cursor values are compared with ``max()``. This assumes they are mutually
    comparable, e.g. ISO-8601 strings in a single format and timezone -- confirm
    against the API.
    """

    url_base = "https://api.example.com/v1/"
    primary_key = "id"
    cursor_field = "updated_at"
    # Records requested per page; override in a subclass to tune.
    page_size = 100

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Highest cursor value seen so far; None until state is restored or a record is read.
        self._cursor_value: Optional[Any] = None

    @property
    def state(self) -> MutableMapping[str, Any]:
        """Current state: ``{cursor_field: high-water mark}``, or ``{}`` before any progress."""
        if self._cursor_value is None:
            return {}
        return {self.cursor_field: self._cursor_value}

    @state.setter
    def state(self, value: MutableMapping[str, Any]) -> None:
        # An empty or missing cursor is normalised to None ("no progress yet").
        self._cursor_value = (value or {}).get(self.cursor_field) or None

    def path(self, **kwargs) -> str:
        return "orders"

    def request_params(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Mapping[str, Any]:
        """Build query params: page size, incremental filter, and pagination token."""
        params = {"limit": self.page_size}

        # Filter only on a non-empty saved cursor, so "updated_since=" is never sent.
        since = (stream_state or {}).get(self.cursor_field)
        if since:
            params["updated_since"] = since

        page_token = kwargs.get("next_page_token")
        if page_token:
            params["page_token"] = page_token

        return params

    def next_page_token(self, response) -> Optional[str]:
        # A missing "next_page_token" key yields None, which ends pagination.
        return response.json().get("next_page_token")

    def parse_response(self, response, **kwargs) -> Iterable[Mapping[str, Any]]:
        yield from response.json().get("orders", [])

    def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]:
        """Yield records unchanged while advancing the cursor high-water mark."""
        for record in super().read_records(*args, **kwargs):
            self._cursor_value = self._newer_cursor(self._cursor_value, record.get(self.cursor_field))
            yield record

    def get_updated_state(self, current_stream_state: Mapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
        """Legacy state hook: the larger of the saved and latest cursors ("" when neither exists)."""
        newest = self._newer_cursor(
            current_stream_state.get(self.cursor_field),
            latest_record.get(self.cursor_field),
        )
        return {self.cursor_field: newest if newest is not None else ""}

    @staticmethod
    def _newer_cursor(current: Optional[Any], candidate: Optional[Any]) -> Optional[Any]:
        """Return the larger of two cursor values, ignoring missing or empty ones."""
        present = [value for value in (current, candidate) if value]
        return max(present) if present else None
```

### OAuth2 Authentication

```python
from typing import Any, Mapping  # used by the __init__ annotation

from airbyte_cdk import HttpStream
from airbyte_cdk.sources.streams.http.requests_native_auth import Oauth2Authenticator


class AuthenticatedStream(HttpStream):
    """
    Stream whose requests are authorized with an OAuth 2.0 refresh-token flow.

    Like the other HttpStream examples, a concrete stream also defines
    ``primary_key``, ``path()``, ``parse_response()`` and ``next_page_token()``.
    They are omitted here to keep the focus on authentication.
    """

    url_base = "https://api.example.com/v1/"

    def __init__(self, config: Mapping[str, Any]):
        # Raises KeyError if any of the three OAuth credentials is missing from config.
        authenticator = Oauth2Authenticator(
            token_refresh_endpoint="https://api.example.com/oauth/token",
            client_id=config["client_id"],
            client_secret=config["client_secret"],
            refresh_token=config["refresh_token"],
        )
        super().__init__(authenticator=authenticator)
```

### Sub-stream Implementation

```python
from typing import Any, Iterable, Mapping, Optional

from airbyte_cdk import HttpSubStream
from airbyte_cdk.models import SyncMode


class UserPostsStream(HttpSubStream):
    """
    Posts of each user, fetched from ``users/{id}/posts``.

    ``parent`` is expected to be the ``UsersStream`` from the Basic HTTP Source
    example. Every parent record becomes one slice, ``{"parent": user}``.
    """

    primary_key = "id"

    # String annotation: UsersStream is defined in another example and may not be in scope here.
    def __init__(self, parent: "UsersStream", **kwargs):
        super().__init__(parent=parent, **kwargs)

    def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
        """Build the per-user posts path from the parent record carried in the slice."""
        if not stream_slice or "parent" not in stream_slice:
            raise ValueError("UserPostsStream requires a stream_slice produced by stream_slices()")
        user_id = stream_slice["parent"]["id"]
        return f"users/{user_id}/posts"

    def stream_slices(self, **kwargs) -> Iterable[Mapping[str, Any]]:
        """Yield one ``{"parent": user}`` slice per record of the parent stream."""
        # The parent is read in full-refresh mode across all of its own slices.
        # This stream's sync mode, cursor and state (in kwargs) describe the child
        # and must not be used to filter the parent's records.
        for parent_slice in self.parent.stream_slices(sync_mode=SyncMode.full_refresh):
            for user in self.parent.read_records(sync_mode=SyncMode.full_refresh, stream_slice=parent_slice):
                yield {"parent": user}

    def next_page_token(self, response) -> Optional[Mapping[str, Any]]:
        # Each user's posts are treated as a single page; None ends pagination.
        return None

    def parse_response(self, response, **kwargs) -> Iterable[Mapping[str, Any]]:
        # Records sit under the top-level "posts" key; a missing key yields no records.
        yield from response.json().get("posts", [])
```