
# Custom Components

This section covers the custom Python components developed specifically for handling Pipedrive's API inconsistencies and special requirements.

## Overview

The Airbyte Source Pipedrive connector includes custom Python components to handle specific challenges with Pipedrive's API, particularly inconsistent response formats where data fields may be null in certain conditions.

## NullCheckedDpathExtractor

`NullCheckedDpathExtractor` is the primary custom component. It handles Pipedrive's inconsistent API responses.

```python { .api }
@dataclass
class NullCheckedDpathExtractor(RecordExtractor):
    """
    Pipedrive requires a custom extractor because the format of its API responses is inconsistent.

    Records are typically found in a nested "data" field, but sometimes the "data" field is null.
    This extractor checks for null "data" fields and returns the parent object, which contains the record ID, instead.
    """

    field_path: List[Union[InterpolatedString, str]]
    nullable_nested_field: Union[InterpolatedString, str]
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    decoder: Decoder = JsonDecoder(parameters={})

    def __post_init__(self, parameters: Mapping[str, Any]):
        """Initialize the internal DpathExtractor after dataclass initialization."""

    def extract_records(self, response: requests.Response) -> List[Mapping[str, Any]]:
        """Extract records, handling null data fields by returning parent object."""
```

### Class Definition

```python { .api }
from dataclasses import InitVar, dataclass
from typing import Any, List, Mapping, Union

import requests
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.types import Config
```

### Properties

```python { .api }
field_path: List[Union[InterpolatedString, str]]
# Path segments (a single path, split into keys) used to locate records in the response
# Example: ["data", "*"] to extract all items from the data array

nullable_nested_field: Union[InterpolatedString, str]
# Name of the nested field that may be null in an extracted record
# Example: "data" - the field that should be checked for null

config: Config
# Configuration object containing connection settings

parameters: InitVar[Mapping[str, Any]]
# Parameters passed during initialization (init-only, forwarded to __post_init__)

decoder: Decoder = JsonDecoder(parameters={})
# Decoder for response parsing (defaults to JsonDecoder)
```

### Methods

#### __post_init__

```python { .api }
def __post_init__(self, parameters: Mapping[str, Any]):
    """
    Initialize the internal DpathExtractor after dataclass initialization.

    Args:
        parameters: Initialization parameters from the manifest
    """
    self._dpath_extractor = DpathExtractor(
        field_path=self.field_path,
        config=self.config,
        parameters=parameters,
        decoder=self.decoder,
    )
```
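Because `parameters` is declared as an `InitVar`, the dataclass hands it to `__post_init__` as an argument instead of storing it as a regular field. The following is a minimal, standard-library-only sketch of that mechanism; `Inner` and `Wrapper` are hypothetical stand-ins and not CDK classes.

```python
from dataclasses import InitVar, dataclass, fields
from typing import Any, List, Mapping


@dataclass
class Inner:
    """Hypothetical stand-in for the wrapped extractor."""

    field_path: List[str]
    parameters: Mapping[str, Any]


@dataclass
class Wrapper:
    """Hypothetical wrapper mirroring the shape of the component above."""

    field_path: List[str]
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # InitVar values arrive here as arguments; they are not kept as dataclass fields.
        self._inner = Inner(field_path=self.field_path, parameters=parameters)


wrapper = Wrapper(field_path=["data", "*"], parameters={"name": "deals"})

# Only regular fields are reported; the init-only variable is absent.
field_names = [f.name for f in fields(wrapper)]
```

Building the inner object in `__post_init__` keeps the wrapper's constructor signature identical to what the manifest supplies while still forwarding every value the delegate needs.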

#### extract_records

```python { .api }
def extract_records(self, response: requests.Response) -> List[Mapping[str, Any]]:
    """
    Extract records from API response, handling null data fields.

    Args:
        response: HTTP response from Pipedrive API

    Returns:
        List of extracted records, using parent object when data field is null

    Behavior:
        - Uses internal DpathExtractor to extract records normally
        - For each record, checks if nullable_nested_field contains null value
        - If null, returns the parent record object (contains ID)
        - If not null, returns the nested data as usual
    """
    records = self._dpath_extractor.extract_records(response)
    return [record.get(self.nullable_nested_field) or record for record in records]
```
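One detail of the return expression is worth spelling out: the fallback uses Python's `or`, so it applies to any falsy value under `nullable_nested_field` (an empty dict, for instance) and to a missing key, not only to `None`. The standalone sketch below reproduces just that expression; the helper name `unwrap_or_parent` and the sample records are illustrative assumptions.

```python
from typing import Any, List, Mapping


def unwrap_or_parent(records: List[Mapping[str, Any]], nested_field: str) -> List[Any]:
    """Return each record's nested value, or the record itself when that value is falsy."""
    return [record.get(nested_field) or record for record in records]


# Made-up records covering the cases the expression distinguishes.
sample = [
    {"item": "deal", "id": 1, "data": {"id": 1, "title": "Renewal"}},  # nested data present
    {"item": "file", "id": 2, "data": None},                            # null data
    {"item": "note", "id": 3, "data": {}},                              # empty dict is falsy too
    {"item": "person", "id": 4},                                        # missing key acts like None
]

result = unwrap_or_parent(sample, "data")
```

Here the first element of `result` is the nested dict, while the other three are the parent records unchanged.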

## Usage in Streams

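The `NullCheckedDpathExtractor` is used mainly in streams that read from Pipedrive's `/v1/recents` endpoint, plus a few configuration streams with their own endpoints (see the stream list in this section).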

### Stream Configuration

```yaml { .api }
record_selector:
  type: RecordSelector
  extractor:
    type: CustomRecordExtractor
    class_name: source_declarative_manifest.components.NullCheckedDpathExtractor
    field_path:
      - data
      - "*"
    nullable_nested_field: data
```
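To make the two settings concrete: `field_path` selects every element of the top-level `data` array, and `nullable_nested_field` names the key checked inside each selected element. The sketch below walks a hand-written response body through both steps. The body is made up to match the shape this document describes, and plain Python stands in for the CDK's path-based extraction, so treat it as an illustration, not the connector's code path.

```python
import json
from typing import Any, List, Mapping

# Hand-written body in the shape this document describes (assumed, not captured from Pipedrive).
RESPONSE_TEXT = """
{
  "data": [
    {"item": "deal", "id": 10, "data": {"id": 10, "title": "Renewal"}},
    {"item": "file", "id": 12345, "data": null}
  ]
}
"""


def select_items(decoded: Mapping[str, Any]) -> List[Mapping[str, Any]]:
    """Plain-Python stand-in for field_path ["data", "*"]: each element of the top-level list."""
    return list(decoded.get("data") or [])


def extract(decoded: Mapping[str, Any], nullable_nested_field: str = "data") -> List[Mapping[str, Any]]:
    """Apply the path selection, then the null check on the nested field."""
    items = select_items(decoded)
    return [item.get(nullable_nested_field) or item for item in items]


records = extract(json.loads(RESPONSE_TEXT))
```

The deal comes back as its nested object, and the file comes back as the parent item, so its `id` is still available downstream.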

### Streams Using NullCheckedDpathExtractor

The following streams utilize this custom component:

- **deals**: Core deal extraction from `/v1/recents?items=deal`
- **organizations**: Organization data from `/v1/recents?items=organization`
- **persons**: Person data from `/v1/recents?items=person`
- **activities**: Activity data from `/v1/recents?items=activity`
- **notes**: Note data from `/v1/recents?items=note`
- **files**: File data from `/v1/recents?items=file`
- **products**: Product data from `/v1/recents?items=product`
- **pipelines**: Pipeline configurations from `/v1/pipelines`
- **stages**: Stage configurations from `/v1/stages`
- **filters**: Filter configurations from `/v1/filters`

## Problem Solved

### API Response Inconsistency

Pipedrive's API sometimes returns responses where the `data` field is null:

```json
{
  "item": "file",
  "id": 12345,
  "data": null
}
```

### Standard vs Custom Extraction

**Standard DpathExtractor behavior**:
- Extracts `data` field
- Returns `null` when data is null
- Loses the record entirely

**NullCheckedDpathExtractor behavior**:
- Extracts `data` field when available
- Falls back to parent object when data is null
- Preserves record with ID information

### Example Handling

```python
# Input record with null data (JSON null is None in Python)
record = {
    "item": "file",
    "id": 12345,
    "data": None,
}

# Standard extractor would return: None
# Custom extractor returns: {"item": "file", "id": 12345, "data": None}
```

## Integration with Airbyte CDK

The component extends Airbyte CDK's `RecordExtractor` base class:

```python { .api }
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
```

### Base Class Interface

```python { .api }
class RecordExtractor:
    """Base class for extracting records from API responses."""

    def extract_records(self, response: requests.Response) -> List[Mapping[str, Any]]:
        """Extract records from HTTP response."""
        pass
```

### Custom Implementation

The `NullCheckedDpathExtractor` implements this interface while delegating to the standard `DpathExtractor` for normal processing and adding special handling for null data fields.

## Component Registration

The component is registered in the manifest as:

```yaml { .api }
type: CustomRecordExtractor
class_name: source_declarative_manifest.components.NullCheckedDpathExtractor
```

This allows the Airbyte declarative framework to instantiate and use the custom component during stream processing.
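A dotted `class_name` such as the one above is the kind of string that can be turned into a class with `importlib`. The sketch below shows that general technique only. It is not the CDK's actual loader, `resolve_class` is a hypothetical helper, and a standard-library class stands in for the component so the example runs without the connector installed.

```python
import importlib
from typing import Any


def resolve_class(dotted_path: str) -> Any:
    """Split 'package.module.ClassName' and import the named attribute."""
    module_name, _, class_name = dotted_path.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


# Stand-in target; in the manifest the path would be
# source_declarative_manifest.components.NullCheckedDpathExtractor.
resolved_cls = resolve_class("collections.OrderedDict")

# The remaining manifest keys would then be passed as keyword arguments.
instance = resolved_cls(field_path=["data", "*"], nullable_nested_field="data")
```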