0
# Relationship and Communication Streams
1
2
This section covers streams that handle relationships between entities and communication data in Pipedrive.
3
4
## Overview
5
6
Relationship and communication streams extract data that connects different entities together or represents communication activities. These streams handle complex data relationships and may use partition routing to extract related data.
7
8
## Relationship Streams
9
10
### Deal Products
11
12
Extracts products associated with specific deals, showing which products are included in each deal along with quantities and pricing.
13
14
```yaml { .api }
15
deal_products:
16
type: DeclarativeStream
17
name: deal_products
18
primary_key: [id]
19
retriever:
20
type: SimpleRetriever
21
requester:
22
$ref: "#/definitions/base_requester"
23
path: v1/deals/{{ stream_partition.parent_id }}/products
24
http_method: GET
25
request_parameters:
26
api_token: "{{ config['api_token'] }}"
27
limit: "50"
28
record_selector:
29
type: RecordSelector
30
extractor:
31
type: DpathExtractor
32
field_path: ["data"]
33
paginator:
34
type: DefaultPaginator
35
page_token_option:
36
type: RequestOption
37
inject_into: request_parameter
38
field_name: start
39
pagination_strategy:
40
type: CursorPagination
41
cursor_value: "{{ response['additional_data']['pagination']['next_start'] }}"
42
partition_router:
43
type: SubstreamPartitionRouter
44
parent_stream_configs:
45
- type: ParentStreamConfig
46
parent_key: id
47
partition_field: parent_id
48
stream:
49
$ref: "#/definitions/streams/deals"
50
schema_loader:
51
type: InlineSchemaLoader
52
schema:
53
$ref: "#/schemas/deal_products"
54
```
55
56
**Sync Modes**: full_refresh
57
**API Endpoint**: `v1/deals/{deal_id}/products`
58
**Parent Stream**: deals
59
**Relationship Type**: One-to-many (one deal can have multiple products)
60
61
This stream uses a `SubstreamPartitionRouter` to iterate through all deals and extract associated products for each deal.
62
63
## Communication Streams
64
65
These streams extract communication-related data including emails and message threads.
66
67
68
69
Extracts individual email messages from Pipedrive's mailbox integration.
70
71
```yaml { .api }
72
mail:
73
type: DeclarativeStream
74
name: mail
75
primary_key: [id]
76
retriever:
77
type: SimpleRetriever
78
requester:
79
$ref: "#/definitions/base_requester"
80
path: v1/mailbox/mailMessages
81
http_method: GET
82
request_parameters:
83
api_token: "{{ config['api_token'] }}"
84
limit: "50"
85
record_selector:
86
type: RecordSelector
87
extractor:
88
type: DpathExtractor
89
field_path: ["data"]
90
paginator:
91
type: DefaultPaginator
92
page_token_option:
93
type: RequestOption
94
inject_into: request_parameter
95
field_name: start
96
pagination_strategy:
97
type: CursorPagination
98
cursor_value: "{{ response['additional_data']['pagination']['next_start'] }}"
99
schema_loader:
100
type: InlineSchemaLoader
101
schema:
102
$ref: "#/schemas/mail"
103
```
104
105
**Sync Modes**: full_refresh
106
**API Endpoint**: `v1/mailbox/mailMessages`
107
108
### Mail Threads
109
110
Extracts email conversation threads from Pipedrive's mailbox integration.
111
112
```yaml { .api }
113
mailThreads:
114
type: DeclarativeStream
115
name: mailThreads
116
primary_key: [id]
117
retriever:
118
type: SimpleRetriever
119
requester:
120
$ref: "#/definitions/base_requester"
121
path: v1/mailbox/mailThreads
122
http_method: GET
123
request_parameters:
124
api_token: "{{ config['api_token'] }}"
125
limit: "50"
126
record_selector:
127
type: RecordSelector
128
extractor:
129
type: DpathExtractor
130
field_path: ["data"]
131
paginator:
132
type: DefaultPaginator
133
page_token_option:
134
type: RequestOption
135
inject_into: request_parameter
136
field_name: start
137
pagination_strategy:
138
type: CursorPagination
139
cursor_value: "{{ response['additional_data']['pagination']['next_start'] }}"
140
schema_loader:
141
type: InlineSchemaLoader
142
schema:
143
$ref: "#/schemas/mailThreads"
144
```
145
146
**Sync Modes**: full_refresh
147
**API Endpoint**: `v1/mailbox/mailThreads`
148
149
## Partition Routing
150
151
The `deal_products` stream demonstrates how partition routing works for relationship data:
152
153
### Partition Router Configuration
154
155
```yaml { .api }
156
partition_router:
157
type: SubstreamPartitionRouter
158
parent_stream_configs:
159
- type: ParentStreamConfig
160
parent_key: id
161
partition_field: parent_id
162
stream:
163
$ref: "#/definitions/streams/deals"
164
```
165
166
**Components**:
167
- **parent_key**: Field from parent stream used as identifier (`id` from deals)
168
- **partition_field**: Variable name used in child stream path (`parent_id`)
169
- **stream**: Reference to parent stream configuration
170
171
### Path Templating
172
173
The partition router enables dynamic path construction:
174
```yaml { .api }
175
path: v1/deals/{{ stream_partition.parent_id }}/products
176
```
177
178
This template is populated with each deal's ID from the parent stream.
179
180
## Data Extraction Patterns
181
182
### Standard Extraction
183
Communication streams use standard data extraction:
184
```yaml { .api }
185
record_selector:
186
type: RecordSelector
187
extractor:
188
type: DpathExtractor
189
field_path: ["data"]
190
```
191
192
### Relationship Data Flow
193
1. **Parent Stream Query**: Extract all deals
194
2. **Partition Creation**: Create partition for each deal ID
195
3. **Child Stream Query**: For each partition, query deal-specific products
196
4. **Data Combination**: Combine all product records across all deals
197
198
## Common Configuration
199
200
All relationship and communication streams share:
201
202
### Request Configuration
203
```yaml { .api }
204
requester:
205
$ref: "#/definitions/base_requester"
206
http_method: GET
207
request_parameters:
208
api_token: "{{ config['api_token'] }}"
209
limit: "50"
210
```
211
212
### Pagination Configuration
213
```yaml { .api }
214
paginator:
215
type: DefaultPaginator
216
page_token_option:
217
type: RequestOption
218
inject_into: request_parameter
219
field_name: start
220
pagination_strategy:
221
type: CursorPagination
222
cursor_value: "{{ response['additional_data']['pagination']['next_start'] }}"
223
```
224
225
### Schema Loading
226
```yaml { .api }
227
schema_loader:
228
type: InlineSchemaLoader
229
schema:
230
$ref: "#/schemas/{stream_name}"
231
```
232
233
## Use Cases
234
235
### Deal Products Stream
236
- Track which products are included in deals
237
- Analyze product performance across deals
238
- Calculate total deal values including product quantities
239
- Monitor product adoption and sales metrics
240
241
### Mail Streams
242
- Extract email communication history
243
- Analyze communication patterns with contacts
244
- Track email engagement and response rates
245
- Integrate email data with CRM activities
246
247
### Integration Patterns
248
These streams enable comprehensive data analysis by connecting:
249
- **Deal → Products**: Revenue analysis and product performance
250
- **Contacts → Communications**: Relationship tracking and engagement analysis
251
- **Activities → Communications**: Complete interaction history