# Core Entity Streams

This section covers the primary business entity streams that extract core data from Pipedrive with incremental synchronization capabilities.

## Overview

Core entity streams represent the main business objects in Pipedrive CRM. All of them except `leads` support incremental synchronization on the `update_time` cursor field, and they use the custom `NullCheckedDpathExtractor` to handle Pipedrive's inconsistent API responses.

## Common Stream Configuration

All core entity streams share this base configuration pattern:

```yaml { .api }
# Base pattern for core entity streams
core_entity_stream:
  type: DeclarativeStream
  primary_key: [id]
  retriever:
    type: SimpleRetriever
    requester:
      $ref: "#/definitions/base_requester"
      path: v1/recents
      http_method: GET
      request_parameters:
        api_token: "{{ config['api_token'] }}"
        limit: "50"
        items: "{entity_type}"
    record_selector:
      type: RecordSelector
      extractor:
        type: CustomRecordExtractor
        class_name: source_declarative_manifest.components.NullCheckedDpathExtractor
        field_path: ["data", "*"]
        nullable_nested_field: data
    paginator:
      type: DefaultPaginator
      page_token_option:
        type: RequestOption
        inject_into: request_parameter
        field_name: start
      pagination_strategy:
        type: CursorPagination
        cursor_value: "{{ response['additional_data']['pagination']['next_start'] }}"
  incremental_sync:
    type: DatetimeBasedCursor
    cursor_field: update_time
    cursor_datetime_formats: ["%Y-%m-%d %H:%M:%S"]
    datetime_format: "%Y-%m-%d %H:%M:%S"
    start_datetime:
      type: MinMaxDatetime
      datetime: "{{ format_datetime(config['replication_start_date'], '%Y-%m-%d %H:%M:%S') }}"
      datetime_format: "%Y-%m-%d %H:%M:%S"
    start_time_option:
      type: RequestOption
      field_name: since_timestamp
      inject_into: request_parameter
```

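To make the request shape concrete, the following sketch assembles the query parameters that the base pattern would send to `v1/recents` for one entity type. It is plain Python for illustration, not Airbyte CDK code: the helper name `build_recents_params` and its signature are hypothetical, and only the parameter names (`api_token`, `limit`, `items`, `start`, `since_timestamp`) come from the configuration above.

```python
from typing import Dict, Optional


def build_recents_params(
    api_token: str,
    entity_type: str,
    since_timestamp: str,
    start: Optional[int] = None,
    limit: int = 50,
) -> Dict[str, str]:
    """Build query parameters for GET v1/recents (hypothetical helper)."""
    params = {
        "api_token": api_token,
        "limit": str(limit),
        "items": entity_type,                # e.g. "deal", "person", "note"
        "since_timestamp": since_timestamp,  # injected by start_time_option
    }
    # The paginator only injects `start` once a next page token is known.
    if start is not None:
        params["start"] = str(start)
    return params


first_page = build_recents_params("TOKEN", "deal", "2024-01-01 00:00:00")
next_page = build_recents_params("TOKEN", "deal", "2024-01-01 00:00:00", start=50)
```

Under these assumptions, the only value that changes between streams is `items`, which is why the per-stream definitions below differ so little.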
## Individual Entity Streams

### Deals

Extracts deal data from Pipedrive including deal information, values, and status.

```yaml { .api }
deals:
  type: DeclarativeStream
  name: deals
  primary_key: [id]
  retriever:
    type: SimpleRetriever
    requester:
      $ref: "#/definitions/base_requester"
      path: v1/recents
      request_parameters:
        api_token: "{{ config['api_token'] }}"
        limit: "50"
        items: deal
    record_selector:
      type: RecordSelector
      extractor:
        type: CustomRecordExtractor
        class_name: source_declarative_manifest.components.NullCheckedDpathExtractor
        field_path: ["data", "*"]
        nullable_nested_field: data
  incremental_sync:
    type: DatetimeBasedCursor
    cursor_field: update_time
  schema_loader:
    type: InlineSchemaLoader
    schema:
      $ref: "#/schemas/deals"
```

**Sync Modes**: full_refresh, incremental
**API Endpoint**: `v1/recents?items=deal`

### Persons

Extracts person/contact data from Pipedrive.

```yaml { .api }
# Abbreviated; remaining settings follow the base pattern above
persons:
  type: DeclarativeStream
  name: persons
  primary_key: [id]
  retriever:
    requester:
      path: v1/recents
      request_parameters:
        items: person
  incremental_sync:
    cursor_field: update_time
```

**Sync Modes**: full_refresh, incremental
**API Endpoint**: `v1/recents?items=person`

### Activities

Extracts activity data including calls, meetings, emails, and tasks.

```yaml { .api }
# Abbreviated; remaining settings follow the base pattern above
activities:
  type: DeclarativeStream
  name: activities
  primary_key: [id]
  retriever:
    requester:
      path: v1/recents
      request_parameters:
        items: activity
  incremental_sync:
    cursor_field: update_time
```

**Sync Modes**: full_refresh, incremental
**API Endpoint**: `v1/recents?items=activity`

### Notes

Extracts note data associated with deals, organizations, or persons.

```yaml { .api }
# Abbreviated; remaining settings follow the base pattern above
notes:
  type: DeclarativeStream
  name: notes
  primary_key: [id]
  retriever:
    requester:
      path: v1/recents
      request_parameters:
        items: note
  incremental_sync:
    cursor_field: update_time
```

**Sync Modes**: full_refresh, incremental
**API Endpoint**: `v1/recents?items=note`

### Files

Extracts file attachment data from Pipedrive.

```yaml { .api }
# Abbreviated; remaining settings follow the base pattern above
files:
  type: DeclarativeStream
  name: files
  primary_key: [id]
  retriever:
    requester:
      path: v1/recents
      request_parameters:
        items: file
  incremental_sync:
    cursor_field: update_time
```

**Sync Modes**: full_refresh, incremental
**API Endpoint**: `v1/recents?items=file`

### Products

Extracts product catalog data from Pipedrive.

```yaml { .api }
# Abbreviated; remaining settings follow the base pattern above
products:
  type: DeclarativeStream
  name: products
  primary_key: [id]
  retriever:
    requester:
      path: v1/recents
      request_parameters:
        items: product
  incremental_sync:
    cursor_field: update_time
```

**Sync Modes**: full_refresh, incremental
**API Endpoint**: `v1/recents?items=product`

### Leads

Extracts lead data from Pipedrive's lead management system.

```yaml { .api }
leads:
  type: DeclarativeStream
  name: leads
  primary_key: [id]
  retriever:
    type: SimpleRetriever
    requester:
      path: v1/leads
      request_parameters:
        api_token: "{{ config['api_token'] }}"
        limit: "50"
    paginator:
      type: DefaultPaginator
      pagination_strategy:
        type: CursorPagination
        cursor_value: "{{ response['additional_data']['pagination']['next_start'] }}"
```

**Sync Modes**: full_refresh
**API Endpoint**: `v1/leads`

## Incremental Sync Configuration

All core entity streams (except leads) support incremental synchronization:

- **Cursor Field**: `update_time`
- **Date Format**: `%Y-%m-%d %H:%M:%S`
- **Start Parameter**: `since_timestamp`
- **Incremental Support**: The cursor value is sent as `since_timestamp`, so Pipedrive filters by timestamp on the server side

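The cursor handling above can be sketched in plain Python. This is an illustration rather than the connector's code: the helper names are hypothetical, and it assumes `replication_start_date` is configured as an ISO 8601 UTC string such as `2024-01-01T00:00:00Z`, which the section does not state.

```python
from datetime import datetime
from typing import Dict, Iterable

CURSOR_FORMAT = "%Y-%m-%d %H:%M:%S"  # format used by update_time and since_timestamp


def to_since_timestamp(replication_start_date: str) -> str:
    """Convert the configured start date (assumed ISO 8601 with 'Z') to the cursor format."""
    parsed = datetime.strptime(replication_start_date, "%Y-%m-%dT%H:%M:%SZ")
    return parsed.strftime(CURSOR_FORMAT)


def latest_cursor(records: Iterable[Dict[str, str]], current: str) -> str:
    """Return the newest update_time seen so far, never moving backwards."""
    seen = [datetime.strptime(current, CURSOR_FORMAT)]
    for record in records:
        value = record.get("update_time")
        if value:  # skip records without a usable cursor value
            seen.append(datetime.strptime(value, CURSOR_FORMAT))
    return max(seen).strftime(CURSOR_FORMAT)


since = to_since_timestamp("2024-01-01T00:00:00Z")
new_state = latest_cursor(
    [
        {"id": "1", "update_time": "2024-03-05 10:00:00"},
        {"id": "2", "update_time": "2024-02-01 09:30:00"},
    ],
    current=since,
)
```

On the next sync, `new_state` would be the value passed as `since_timestamp`.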
## Pagination

Core entity streams use cursor-based pagination:

- **Page Size**: 50 records per request
- **Token Field**: `start` (request parameter)
- **Next Token**: `response['additional_data']['pagination']['next_start']`

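As a rough sketch of the loop these settings describe, the generator below keeps requesting pages and feeds each `next_start` value back as the `start` parameter. The `fetch_page` callable and the canned responses are stand-ins for the HTTP request, and stopping when `next_start` is absent is an assumption about how the loop terminates.

```python
from typing import Callable, Dict, Iterator, Optional


def paginate(fetch_page: Callable[[Optional[int]], Dict]) -> Iterator[Dict]:
    """Yield response pages, following additional_data.pagination.next_start."""
    start: Optional[int] = None  # the first request carries no `start` parameter
    while True:
        response = fetch_page(start)
        yield response
        pagination = (response.get("additional_data") or {}).get("pagination") or {}
        next_start = pagination.get("next_start")
        if next_start is None:  # assumed stop condition: no further page token
            break
        start = next_start


# Canned responses keyed by the `start` value, standing in for the API.
FAKE_PAGES = {
    None: {"data": [{"id": 1}], "additional_data": {"pagination": {"next_start": 50}}},
    50: {"data": [{"id": 2}], "additional_data": {"pagination": {}}},
}

pages = list(paginate(lambda start: FAKE_PAGES[start]))
ids = [item["id"] for page in pages for item in page["data"]]
```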
## Data Extraction

All core entity streams use the `NullCheckedDpathExtractor` to handle an inconsistency in Pipedrive's API responses: the nested `data` field of a returned item may be null. When it is, the extractor returns the parent object, which contains the record ID, instead.
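
The fallback can be illustrated with a simplified stand-in for the extractor. This is not the actual `NullCheckedDpathExtractor` class; the function below only mimics the described behaviour for `field_path: ["data", "*"]` and `nullable_nested_field: data`, and the sample response is invented for the example.

```python
from typing import Any, Dict, List


def extract_records(
    response: Dict[str, Any], nullable_nested_field: str = "data"
) -> List[Dict[str, Any]]:
    """Return each item's nested record, or the item itself when the nested field is null."""
    items = response.get("data") or []  # corresponds to field_path ["data", "*"]
    # `or item` falls back to the parent object when the nested field is null or empty.
    return [item.get(nullable_nested_field) or item for item in items]


response = {
    "data": [
        {"id": 1, "item": "deal", "data": {"id": 1, "title": "Renewal"}},
        {"id": 2, "item": "deal", "data": None},  # nested data missing
    ]
}
records = extract_records(response)
```

Here the first record is the nested deal object, while the second is the parent object, so its `id` is still available downstream.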