# Declarative Low-Code CDK

A YAML-based framework for building connectors with little or no Python code. The Declarative CDK covers the most common connector patterns (authentication, pagination, record transformations, and incremental sync) through declarative configuration, so a connector can be defined almost entirely in a manifest file.

## Capabilities

### Declarative Source Classes

Core classes for creating low-code connectors using YAML manifests.

```python { .api }
from airbyte_cdk import YamlDeclarativeSource, ManifestDeclarativeSource
from typing import Any, Dict, List, Mapping

class YamlDeclarativeSource(ManifestDeclarativeSource):
    """
    Declarative source defined by a YAML file.
    """

    def __init__(self, path_to_yaml: str, debug: bool = False):
        """
        Initialize source from YAML manifest file.

        Args:
            path_to_yaml: Path to the YAML manifest file
            debug: Enable debug logging for manifest parsing
        """

class ManifestDeclarativeSource:
    """
    Declarative source defined by a manifest of low-code components.
    """

    def __init__(
        self,
        source_config: Dict[str, Any],
        debug: bool = False,
        emit_connector_builder_messages: bool = False
    ):
        """
        Initialize source from manifest configuration.

        Args:
            source_config: The manifest configuration dictionary
            debug: Enable debug logging
            emit_connector_builder_messages: Enable Connector Builder specific messages
        """
```

### Stream Configuration

Declarative stream definitions using YAML configuration.

```python { .api }
from typing import Any, Mapping

from airbyte_cdk import DeclarativeStream
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever
from airbyte_cdk.sources.declarative.requesters import HttpRequester

class DeclarativeStream:
    """
    Stream defined through declarative configuration.
    """

    def __init__(
        self,
        name: str,
        retriever: SimpleRetriever,
        config: Mapping[str, Any],
        parameters: Mapping[str, Any]
    ):
        """
        Initialize declarative stream.

        Args:
            name: Stream name
            retriever: Data retriever component
            config: Stream configuration
            parameters: Runtime parameters
        """

class SimpleRetriever:
    """
    Retriever that extracts data from HTTP responses.
    """

    def __init__(
        self,
        requester: HttpRequester,
        record_selector,
        paginator = None,
        partition_router = None
    ):
        """
        Initialize simple retriever.

        Args:
            requester: HTTP requester for making API calls
            record_selector: Component to extract records from responses
            paginator: Optional pagination handler
            partition_router: Optional partitioning strategy
        """
```

### HTTP Requester Components

Components for making HTTP requests with authentication and error handling.

```python { .api }
from typing import Any, Mapping

from airbyte_cdk import HttpRequester
from airbyte_cdk.sources.declarative.requesters.request_options import RequestOption
from airbyte_cdk.sources.declarative.auth import DeclarativeAuthenticator

class HttpRequester:
    """
    HTTP requester for declarative streams.
    """

    def __init__(
        self,
        name: str,
        url_base: str,
        path: str,
        http_method: str = "GET",
        request_options_provider = None,
        authenticator: DeclarativeAuthenticator = None,
        error_handler = None,
        config: Mapping[str, Any] = None,
        parameters: Mapping[str, Any] = None
    ):
        """
        Initialize HTTP requester.

        Args:
            name: Name of the requester
            url_base: Base URL for requests
            path: Request path template
            http_method: HTTP method (GET, POST, etc.)
            request_options_provider: Provider for request options
            authenticator: Authentication handler
            error_handler: Error handling strategy
            config: Configuration mapping
            parameters: Runtime parameters
        """

class RequestOption:
    """
    Configuration for request options (headers, params, body).
    """

    def __init__(
        self,
        field_name: str,
        inject_into: str,  # "request_parameter", "header", "body_data", "body_json"
        value: Any
    ):
        """
        Initialize request option.

        Args:
            field_name: Name of the option field
            inject_into: Where to inject the option
            value: Value or value template
        """
```
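
To make the four `inject_into` targets more concrete, the sketch below routes a value into the matching part of a request description. The `inject_option` helper, the `_TARGETS` mapping, and the layout of the `request` dictionary are hypothetical names chosen for illustration; this is not how the CDK implements `RequestOption`.

```python
from typing import Any, Dict

# Hypothetical mapping from inject_into targets to slots of a request description.
_TARGETS = {
    "request_parameter": "params",
    "header": "headers",
    "body_data": "data",
    "body_json": "json",
}


def inject_option(
    request: Dict[str, Dict[str, Any]],
    field_name: str,
    inject_into: str,
    value: Any,
) -> Dict[str, Dict[str, Any]]:
    """Return a copy of `request` with `value` stored under `field_name` in the chosen slot."""
    if inject_into not in _TARGETS:
        raise ValueError(f"Unsupported inject_into target: {inject_into!r}")
    slot = _TARGETS[inject_into]
    # Copy one level deep so the caller's dictionaries are left untouched.
    updated = {key: dict(values) for key, values in request.items()}
    updated.setdefault(slot, {})[field_name] = value
    return updated


request = {"params": {}, "headers": {}}
request = inject_option(request, "page_size", "request_parameter", 50)
request = inject_option(request, "X-Client", "header", "demo")
print(request)
```

Returning a new dictionary instead of mutating the input keeps each injection step easy to test in isolation.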

### Authentication Components

Declarative authentication handlers for various schemes.

```python { .api }
from typing import Any, Dict, List

from airbyte_cdk import (
    NoAuth,
    BasicHttpAuthenticator,
    BearerAuthenticator,
    ApiKeyAuthenticator,
    DeclarativeOauth2Authenticator
)

class NoAuth:
    """
    No authentication required.
    """
    pass

class BasicHttpAuthenticator:
    """
    HTTP Basic authentication.
    """

    def __init__(self, username: str, password: str):
        """
        Initialize basic authentication.

        Args:
            username: Username for authentication
            password: Password for authentication
        """

class BearerAuthenticator:
    """
    Bearer token authentication.
    """

    def __init__(self, api_token: str):
        """
        Initialize bearer token auth.

        Args:
            api_token: Bearer token for authentication
        """

class ApiKeyAuthenticator:
    """
    API key authentication via header or query parameter.
    """

    def __init__(
        self,
        api_token: str,
        header: str = None,
        request_param: str = None
    ):
        """
        Initialize API key authentication.

        Args:
            api_token: API key value
            header: Header name for API key (if using header)
            request_param: Parameter name for API key (if using query param)
        """

class DeclarativeOauth2Authenticator:
    """
    OAuth 2.0 authentication with token refresh.
    """

    def __init__(
        self,
        token_refresh_endpoint: str,
        client_id: str,
        client_secret: str,
        refresh_token: str,
        scopes: List[str] = None,
        token_expiry_date: str = None,
        access_token: str = None,
        refresh_request_body: Dict[str, Any] = None
    ):
        """
        Initialize OAuth2 authentication.

        Args:
            token_refresh_endpoint: URL for refreshing tokens
            client_id: OAuth client ID
            client_secret: OAuth client secret
            refresh_token: Refresh token
            scopes: OAuth scopes
            token_expiry_date: When current token expires
            access_token: Current access token
            refresh_request_body: Additional refresh request parameters
        """
```
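
As a rough illustration of what the simpler schemes above put on the wire, the following sketch builds the headers or query parameters for Basic, Bearer, and API key authentication using only the standard library. The function names are hypothetical and the code is a stand-in for the general HTTP conventions, not the CDK's authenticator classes.

```python
import base64
from typing import Dict, Optional, Tuple


def basic_auth_header(username: str, password: str) -> Dict[str, str]:
    """Build an HTTP Basic Authorization header (base64 of 'username:password')."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return {"Authorization": f"Basic {token}"}


def bearer_auth_header(api_token: str) -> Dict[str, str]:
    """Build a Bearer token Authorization header."""
    return {"Authorization": f"Bearer {api_token}"}


def api_key_auth(
    api_token: str,
    header: Optional[str] = None,
    request_param: Optional[str] = None,
) -> Tuple[Dict[str, str], Dict[str, str]]:
    """Return (headers, query_params) carrying the API key in exactly one place."""
    if (header is None) == (request_param is None):
        raise ValueError("Provide exactly one of header or request_param")
    if header is not None:
        return {header: api_token}, {}
    return {}, {request_param: api_token}


print(basic_auth_header("user", "pass"))  # {'Authorization': 'Basic dXNlcjpwYXNz'}
print(bearer_auth_header("abc123"))
print(api_key_auth("abc123", request_param="api_key"))
```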

### Pagination Strategies

Components for handling API pagination patterns.

```python { .api }
from typing import Any, Mapping

from airbyte_cdk import DefaultPaginator
from airbyte_cdk.sources.declarative.requesters.request_options import RequestOption
from airbyte_cdk.sources.declarative.requesters.paginators.strategies import (
    OffsetIncrement,
    PageIncrement,
    CursorPaginationStrategy
)

class DefaultPaginator:
    """
    Default paginator with configurable pagination strategy.
    """

    def __init__(
        self,
        pagination_strategy,
        page_size_option: RequestOption = None,
        page_token_option: RequestOption = None,
        config: Mapping[str, Any] = None,
        parameters: Mapping[str, Any] = None
    ):
        """
        Initialize paginator.

        Args:
            pagination_strategy: Strategy for extracting pagination info
            page_size_option: How to specify page size in requests
            page_token_option: How to specify page token in requests
            config: Configuration mapping
            parameters: Runtime parameters
        """

class OffsetIncrement:
    """
    Pagination using offset-based increments.
    """

    def __init__(self, page_size: int, offset_param: str = "offset"):
        """
        Initialize offset pagination.

        Args:
            page_size: Number of records per page
            offset_param: Parameter name for offset
        """

class PageIncrement:
    """
    Pagination using page number increments.
    """

    def __init__(self, page_size: int = None, start_from_page: int = 1):
        """
        Initialize page-based pagination.

        Args:
            page_size: Number of records per page
            start_from_page: Starting page number
        """

class CursorPaginationStrategy:
    """
    Pagination using cursor tokens.
    """

    def __init__(
        self,
        cursor_value: str,
        stop_condition: str = None,
        page_size: int = None
    ):
        """
        Initialize cursor pagination.

        Args:
            cursor_value: Interpolated expression that yields the next cursor
                value, e.g. "{{ response.get('next_page_token') }}"
            stop_condition: Interpolated condition that ends pagination when true
            page_size: Number of records per page
        """
```
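
The difference between offset-based and cursor-based strategies is easiest to see as two loops. The sketch below runs both against an in-memory stand-in for an API; `fake_offset_api`, `fake_cursor_api`, the `data` key, and the `next_page_token` field are assumptions made for the example and do not come from the CDK.

```python
from typing import Any, Callable, Dict, Iterator, Optional

ITEMS = list(range(7))  # pretend server-side dataset


def fake_offset_api(offset: int, limit: int) -> Dict[str, Any]:
    """Stand-in for an endpoint that accepts offset and limit parameters."""
    return {"data": ITEMS[offset:offset + limit]}


def fake_cursor_api(cursor: Optional[str]) -> Dict[str, Any]:
    """Stand-in for an endpoint that returns a next_page_token until exhausted."""
    start = int(cursor or 0)
    end = start + 3
    token = str(end) if end < len(ITEMS) else None
    return {"data": ITEMS[start:end], "next_page_token": token}


def paginate_by_offset(
    fetch: Callable[[int, int], Dict[str, Any]], page_size: int
) -> Iterator[Any]:
    """Advance the offset by page_size until a short page is returned."""
    offset = 0
    while True:
        records = fetch(offset, page_size)["data"]
        yield from records
        if len(records) < page_size:
            break
        offset += page_size


def paginate_by_cursor(fetch: Callable[[Optional[str]], Dict[str, Any]]) -> Iterator[Any]:
    """Follow the token from each response until none is returned."""
    cursor: Optional[str] = None
    while True:
        page = fetch(cursor)
        yield from page["data"]
        cursor = page.get("next_page_token")
        if not cursor:
            break


print(list(paginate_by_offset(fake_offset_api, page_size=3)))
print(list(paginate_by_cursor(fake_cursor_api)))
```

Note that the offset loop only stops on a short page, so a dataset whose size is an exact multiple of `page_size` costs one extra, empty request.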

### Record Processing

Components for extracting and transforming records from API responses.

```python { .api }
from typing import Any, List, Mapping

from airbyte_cdk import RecordSelector, DpathExtractor
from airbyte_cdk.sources.declarative.transformations import AddFields, RecordTransformation

class RecordSelector:
    """
    Selects records from API responses.
    """

    def __init__(
        self,
        extractor: DpathExtractor,
        record_filter = None,
        transformations: List[RecordTransformation] = None
    ):
        """
        Initialize record selector.

        Args:
            extractor: Component to extract records from responses
            record_filter: Optional filter for records
            transformations: List of transformations to apply
        """

class DpathExtractor:
    """
    Extracts records by following a dpath-style field path into the response.
    """

    def __init__(self, field_path: List[str], config: Mapping[str, Any] = None):
        """
        Initialize dpath extractor.

        Args:
            field_path: Path to the records in the response, given as a list of keys
            config: Configuration mapping
        """

class AddFields:
    """
    Transformation that adds fields to records.
    """

    def __init__(self, fields: List[dict]):
        """
        Initialize add fields transformation.

        Args:
            fields: List of field definitions to add
        """
```
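
The two ideas above, walking a `field_path` to find records and then adding fields to each record, can be sketched in plain Python. `extract_records` and `add_fields` are hypothetical helpers that only handle literal keys and static values; the real components also support features such as interpolated values, which this sketch leaves out.

```python
import copy
from collections.abc import Mapping
from typing import Any, Dict, List


def extract_records(response_body: Any, field_path: List[str]) -> List[Any]:
    """Walk `field_path` key by key and return the records found there."""
    node = response_body
    for key in field_path:
        if not isinstance(node, Mapping) or key not in node:
            return []  # path does not exist in this response
        node = node[key]
    if isinstance(node, list):
        return node
    return [node]  # a single object is treated as one record


def add_fields(record: Mapping, fields: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Return a copy of `record` with each {'path': [...], 'value': ...} applied."""
    updated = copy.deepcopy(dict(record))
    for field in fields:
        *parents, leaf = field["path"]
        target = updated
        for key in parents:
            target = target.setdefault(key, {})  # create nested objects as needed
        target[leaf] = field["value"]
    return updated


body = {"data": {"users": [{"id": 1}, {"id": 2}]}}
records = extract_records(body, ["data", "users"])
enriched = [add_fields(r, [{"path": ["source"], "value": "api"}]) for r in records]
print(enriched)
```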

## Usage Examples

### Basic YAML Manifest

```yaml
version: "0.29.0"
type: DeclarativeSource

check:
  type: CheckStream
  stream_names: ["users"]

streams:
  - type: DeclarativeStream
    name: users
    primary_key: ["id"]
    retriever:
      type: SimpleRetriever
      requester:
        type: HttpRequester
        url_base: "https://api.example.com/v1/"
        path: "users"
        http_method: "GET"
        authenticator:
          type: BearerAuthenticator
          api_token: "{{ config['api_token'] }}"
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path: ["data"]

spec:
  type: Spec
  connection_specification:
    type: object
    properties:
      api_token:
        type: string
        title: API Token
        airbyte_secret: true
    required:
      - api_token
```

### Python Implementation

```python
from airbyte_cdk import YamlDeclarativeSource

class ExampleSource(YamlDeclarativeSource):
    def __init__(self):
        super().__init__(path_to_yaml="manifest.yaml")

# The source is now fully functional with the YAML manifest
```

### Advanced Manifest with Pagination and Incremental Sync

```yaml
version: "0.29.0"
type: DeclarativeSource

definitions:
  base_requester:
    type: HttpRequester
    url_base: "https://api.example.com/v1/"
    authenticator:
      type: BearerAuthenticator
      api_token: "{{ config['api_token'] }}"

  base_retriever:
    type: SimpleRetriever
    requester:
      $ref: "#/definitions/base_requester"
    record_selector:
      type: RecordSelector
      extractor:
        type: DpathExtractor
        field_path: ["data"]
    paginator:
      type: DefaultPaginator
      pagination_strategy:
        type: CursorPaginationStrategy
        cursor_value: "{{ response.get('next_page_token') }}"
        stop_condition: "{{ not response.get('next_page_token') }}"
      page_token_option:
        type: RequestOption
        field_name: "page_token"
        inject_into: "request_parameter"

streams:
  - type: DeclarativeStream
    name: orders
    primary_key: ["id"]
    retriever:
      $ref: "#/definitions/base_retriever"
      requester:
        $ref: "#/definitions/base_requester"
        path: "orders"
        request_parameters:
          updated_since: "{{ stream_state.get('updated_at', config.get('start_date')) }}"
    incremental_sync:
      type: DatetimeBasedCursor
      cursor_field: "updated_at"
      datetime_format: "%Y-%m-%dT%H:%M:%SZ"
      cursor_granularity: "PT1S"
      start_datetime:
        type: MinMaxDatetime
        datetime: "{{ config['start_date'] }}"
        datetime_format: "%Y-%m-%d"

check:
  type: CheckStream
  stream_names: ["orders"]

spec:
  type: Spec
  connection_specification:
    type: object
    properties:
      api_token:
        type: string
        title: API Token
        airbyte_secret: true
      start_date:
        type: string
        title: Start Date
        format: date
        pattern: "^[0-9]{4}-[0-9]{2}-[0-9]{2}$"
    required:
      - api_token
      - start_date
```
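
The manifest above relies on `$ref` pointers such as `#/definitions/base_retriever`, with sibling keys (for example `path: "orders"`) layered on top of the referenced definition. The sketch below shows one plausible way such references could be expanded on a plain dictionary. `lookup_ref` and `resolve_refs` are hypothetical helpers, the merge is shallow, and there is no cycle detection, so treat it as an illustration of the idea and not as the CDK's resolver.

```python
from typing import Any, Dict


def lookup_ref(manifest: Dict[str, Any], ref: str) -> Any:
    """Follow a '#/a/b' style pointer from the manifest root (mapping keys only)."""
    if not ref.startswith("#/"):
        raise ValueError(f"Not a manifest reference: {ref!r}")
    node: Any = manifest
    for part in ref[2:].split("/"):
        node = node[part]
    return node


def resolve_refs(node: Any, manifest: Dict[str, Any]) -> Any:
    """Expand $ref entries; keys written next to $ref override the referenced ones."""
    if isinstance(node, dict):
        resolved: Dict[str, Any] = {}
        if "$ref" in node:
            resolved.update(resolve_refs(lookup_ref(manifest, node["$ref"]), manifest))
        for key, value in node.items():
            if key != "$ref":
                resolved[key] = resolve_refs(value, manifest)
        return resolved
    if isinstance(node, list):
        return [resolve_refs(item, manifest) for item in node]
    return node


manifest = {
    "definitions": {
        "base_requester": {"type": "HttpRequester", "url_base": "https://api.example.com/v1/"},
        "base_retriever": {
            "type": "SimpleRetriever",
            "requester": {"$ref": "#/definitions/base_requester"},
        },
    },
    "streams": [
        {
            "name": "orders",
            "retriever": {
                "$ref": "#/definitions/base_retriever",
                "requester": {"$ref": "#/definitions/base_requester", "path": "orders"},
            },
        }
    ],
}

orders = resolve_refs(manifest["streams"][0], manifest)
print(orders["retriever"]["requester"])
```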

### OAuth2 Authentication Manifest

```yaml
version: "0.29.0"
type: DeclarativeSource

definitions:
  oauth_authenticator:
    type: DeclarativeOauth2Authenticator
    token_refresh_endpoint: "https://api.example.com/oauth/token"
    client_id: "{{ config['client_id'] }}"
    client_secret: "{{ config['client_secret'] }}"
    refresh_token: "{{ config['refresh_token'] }}"
    scopes: ["read:users", "read:data"]

streams:
  - type: DeclarativeStream
    name: protected_data
    retriever:
      type: SimpleRetriever
      requester:
        type: HttpRequester
        url_base: "https://api.example.com/v1/"
        path: "protected"
        authenticator:
          $ref: "#/definitions/oauth_authenticator"
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path: ["items"]

spec:
  type: Spec
  connection_specification:
    type: object
    properties:
      client_id:
        type: string
        title: Client ID
      client_secret:
        type: string
        title: Client Secret
        airbyte_secret: true
      refresh_token:
        type: string
        title: Refresh Token
        airbyte_secret: true
    required:
      - client_id
      - client_secret
      - refresh_token
```

### Record Transformations

```yaml
streams:
  - type: DeclarativeStream
    name: transformed_users
    retriever:
      type: SimpleRetriever
      requester:
        type: HttpRequester
        url_base: "https://api.example.com/v1/"
        path: "users"
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path: ["users"]
    # Transformations are declared on the stream, next to the retriever
    transformations:
      - type: AddFields
        fields:
          - path: ["source"]
            value: "api"
          - path: ["extracted_at"]
            value: "{{ now_utc() }}"
          - path: ["full_name"]
            value: "{{ record['first_name'] }} {{ record['last_name'] }}"
```

### Sub-streams (Parent-Child Relationships)

```yaml
streams:
  - type: DeclarativeStream
    name: users
    primary_key: ["id"]
    retriever:
      type: SimpleRetriever
      requester:
        type: HttpRequester
        url_base: "https://api.example.com/v1/"
        path: "users"
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path: ["users"]

  - type: DeclarativeStream
    name: user_posts
    primary_key: ["id"]
    retriever:
      type: SimpleRetriever
      requester:
        type: HttpRequester
        url_base: "https://api.example.com/v1/"
        path: "users/{{ stream_slice.user_id }}/posts"
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path: ["posts"]
      partition_router:
        type: SubstreamPartitionRouter
        parent_stream_configs:
          - stream: "#/streams/0"  # Reference to users stream
            parent_key: "id"
            partition_field: "user_id"
```
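
Conceptually, the partition router above turns each parent record into a slice, and the child stream's `path` is rendered once per slice. The sketch below mimics that flow with plain Python; `substream_slices` and `posts_path` are hypothetical helpers, and the parent records are hard-coded where the real connector would read them from the `users` stream.

```python
from typing import Any, Dict, Iterable, Iterator


def substream_slices(
    parent_records: Iterable[Dict[str, Any]],
    parent_key: str,
    partition_field: str,
) -> Iterator[Dict[str, Any]]:
    """Yield one slice per parent record, keyed by `partition_field`."""
    for record in parent_records:
        if parent_key in record:  # skip parents that lack the key
            yield {partition_field: record[parent_key]}


def posts_path(stream_slice: Dict[str, Any]) -> str:
    """Render the child path for a slice, like the templated `path` in the manifest."""
    return f"users/{stream_slice['user_id']}/posts"


users = [{"id": 1, "name": "Ada"}, {"id": 2, "name": "Grace"}, {"name": "no id"}]
slices = list(substream_slices(users, parent_key="id", partition_field="user_id"))
paths = [posts_path(s) for s in slices]
print(slices)
print(paths)
```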