# Custom Components

This section covers the custom Python components developed to handle Pipedrive's API inconsistencies and special requirements.

## Overview

The Airbyte Source Pipedrive connector includes custom Python components to handle specific challenges with Pipedrive's API, in particular inconsistent response formats in which the nested `data` field of an item is sometimes null.

## NullCheckedDpathExtractor

`NullCheckedDpathExtractor` is the primary custom component. It handles Pipedrive's inconsistent API responses.

```python { .api }
@dataclass
class NullCheckedDpathExtractor(RecordExtractor):
    """
    Pipedrive requires a custom extractor because the format of its API responses is inconsistent.

    Records are typically found in a nested "data" field, but sometimes the "data" field is null.
    This extractor checks for null "data" fields and returns the parent object, which contains the record ID, instead.
    """

    field_path: List[Union[InterpolatedString, str]]
    nullable_nested_field: Union[InterpolatedString, str]
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    decoder: Decoder = JsonDecoder(parameters={})

    def __post_init__(self, parameters: Mapping[str, Any]):
        """Initialize the internal DpathExtractor after dataclass initialization."""

    def extract_records(self, response: requests.Response) -> List[Mapping[str, Any]]:
        """Extract records, handling null data fields by returning parent object."""
```

### Class Definition

```python { .api }
from dataclasses import InitVar, dataclass
from typing import Any, List, Mapping, Union

import requests
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.types import Config
```

### Properties

```python { .api }
field_path: List[Union[InterpolatedString, str]]
# Path segments used to locate records in the decoded response
# Example: ["data", "*"] to extract all items from the data array

nullable_nested_field: Union[InterpolatedString, str]
# Name of the nested field that may be null
# Example: "data" - the field that is checked for null on each item

config: Config
# Configuration object containing connection settings

parameters: InitVar[Mapping[str, Any]]
# Parameters passed during initialization (forwarded to the internal DpathExtractor)

decoder: Decoder = JsonDecoder(parameters={})
# Decoder for response parsing (defaults to JsonDecoder)
```

### Methods

#### __post_init__

```python { .api }
def __post_init__(self, parameters: Mapping[str, Any]):
    """
    Initialize the internal DpathExtractor after dataclass initialization.

    Args:
        parameters: Initialization parameters from the manifest
    """
    self._dpath_extractor = DpathExtractor(
        field_path=self.field_path,
        config=self.config,
        parameters=parameters,
        decoder=self.decoder,
    )
```

#### extract_records

```python { .api }
def extract_records(self, response: requests.Response) -> List[Mapping[str, Any]]:
    """
    Extract records from API response, handling null data fields.

    Args:
        response: HTTP response from Pipedrive API

    Returns:
        List of extracted records, using parent object when data field is null

    Behavior:
        - Uses the internal DpathExtractor to extract records normally
        - For each record, reads the value of nullable_nested_field
        - If that value is null (or otherwise falsy, since the check uses `or`),
          returns the parent record object, which contains the ID
        - Otherwise, returns the nested data as usual
    """
    records = self._dpath_extractor.extract_records(response)
    return [record.get(self.nullable_nested_field) or record for record in records]
```

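The fallback expression can be tried out without the CDK. The following is a minimal sketch rather than the connector's code: `unwrap_records` is a hypothetical helper that applies the same `record.get(...) or record` expression to plain dictionaries, and the sample items are made up for illustration. It also shows that an empty nested object falls back to the parent, because `or` tests truthiness rather than `is None`.

```python
from typing import Any, List, Mapping


def unwrap_records(
    records: List[Mapping[str, Any]], nullable_nested_field: str
) -> List[Mapping[str, Any]]:
    """Hypothetical helper mirroring the list comprehension in extract_records."""
    # `or` falls back to the parent for any falsy nested value, not only None.
    return [record.get(nullable_nested_field) or record for record in records]


# Made-up items shaped like the wrapper objects described in this section.
items = [
    {"item": "deal", "id": 1, "data": {"id": 1, "title": "Renewal"}},
    {"item": "file", "id": 12345, "data": None},
    {"item": "note", "id": 7, "data": {}},
]

unwrapped = unwrap_records(items, "data")
print(unwrapped[0])  # nested object, because "data" is populated
print(unwrapped[1])  # parent wrapper, because "data" is None
print(unwrapped[2])  # parent wrapper, because {} is falsy
```
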
## Usage in Streams

The `NullCheckedDpathExtractor` is configured as the record extractor for the streams listed in this section, most of which read from Pipedrive's `/v1/recents` endpoint.

### Stream Configuration

```yaml { .api }
record_selector:
  type: RecordSelector
  extractor:
    type: CustomRecordExtractor
    class_name: source_declarative_manifest.components.NullCheckedDpathExtractor
    field_path:
      - data
      - "*"
    nullable_nested_field: data
```

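To make the two settings concrete, here is a rough sketch of how `field_path` and `nullable_nested_field` could combine on a response body. It assumes a payload shaped like the examples in this section. `select_path` is a hypothetical stand-in for the wildcard lookup that the CDK's `DpathExtractor` performs, supporting only `*` over lists, and the `success` key and record values are made up.

```python
import json
from typing import Any, List


def select_path(node: Any, path: List[str]) -> List[Any]:
    """Hypothetical stand-in for wildcard path lookup; "*" matches list items only."""
    if not path:
        return [node]
    head, rest = path[0], path[1:]
    if head == "*":
        children = node if isinstance(node, list) else []
    else:
        children = [node[head]] if isinstance(node, dict) and head in node else []
    results: List[Any] = []
    for child in children:
        results.extend(select_path(child, rest))
    return results


# Made-up response body; the outer "data" array holds wrapper objects.
body = json.loads(
    '{"success": true, "data": ['
    '{"item": "deal", "id": 1, "data": {"id": 1, "title": "Renewal"}},'
    '{"item": "file", "id": 12345, "data": null}]}'
)

field_path = ["data", "*"]      # selects each wrapper object in the outer array
nullable_nested_field = "data"  # inner field that may be null

wrappers = select_path(body, field_path)
# Same fallback expression as extract_records: nested data if present, else the wrapper.
records = [w.get(nullable_nested_field) or w for w in wrappers]
```

Note that the name `data` plays two roles here: the first segment of `field_path` refers to the outer array, while `nullable_nested_field` refers to the field inside each item.
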
### Streams Using NullCheckedDpathExtractor

The following streams utilize this custom component:

- **deals**: Core deal extraction from `/v1/recents?items=deal`
- **organizations**: Organization data from `/v1/recents?items=organization`
- **persons**: Person data from `/v1/recents?items=person`
- **activities**: Activity data from `/v1/recents?items=activity`
- **notes**: Note data from `/v1/recents?items=note`
- **files**: File data from `/v1/recents?items=file`
- **products**: Product data from `/v1/recents?items=product`
- **pipelines**: Pipeline configurations from `/v1/pipelines`
- **stages**: Stage configurations from `/v1/stages`
- **filters**: Filter configurations from `/v1/filters`

## Problem Solved

### API Response Inconsistency

Pipedrive's API sometimes returns items whose `data` field is null:

```json
{
  "item": "file",
  "id": 12345,
  "data": null
}
```

### Standard vs Custom Extraction

**Standard DpathExtractor behavior**:
- Extracts `data` field
- Returns `null` when data is null
- Loses the record entirely

**NullCheckedDpathExtractor behavior**:
- Extracts `data` field when available
- Falls back to parent object when data is null
- Preserves record with ID information

### Example Handling

```python
# Input record with null data (JSON null is decoded to None in Python)
record = {
    "item": "file",
    "id": 12345,
    "data": None,
}

# Standard extractor would return: None
# Custom extractor returns: {"item": "file", "id": 12345, "data": None}
```

## Integration with Airbyte CDK

The component extends Airbyte CDK's `RecordExtractor` base class:

```python { .api }
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
```

### Base Class Interface

```python { .api }
class RecordExtractor:
    """Base class for extracting records from API responses."""

    def extract_records(self, response: requests.Response) -> List[Mapping[str, Any]]:
        """Extract records from HTTP response."""
        pass
```

### Custom Implementation

The `NullCheckedDpathExtractor` implements this interface by delegating to the standard `DpathExtractor` for normal extraction and then applying its own handling for null `data` fields.

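The delegation pattern can be sketched with plain dataclasses. This is an illustration under stated assumptions, not the CDK's classes: `ListExtractor` and `NullCheckedExtractor` are hypothetical stand-ins, and they take an already decoded dictionary instead of a `requests.Response`. The sketch shows the two parts of the pattern: the delegate is built in `__post_init__` from an `InitVar`, and the wrapper post-processes whatever the delegate returns.

```python
from dataclasses import InitVar, dataclass
from typing import Any, List, Mapping


@dataclass
class ListExtractor:
    """Hypothetical stand-in for the delegate; returns the items under one key."""

    key: str

    def extract_records(self, payload: Mapping[str, Any]) -> List[Mapping[str, Any]]:
        return list(payload.get(self.key) or [])


@dataclass
class NullCheckedExtractor:
    """Sketch of the wrapper: build a delegate in __post_init__, then post-process."""

    key: str
    nullable_nested_field: str
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # InitVar values are passed to __post_init__ and are not stored as fields.
        self._delegate = ListExtractor(key=self.key)

    def extract_records(self, payload: Mapping[str, Any]) -> List[Mapping[str, Any]]:
        records = self._delegate.extract_records(payload)
        # Return the nested value when present, otherwise the parent record.
        return [r.get(self.nullable_nested_field) or r for r in records]


extractor = NullCheckedExtractor(key="data", nullable_nested_field="data", parameters={})
result = extractor.extract_records(
    {"data": [{"item": "file", "id": 12345, "data": None}]}
)
```

Keeping the delegate private means the wrapper can reuse its path handling and decoding unchanged while adding only the null check.
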
## Component Registration

The component is registered in the manifest as:

```yaml { .api }
type: CustomRecordExtractor
class_name: source_declarative_manifest.components.NullCheckedDpathExtractor
```

This allows the Airbyte declarative framework to instantiate and use the custom component during stream processing.