# CRM Data Streams

Stream classes for core HubSpot CRM objects including contacts, companies, deals, and tickets. These streams provide access to the primary business objects in HubSpot with incremental sync capabilities and comprehensive property support.

## Capabilities

### Base CRM Stream Classes

Foundation classes that provide common functionality for CRM object streams.

```python { .api }
class BaseStream(HttpStream, ABC):
    """
    Abstract base class for all HubSpot streams.

    Properties:
    - url_base: Base URL for API requests
    - primary_key: Primary key field(s) for the stream
    - properties: Available properties for the object type
    """

    def scope_is_granted(self, granted_scopes: Set[str]) -> bool:
        """Check if required scopes are granted for this stream."""

    def properties_scope_is_granted(self) -> bool:
        """Check if properties-related scopes are granted."""

class IncrementalStream(BaseStream, ABC):
    """
    Abstract base class for incremental streams.

    Properties:
    - state_checkpoint_interval: Checkpoint interval for state management
    - cursor_field: Field used for incremental sync
    """

    def get_updated_state(self, current_stream_state, latest_record) -> MutableMapping[str, Any]:
        """Update stream state with latest record information."""

class CRMSearchStream(IncrementalStream, ABC):
    """
    Base class for CRM object streams using HubSpot's search API.

    Properties:
    - search_url: Search endpoint URL
    - associations: Object associations to include
    - query_params: Query parameters for search
    """
```
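
To make the role of `CRMSearchStream` more concrete, the sketch below builds the kind of request body that HubSpot's CRM search endpoint (`POST /crm/v3/objects/{object}/search`) accepts for an incremental read. It is an illustration only: the helper name `build_search_payload`, the default cursor property `hs_lastmodifieddate`, and the page size are assumptions, and the connector's own request construction may differ.

```python
from typing import Any, Dict, List, Optional


def build_search_payload(
    since_ms: int,
    properties: List[str],
    cursor_property: str = "hs_lastmodifieddate",  # assumed default, not taken from the connector
    after: Optional[str] = None,
    limit: int = 100,
) -> Dict[str, Any]:
    """Build a search body for records modified at or after `since_ms` (epoch milliseconds)."""
    payload: Dict[str, Any] = {
        # One filter group with one filter: cursor_property >= since_ms
        "filterGroups": [
            {
                "filters": [
                    {
                        "propertyName": cursor_property,
                        "operator": "GTE",
                        "value": str(since_ms),
                    }
                ]
            }
        ],
        # Oldest first, so the cursor only moves forward while paging
        "sorts": [{"propertyName": cursor_property, "direction": "ASCENDING"}],
        "properties": properties,
        "limit": limit,
    }
    if after is not None:
        # Paging token returned by the previous page, if any
        payload["after"] = after
    return payload


# 1672531200000 ms is 2023-01-01T00:00:00Z
payload = build_search_payload(1672531200000, ["email", "firstname"])
```

Sorting by the cursor property in ascending order is a design choice in this sketch: it lets a reader checkpoint state after any page without skipping older records.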

### Contact Streams

Access to HubSpot contact data with full property support and incremental sync.

```python { .api }
class Contacts(CRMSearchStream):
    """
    Stream for HubSpot contact records.

    Provides access to contact data including:
    - Basic contact information (name, email, phone)
    - Custom contact properties
    - Lifecycle stage information
    - Contact associations
    - Property history (if scopes granted)
    """
```

### Company Streams

Access to HubSpot company data with comprehensive business information.

```python { .api }
class Companies(CRMSearchStream):
    """
    Stream for HubSpot company records.

    Provides access to company data including:
    - Company information (name, domain, industry)
    - Custom company properties
    - Company associations with contacts and deals
    - Property history (if scopes granted)
    """
```

### Deal Streams

Access to HubSpot deal data including pipeline and sales information. The related `DealsArchived`, `DealSplits`, and `Leads` streams are listed here as well.

```python { .api }
class Deals(CRMSearchStream):
    """
    Stream for HubSpot deal records.

    Provides access to deal data including:
    - Deal information (name, amount, stage)
    - Pipeline and deal stage data
    - Custom deal properties
    - Deal associations with contacts and companies
    - Property history (if scopes granted)
    """

class DealsArchived(ClientSideIncrementalStream):
    """
    Stream for archived HubSpot deal records.

    Provides access to deals that have been archived,
    with client-side incremental sync capabilities.
    """

class DealSplits(CRMSearchStream):
    """
    Stream for HubSpot deal split records.

    Provides access to deal revenue splits and
    commission tracking data.
    """

class Leads(CRMSearchStream):
    """
    Stream for HubSpot lead records.

    Provides access to lead data including:
    - Lead qualification status
    - Lead source and campaign attribution
    - Lead scoring and ranking
    - Conversion tracking
    - Lead lifecycle stage progression
    """
```
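
`DealsArchived` is described as using client-side incremental sync. In general terms, this means the API returns every record and the reader discards those at or before the saved cursor. The following is a minimal sketch of that idea, independent of the connector's code; the cursor field name `archivedAt`, the helper `filter_newer_than`, and the sample records are assumptions for illustration.

```python
from datetime import datetime
from typing import Any, Dict, Iterable, Iterator, Optional


def parse_ts(value: str) -> datetime:
    """Parse an ISO 8601 timestamp, accepting a trailing 'Z' for UTC."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def filter_newer_than(
    records: Iterable[Dict[str, Any]],
    cursor_field: str,
    state_value: Optional[str],
) -> Iterator[Dict[str, Any]]:
    """Yield only records whose cursor value is strictly after `state_value`."""
    for record in records:
        value = record.get(cursor_field)
        if value is None:
            # Records without a cursor value cannot be compared; skipped in this sketch
            continue
        if state_value is None or parse_ts(value) > parse_ts(state_value):
            yield record


# Hypothetical archived deals as they might come back from a full read
archived = [
    {"id": "1", "archivedAt": "2023-01-05T00:00:00Z"},
    {"id": "2", "archivedAt": "2023-02-01T00:00:00Z"},
]
new_records = list(filter_newer_than(archived, "archivedAt", "2023-01-10T00:00:00Z"))
```

With no saved state (`state_value=None`) every record that has a cursor value passes through, which matches the behavior of a first sync.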

### Ticket Streams

Access to HubSpot service ticket data for customer support workflows.

```python { .api }
class Tickets(CRMSearchStream):
    """
    Stream for HubSpot ticket records.

    Provides access to support ticket data including:
    - Ticket information (subject, status, priority)
    - Ticket pipeline and stage data
    - Custom ticket properties
    - Ticket associations with contacts and companies
    """
```

## Usage Examples

### Basic Contact Data Access

```python
from airbyte_cdk.models import SyncMode
from source_hubspot.streams import Contacts, API

# Setup API client
credentials = {
    "credentials_title": "OAuth Credentials",
    "client_id": "your_client_id",
    "client_secret": "your_client_secret",
    "refresh_token": "your_refresh_token"
}
api = API(credentials)

# Create contacts stream
contacts = Contacts(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials
)

# Read contact data
for record in contacts.read_records(sync_mode=SyncMode.full_refresh):
    properties = record['properties']
    # Use .get(): a contact may have no email or name set
    print(f"Contact: {properties.get('email', 'N/A')}")
    print(f"Name: {properties.get('firstname', '')} {properties.get('lastname', '')}")
```

### Incremental Company Sync

```python
from source_hubspot.streams import Companies
from airbyte_cdk.models import SyncMode

# Reuses `api` and `credentials` from the Basic Contact Data Access example

# Create companies stream
companies = Companies(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials
)

# Read with incremental sync
stream_state = {}
for record in companies.read_records(
    sync_mode=SyncMode.incremental,
    stream_state=stream_state
):
    # Use .get(): a company may have no name or domain set
    print(f"Company: {record['properties'].get('name', 'N/A')}")
    print(f"Domain: {record['properties'].get('domain', 'N/A')}")

    # Update state after each record
    stream_state = companies.get_updated_state(stream_state, record)
```
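
The loop above relies on `get_updated_state` to advance the cursor. This document does not show its implementation; a common pattern is to keep the later of the stored cursor value and the latest record's value. The following stand-alone sketch shows that pattern, assuming an `updatedAt` cursor holding ISO 8601 strings. The function name `advance_state` and the sample records are hypothetical.

```python
from datetime import datetime
from typing import Any, Dict, Mapping


def _parse(value: str) -> datetime:
    """Parse an ISO 8601 timestamp, accepting a trailing 'Z' for UTC."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def advance_state(
    current_state: Mapping[str, Any],
    latest_record: Mapping[str, Any],
    cursor_field: str = "updatedAt",  # assumed cursor field
) -> Dict[str, Any]:
    """Return a new state whose cursor is the later of the stored and record values."""
    latest = latest_record.get(cursor_field)
    current = current_state.get(cursor_field)
    if latest is None:
        # Nothing to compare against; keep the state unchanged
        return dict(current_state)
    if current is None or _parse(latest) > _parse(current):
        return {**current_state, cursor_field: latest}
    return dict(current_state)


# Records may arrive out of order; the cursor never moves backwards
state: Dict[str, Any] = {}
for rec in [{"updatedAt": "2023-03-01T00:00:00Z"}, {"updatedAt": "2023-02-01T00:00:00Z"}]:
    state = advance_state(state, rec)
```

Comparing parsed datetimes rather than raw strings avoids surprises when timestamps differ in precision or offset notation.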

### Deal Pipeline Analysis

```python
from airbyte_cdk.models import SyncMode
from source_hubspot.streams import Deals

# Reuses `api` and `credentials` from the Basic Contact Data Access example
deals = Deals(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials
)

# Analyze deals by pipeline stage
deal_stages = {}
for record in deals.read_records(sync_mode=SyncMode.full_refresh):
    stage = record['properties'].get('dealstage', 'Unknown')
    # `or 0` covers amounts that are present but empty or None
    amount = float(record['properties'].get('amount', 0) or 0)

    if stage not in deal_stages:
        deal_stages[stage] = {'count': 0, 'total_amount': 0}

    deal_stages[stage]['count'] += 1
    deal_stages[stage]['total_amount'] += amount

for stage, data in deal_stages.items():
    print(f"{stage}: {data['count']} deals, ${data['total_amount']:,.2f}")
```

### Support Ticket Stream

```python
from airbyte_cdk.models import SyncMode
from source_hubspot.streams import Tickets

# Reuses `api` and `credentials` from the Basic Contact Data Access example
tickets = Tickets(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials
)

# Process support tickets
for record in tickets.read_records(sync_mode=SyncMode.full_refresh):
    ticket_id = record['id']
    subject = record['properties'].get('subject', 'No subject')
    status = record['properties'].get('hs_ticket_status', 'Unknown')
    priority = record['properties'].get('hs_ticket_priority', 'Normal')

    print(f"Ticket {ticket_id}: {subject}")
    print(f"Status: {status}, Priority: {priority}")
```

### Working with Associations

```python
from airbyte_cdk.models import SyncMode
from source_hubspot.streams import Contacts

# CRM streams support associations between objects
# Reuses `api` and `credentials` from the Basic Contact Data Access example
contacts = Contacts(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials
)

for record in contacts.read_records(sync_mode=SyncMode.full_refresh):
    contact_id = record['id']
    # Use .get(): a contact may have no email set
    email = record['properties'].get('email', 'N/A')

    # Check for associated companies
    associations = record.get('associations', {})
    if 'companies' in associations:
        company_ids = [assoc['id'] for assoc in associations['companies']['results']]
        print(f"Contact {email} associated with companies: {company_ids}")
```
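
The association lookup above can be wrapped in a small helper that tolerates missing keys. This is a sketch under the assumption that records follow the shape used in the example (`associations -> <object type> -> results -> id`); the helper name `associated_ids` and the sample record are made up for illustration.

```python
from typing import Any, List, Mapping


def associated_ids(record: Mapping[str, Any], object_type: str) -> List[str]:
    """Return the IDs of associated objects of `object_type`, or an empty list."""
    associations = record.get("associations") or {}
    results = (associations.get(object_type) or {}).get("results") or []
    # Skip malformed entries that carry no "id"
    return [item["id"] for item in results if "id" in item]


# Hypothetical contact record with one associated company
sample = {
    "id": "101",
    "properties": {"email": "a@example.com"},
    "associations": {"companies": {"results": [{"id": "55"}]}},
}
company_ids = associated_ids(sample, "companies")
deal_ids = associated_ids(sample, "deals")
```

Returning an empty list instead of raising keeps calling code free of `KeyError` handling when a record has no associations of the requested type.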

## Stream Configuration

### Required Parameters

All CRM streams require these common parameters:

- **api**: API client instance with authentication
- **start_date**: ISO 8601 datetime string for incremental sync starting point
- **credentials**: Authentication credentials object
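
Because `start_date` is passed as a plain string, it can be worth checking its format before constructing a stream. The sketch below parses the `YYYY-MM-DDTHH:MM:SSZ` form used in the usage examples. Whether the streams validate this value themselves is not stated here, and `parse_start_date` is a hypothetical helper rather than part of the package.

```python
from datetime import datetime, timezone


def parse_start_date(value: str) -> datetime:
    """Parse 'YYYY-MM-DDTHH:MM:SSZ' into a timezone-aware UTC datetime.

    Raises ValueError if the string does not match that format.
    """
    parsed = datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ")
    return parsed.replace(tzinfo=timezone.utc)


start = parse_start_date("2023-01-01T00:00:00Z")
```

ISO 8601 allows other forms (offsets, fractional seconds) that this strict format string rejects; loosen it if your configuration uses them.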

### Optional Parameters

- **acceptance_test_config**: Configuration for acceptance testing (used internally)

### Stream Properties

Each CRM stream provides access to:
- **name**: Stream name (e.g., "contacts", "companies")
- **primary_key**: Primary key field (typically "id")
- **cursor_field**: Field used for incremental sync (typically "updatedAt")
- **properties**: Available object properties based on HubSpot schema

## OAuth Scopes

CRM streams require specific OAuth scopes:

- **Contacts**: `crm.objects.contacts.read`
- **Companies**: `crm.objects.contacts.read`, `crm.objects.companies.read`
- **Deals**: `contacts`, `crm.objects.deals.read`
- **Tickets**: `tickets`

Additional scopes for property history:
- **Contacts**: `crm.schemas.contacts.read`
- **Companies**: `crm.schemas.companies.read`
- **Deals**: `crm.schemas.deals.read`
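
The scope lists above can be turned into a quick pre-flight check. The sketch below reports which of the listed scopes are missing from a set of granted scopes. It assumes every listed scope is needed for a stream; this document does not say whether `scope_is_granted` requires all of them or only one, so treat the all-required rule, the `REQUIRED_SCOPES` mapping name, and the `missing_scopes` helper as illustrative assumptions.

```python
from typing import Dict, Set

# Scopes copied from the list above, keyed by stream name
REQUIRED_SCOPES: Dict[str, Set[str]] = {
    "contacts": {"crm.objects.contacts.read"},
    "companies": {"crm.objects.contacts.read", "crm.objects.companies.read"},
    "deals": {"contacts", "crm.objects.deals.read"},
    "tickets": {"tickets"},
}


def missing_scopes(stream_name: str, granted: Set[str]) -> Set[str]:
    """Return the listed scopes for `stream_name` that are absent from `granted`."""
    return REQUIRED_SCOPES[stream_name] - granted


# Example: a token that was granted only two scopes
granted = {"crm.objects.contacts.read", "tickets"}
for name in REQUIRED_SCOPES:
    gaps = missing_scopes(name, granted)
    status = "ok" if not gaps else f"missing {sorted(gaps)}"
    print(f"{name}: {status}")
```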