# Base Stream Classes

Foundation classes for extending and customizing HubSpot stream functionality. These abstract base classes provide common patterns and interfaces that can be extended to create custom streams or modify existing behavior.

## Capabilities

### Core Base Classes

Foundation classes that provide common HTTP and streaming functionality.

```python { .api }
class BaseStream(HttpStream, ABC):
    """
    Abstract base class for all HubSpot streams.

    Provides common functionality including:
    - HTTP client configuration and authentication
    - Error handling and retry logic
    - Scope validation and permissions checking
    - Common request headers and parameters
    - Response parsing and transformation

    Abstract methods to implement:
    - path(): API endpoint path
    - parse_response(): Response parsing logic
    """

    @abstractmethod
    def path(self, **kwargs) -> str:
        """Return the API endpoint path for this stream."""

    @abstractmethod
    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping[str, Any]]:
        """Parse HTTP response into stream records."""

    def scope_is_granted(self, granted_scopes: Set[str]) -> bool:
        """Check if required OAuth scopes are granted."""

    def properties_scope_is_granted(self) -> bool:
        """Check if property schema scopes are granted."""
```
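
This section does not say how `scope_is_granted()` reaches its decision. The sketch below is minimal and assumes the check is a plain subset test: the stream's required `scopes` against the scopes granted to the token. `ScopedStreamSketch` is hypothetical and has no dependency on the connector.

```python
from typing import Set


class ScopedStreamSketch:
    """Hypothetical stand-in for a stream that declares its required OAuth scopes."""

    # Assumption: each stream lists the scopes it needs in a `scopes` set.
    scopes: Set[str] = {"crm.objects.contacts.read"}

    def scope_is_granted(self, granted_scopes: Set[str]) -> bool:
        # Assumed rule: a stream with no declared scopes is always available;
        # otherwise every required scope must be present in the granted set.
        return not self.scopes or self.scopes.issubset(granted_scopes)


stream = ScopedStreamSketch()
print(stream.scope_is_granted({"crm.objects.contacts.read", "oauth"}))  # True
print(stream.scope_is_granted({"crm.objects.deals.read"}))  # False
```

With a rule like this, a stream whose scopes are missing can be excluded before the sync starts, instead of failing on a permissions error partway through.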

### Incremental Sync Classes

Classes that support incremental data synchronization with cursor-based state management.

```python { .api }
class IncrementalStream(BaseStream, ABC):
    """
    Abstract base class for streams supporting incremental sync.

    Provides incremental sync functionality including:
    - Cursor-based state management
    - Automatic filtering by update timestamps
    - State persistence and recovery
    - Lookback window handling
    - Duplicate record detection

    Abstract methods to implement:
    - cursor_field: Field name for cursor tracking
    - get_updated_state(): State update logic
    """

    @property
    @abstractmethod
    def cursor_field(self) -> str:
        """Field name used for incremental sync cursor."""

    @abstractmethod
    def get_updated_state(
        self,
        current_stream_state: MutableMapping[str, Any],
        latest_record: Mapping[str, Any]
    ) -> MutableMapping[str, Any]:
        """Update stream state with latest cursor value."""


class ClientSideIncrementalStream(BaseStream, CheckpointMixin):
    """
    Base class for client-side incremental streams.

    Handles incremental sync logic on the client side when
    the API doesn't support server-side filtering.

    Features:
    - Client-side record filtering by timestamp
    - Checkpoint-based state management
    - Memory-efficient processing
    - Automatic deduplication
    """
```
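
The cursor bookkeeping done by `get_updated_state()` can be sketched as a fold over records, where state keeps the largest cursor value seen so far. This standalone sketch assumes ISO 8601 timestamps in a field named `updatedAt` and parses them before comparing. The names `advance_state` and `sync` are hypothetical.

```python
from datetime import datetime
from typing import Any, Dict, Iterable, Mapping

CURSOR_FIELD = "updatedAt"  # assumption: cursor stored as an ISO 8601 string


def _parse(ts: str) -> datetime:
    # fromisoformat() in Python < 3.11 rejects a trailing "Z", so normalize it.
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def advance_state(state: Mapping[str, Any], record: Mapping[str, Any]) -> Dict[str, Any]:
    """Return new state whose cursor is the max of the stored and record cursors."""
    new_value = record.get(CURSOR_FIELD)
    if new_value is None:
        return dict(state)  # records without a cursor leave state unchanged
    current = state.get(CURSOR_FIELD)
    if current is None or _parse(new_value) > _parse(current):
        return {**state, CURSOR_FIELD: new_value}
    return dict(state)


def sync(records: Iterable[Mapping[str, Any]], state: Mapping[str, Any]) -> Dict[str, Any]:
    result = dict(state)
    for record in records:
        result = advance_state(result, record)
    return result


records = [
    {"id": "1", "updatedAt": "2024-01-02T00:00:00Z"},
    {"id": "2", "updatedAt": "2024-01-05T12:00:00Z"},
    {"id": "3", "updatedAt": "2024-01-03T08:30:00Z"},  # arrives out of order
]
print(sync(records, {}))  # {'updatedAt': '2024-01-05T12:00:00Z'}
```

The state keeps the maximum cursor value, not the last one seen. That matters when a page returns records out of cursor order: a later but older record cannot move the cursor backwards.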

### CRM-Specific Classes

Specialized classes for HubSpot CRM object streaming with built-in CRM patterns.

```python { .api }
class CRMSearchStream(IncrementalStream, ABC):
    """
    Base class for CRM objects using HubSpot's search API.

    Provides CRM-specific functionality including:
    - Search API integration with filters
    - Association loading
    - Property schema discovery
    - Bulk property handling
    - Archived record handling

    Features:
    - Automatic property discovery from HubSpot schemas
    - Support for custom properties
    - Association data loading
    - Search-based pagination
    - Incremental sync with search filters
    """

    @property
    @abstractmethod
    def entity(self) -> str:
        """CRM entity name (e.g., 'contact', 'company', 'deal')."""


class CRMObjectStream(BaseStream):
    """
    Base class for simple CRM object streams.

    For CRM objects that use basic list endpoints
    rather than the search API.

    Features:
    - Simple pagination
    - Property filtering
    - Basic error handling
    - Standard CRM object patterns
    """


class CRMObjectIncrementalStream(CRMObjectStream, IncrementalStream):
    """
    Incremental version of CRMObjectStream.

    Combines simple CRM object access with incremental
    sync capabilities for objects that support timestamp filtering.

    Features:
    - Timestamp-based incremental sync
    - Simple CRM object endpoints
    - Efficient state management
    - Property-based filtering
    """
```
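
For orientation: HubSpot's CRM search endpoint (`POST /crm/v3/objects/{objectType}/search`) takes a JSON body with `filterGroups`, `sorts`, `properties`, `limit`, and an `after` paging cursor. The hypothetical function below sketches the kind of incremental filter a `CRMSearchStream` subclass might send. It does not show what the connector actually sends. Two details are assumptions to verify against the object type you query: the default property name `hs_lastmodifieddate` (contacts, for example, are often filtered on `lastmodifieddate` instead) and the epoch-millisecond value format.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional


def build_search_body(
    cursor: datetime,
    properties: List[str],
    last_modified_field: str = "hs_lastmodifieddate",  # assumption; varies by object type
    after: Optional[str] = None,
    limit: int = 100,
) -> Dict[str, Any]:
    """Build a CRM search request body that returns records modified at or after `cursor`."""
    # Assumption: the date filter value is sent as epoch milliseconds in a string.
    cursor_ms = str(int(cursor.timestamp() * 1000))
    body: Dict[str, Any] = {
        "filterGroups": [
            {"filters": [{"propertyName": last_modified_field, "operator": "GTE", "value": cursor_ms}]}
        ],
        # Sorting on the cursor field keeps results in ascending cursor order.
        "sorts": [{"propertyName": last_modified_field, "direction": "ASCENDING"}],
        "properties": properties,
        "limit": limit,
    }
    if after is not None:
        body["after"] = after  # paging cursor taken from the previous response
    return body


body = build_search_body(datetime(2024, 1, 1, tzinfo=timezone.utc), ["email", "firstname"])
print(body["filterGroups"][0]["filters"][0]["value"])  # 1704067200000
```

To page through results, take `paging.next.after` from each response and pass it back in as `after` until the response no longer includes it.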

### Specialized Pattern Classes

Classes for specific HubSpot API patterns and use cases.

```python { .api }
class AssociationsStream(BaseStream):
    """
    Base class for loading object associations.

    Handles HubSpot's association API patterns including:
    - Association type management
    - Bidirectional relationship handling
    - Association metadata
    - Bulk association loading

    Features:
    - Multiple association type support
    - Pagination for large association sets
    - Association metadata inclusion
    - Performance optimizations
    """

    def __init__(
        self,
        parent_stream: Stream,
        identifiers: Iterable[Union[int, str]],
        *args,
        **kwargs
    ):
        """
        Initialize associations stream.

        Parameters:
        - parent_stream: Source stream for object IDs
        - identifiers: Object IDs to load associations for
        """
```
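
`identifiers` can be an iterable of any length, so bulk loading usually means splitting it into fixed-size batches. The sketch below is minimal and assumes a batch-read payload of the form `{"inputs": [{"id": ...}]}`, as used by HubSpot's batch association read endpoints. `batch_payloads` and the default batch size of 100 are illustrative choices, not the connector's values.

```python
from itertools import islice
from typing import Dict, Iterable, Iterator, List, Union


def batch_payloads(
    identifiers: Iterable[Union[int, str]], batch_size: int = 100
) -> Iterator[Dict[str, List[Dict[str, str]]]]:
    """Yield one batch-read request body per `batch_size` identifiers."""
    if batch_size < 1:
        raise ValueError("batch_size must be positive")
    iterator = iter(identifiers)  # works for generators as well as lists
    while True:
        chunk = list(islice(iterator, batch_size))
        if not chunk:
            return
        # Assumption: the endpoint expects string IDs wrapped in {"id": ...}.
        yield {"inputs": [{"id": str(identifier)} for identifier in chunk]}


payloads = list(batch_payloads(range(1, 6), batch_size=2))
print(len(payloads))  # 3
print(payloads[-1])  # {'inputs': [{'id': '5'}]}
```

Because `identifiers` is consumed lazily, memory use stays flat even when the parent stream yields a very large number of IDs.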

## Usage Examples

### Custom CRM Stream

```python
from typing import Any, Mapping

from source_hubspot.streams import CRMSearchStream


class CustomObjectStream(CRMSearchStream):
    """Custom stream for a specific HubSpot custom object."""

    entity = "my_custom_object"  # placeholder: replace with your HubSpot custom object name
    scopes = {"crm.objects.custom.read", "crm.schemas.custom.read"}

    def path(self, **kwargs) -> str:
        return f"/crm/v3/objects/{self.entity}"

    def get_json_schema(self) -> Mapping[str, Any]:
        # Define a static schema here, or rely on dynamic property discovery
        return {
            "type": "object",
            "properties": {
                "id": {"type": "string"},
                "properties": {"type": "object"},
                "createdAt": {"type": "string", "format": "date-time"},
                "updatedAt": {"type": "string", "format": "date-time"},
            },
        }


# Usage: `api` and `credentials` are assumed to be an already-configured
# HubSpot API client and credentials mapping.
custom_stream = CustomObjectStream(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials,
)
```
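
In the example, `entity` is a plain class attribute, even though `CRMSearchStream` declares it as an abstract property. That works because `ABCMeta` blocks instantiation only while a name still resolves to an abstract member. Any concrete override, including a class attribute, satisfies it. A self-contained sketch with hypothetical class names:

```python
from abc import ABC, abstractmethod


class SearchBase(ABC):
    """Hypothetical stand-in for a base class with an abstract `entity` property."""

    @property
    @abstractmethod
    def entity(self) -> str:
        """CRM entity name."""


class MissingEntity(SearchBase):
    pass  # does not override `entity`, so it stays abstract


class WithEntity(SearchBase):
    entity = "my_custom_object"  # a class attribute overrides the abstract property


print(WithEntity().entity)  # my_custom_object
try:
    MissingEntity()
except TypeError as exc:
    print("TypeError:", exc)
```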

### Custom Incremental Stream

```python
from typing import Any, Iterable, Mapping, MutableMapping, Optional

import requests

from source_hubspot.streams import IncrementalStream


class CustomIncrementalStream(IncrementalStream):
    """Custom stream with incremental sync."""

    primary_key = "id"
    cursor_field = "updatedAt"

    def path(self, **kwargs) -> str:
        return "/custom/api/endpoint"

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        params = {"limit": 100}

        # Incremental filter: only request records changed since the saved cursor
        if stream_state and self.cursor_field in stream_state:
            params["since"] = stream_state[self.cursor_field]

        # Pagination: continue from the offset returned with the previous page
        if next_page_token:
            params["offset"] = next_page_token["offset"]

        return params

    def parse_response(
        self,
        response: requests.Response,
        **kwargs
    ) -> Iterable[Mapping[str, Any]]:
        data = response.json()
        yield from data.get("results", [])

    def get_updated_state(
        self,
        current_stream_state: MutableMapping[str, Any],
        latest_record: Mapping[str, Any]
    ) -> MutableMapping[str, Any]:
        current_cursor = (current_stream_state or {}).get(self.cursor_field)
        latest_cursor = latest_record.get(self.cursor_field)

        # A record without a cursor value cannot advance the state
        if latest_cursor is None:
            return current_stream_state
        # String comparison is only safe when every value uses the same
        # ISO 8601 format and time zone; parse to datetimes otherwise.
        if not current_cursor or latest_cursor > current_cursor:
            return {self.cursor_field: latest_cursor}
        return current_stream_state
```
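
`request_params()` reads `next_page_token["offset"]`, but the example does not show where that token comes from. In Airbyte HTTP streams, a `next_page_token()` method produces it from each response. The standalone sketch below simulates that request loop against an in-memory fake endpoint. It assumes a hypothetical response shape with `results`, `hasMore`, and `offset` fields.

```python
from typing import Any, Dict, List, Mapping, Optional

# Hypothetical in-memory "API": five records served a page at a time.
DATA = [{"id": str(i), "updatedAt": f"2024-01-0{i}T00:00:00Z"} for i in range(1, 6)]


def fake_get(params: Mapping[str, Any]) -> Dict[str, Any]:
    offset = int(params.get("offset", 0))
    limit = int(params["limit"])
    page = DATA[offset:offset + limit]
    next_offset = offset + len(page)
    return {"results": page, "hasMore": next_offset < len(DATA), "offset": next_offset}


def next_page_token(response_json: Mapping[str, Any]) -> Optional[Dict[str, Any]]:
    # Returning None stops paging; otherwise the token feeds the next request's params.
    if response_json.get("hasMore"):
        return {"offset": response_json["offset"]}
    return None


def read_all(limit: int = 2) -> List[Dict[str, Any]]:
    records: List[Dict[str, Any]] = []
    token: Optional[Dict[str, Any]] = None
    while True:
        params: Dict[str, Any] = {"limit": limit}
        if token:
            params["offset"] = token["offset"]
        response_json = fake_get(params)
        records.extend(response_json["results"])
        token = next_page_token(response_json)
        if token is None:
            return records


print([r["id"] for r in read_all()])  # ['1', '2', '3', '4', '5']
```

The loop ends only when the token comes back as `None`. An API that keeps reporting more data without advancing the offset would loop forever, so a real implementation may want a guard against that.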

### Client-Side Incremental Stream

```python
from typing import Any, Iterable, Mapping

import requests

from source_hubspot.streams import ClientSideIncrementalStream


class CustomClientSideStream(ClientSideIncrementalStream):
    """Stream that handles incremental sync client-side."""

    primary_key = "id"
    cursor_field = "modified_date"

    def path(self, **kwargs) -> str:
        return "/api/all-records"  # this API doesn't support server-side filtering

    def parse_response(
        self,
        response: requests.Response,
        **kwargs
    ) -> Iterable[Mapping[str, Any]]:
        data = response.json()
        records = data.get("items", [])

        # Client-side filtering: keep only records newer than the saved cursor
        stream_state = kwargs.get("stream_state") or {}
        last_modified = stream_state.get(self.cursor_field)

        for record in records:
            record_cursor = record.get(self.cursor_field)
            if record_cursor is None:
                continue  # design choice: skip records that lack a cursor value
            # Assumes cursor values are strings in one consistent, sortable format
            if not last_modified or record_cursor > last_modified:
                yield record
```
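
The string comparison above is correct only when every `modified_date` value uses the same format and time zone. An alternative is to parse both sides before comparing. `filter_newer` is a hypothetical helper, and the ISO 8601 input format (with naive values treated as UTC) is an assumption.

```python
from datetime import datetime, timezone
from typing import Any, Iterable, Iterator, Mapping, Optional

CURSOR_FIELD = "modified_date"


def _to_utc(value: str) -> datetime:
    parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    # Assumption: timestamps without an offset are UTC.
    return parsed if parsed.tzinfo else parsed.replace(tzinfo=timezone.utc)


def filter_newer(
    records: Iterable[Mapping[str, Any]], last_modified: Optional[str]
) -> Iterator[Mapping[str, Any]]:
    """Yield records whose cursor is strictly later than `last_modified`."""
    threshold = _to_utc(last_modified) if last_modified else None
    for record in records:
        value = record.get(CURSOR_FIELD)
        if value is None:
            continue  # same skip-undated choice as the stream above
        if threshold is None or _to_utc(value) > threshold:
            yield record


records = [
    {"id": "a", "modified_date": "2024-03-01T10:00:00+02:00"},  # 08:00 UTC
    {"id": "b", "modified_date": "2024-03-01T09:00:00Z"},
]
print([r["id"] for r in filter_newer(records, "2024-03-01T08:30:00Z")])  # ['b']
```

With plain string comparison, record `a` would pass the filter, even though it is earlier than the cursor once converted to UTC.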

## Extension Patterns

### Adding Custom Properties

```python
from typing import Any, Iterable, Mapping

import requests

from source_hubspot.streams import Contacts


class EnhancedContactsStream(Contacts):
    """Contacts stream with additional custom processing."""

    def parse_response(
        self,
        response: requests.Response,
        **kwargs
    ) -> Iterable[Mapping[str, Any]]:
        # Get the records produced by the base Contacts stream
        for record in super().parse_response(response, **kwargs):
            # Add a derived field; consider also declaring `computed_score`
            # in the stream's JSON schema so downstream consumers expect it
            if "properties" in record:
                record["computed_score"] = self._calculate_score(record["properties"])
            yield record

    def _calculate_score(self, properties: Mapping[str, Any]) -> int:
        """Custom scoring logic: weight the presence of selected properties."""
        score = 0
        if properties.get("email"):
            score += 10
        if properties.get("company"):
            score += 20
        return score
```
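
The override above mutates each record in place. If other code may hold references to the base stream's records, a copy-on-write variant avoids surprises. The sketch below is standalone: `enrich` is a hypothetical helper, and `calculate_score` uses the same weights as `_calculate_score` in the example.

```python
from typing import Any, Callable, Dict, Iterable, Iterator, Mapping


def calculate_score(properties: Mapping[str, Any]) -> int:
    # Same weights as _calculate_score in the example above.
    return (10 if properties.get("email") else 0) + (20 if properties.get("company") else 0)


def enrich(
    records: Iterable[Mapping[str, Any]],
    scorer: Callable[[Mapping[str, Any]], int],
) -> Iterator[Dict[str, Any]]:
    """Yield shallow copies of records with a `computed_score` field added."""
    for record in records:
        enriched = dict(record)  # shallow copy; the input record is left untouched
        if "properties" in record:
            enriched["computed_score"] = scorer(record["properties"])
        yield enriched


original = {"id": "1", "properties": {"email": "a@example.com", "company": ""}}
result = next(enrich([original], calculate_score))
print(result["computed_score"], "computed_score" in original)  # 10 False
```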

### Custom Error Handling

```python
from typing import Any, Iterable, Mapping

import requests

from source_hubspot.streams import BaseStream


class RobustStream(BaseStream):
    """Stream with enhanced error handling."""

    def path(self, **kwargs) -> str:
        return "/example/endpoint"  # hypothetical endpoint; path() is required by BaseStream

    def parse_response(
        self,
        response: requests.Response,
        **kwargs
    ) -> Iterable[Mapping[str, Any]]:
        try:
            data = response.json()
        except ValueError:
            # JSON decode errors raised by response.json() subclass ValueError
            # across requests versions. Truncate the body so a large HTML
            # error page doesn't flood the log.
            self.logger.warning(f"Invalid JSON response: {response.text[:500]}")
            return

        if "errors" in data:
            for error in data["errors"]:
                self.logger.error(f"API Error: {error}")
            return

        yield from data.get("results", [])
```
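
To exercise this logic without network access or the connector installed, you can pull the same branching into a plain function and drive it with a stub object that exposes only `.json()` and `.text`. `StubResponse` and `parse_results` are hypothetical test helpers, not part of `requests` or the connector.

```python
import json
import logging
from typing import Any, Dict, Iterator

logger = logging.getLogger("robust_stream_sketch")


class StubResponse:
    """Minimal stand-in for requests.Response: only .text and .json()."""

    def __init__(self, text: str) -> None:
        self.text = text

    def json(self) -> Any:
        return json.loads(self.text)  # raises json.JSONDecodeError, a ValueError


def parse_results(response: StubResponse) -> Iterator[Dict[str, Any]]:
    try:
        data = response.json()
    except ValueError:
        logger.warning("Invalid JSON response: %s", response.text[:500])
        return
    if "errors" in data:
        for error in data["errors"]:
            logger.error("API Error: %s", error)
        return
    yield from data.get("results", [])


print(list(parse_results(StubResponse('{"results": [{"id": "1"}]}'))))  # [{'id': '1'}]
print(list(parse_results(StubResponse("<html>502</html>"))))  # []
print(list(parse_results(StubResponse('{"errors": ["rate limited"]}'))))  # []
```

Passing `%s` placeholders to the logger, rather than an f-string, defers the string formatting until a log record is actually emitted.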

## Abstract Method Requirements

When extending base classes, you must implement these abstract members:

### BaseStream Requirements
- `path()`: Return API endpoint path
- `parse_response()`: Parse HTTP response to records

### IncrementalStream Additional Requirements
- `cursor_field`: Record field used for cursor tracking
- `get_updated_state()`: Update state with latest cursor

### CRMSearchStream Additional Requirements
- `entity`: CRM object type name

If a required abstract method or property is missing, Python raises a `TypeError` when the subclass is instantiated. Defining the class succeeds, and no error waits until the missing member would first be called.
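
The sketch below shows when that `TypeError` appears. It uses hypothetical classes that mirror the `BaseStream` contract, with no dependency on the connector or the Airbyte CDK.

```python
from abc import ABC, abstractmethod
from typing import Any, Iterable, Mapping


class StreamContract(ABC):
    """Hypothetical stand-in for BaseStream's two abstract methods."""

    @abstractmethod
    def path(self, **kwargs) -> str: ...

    @abstractmethod
    def parse_response(self, response: Any, **kwargs) -> Iterable[Mapping[str, Any]]: ...


class PathOnly(StreamContract):
    # Defining this class succeeds even though parse_response() is missing.
    def path(self, **kwargs) -> str:
        return "/example"


class Complete(PathOnly):
    def parse_response(self, response: Any, **kwargs) -> Iterable[Mapping[str, Any]]:
        yield from response.get("results", [])


print(sorted(PathOnly.__abstractmethods__))  # ['parse_response']
try:
    PathOnly()  # the TypeError surfaces here, at instantiation
except TypeError as exc:
    print("TypeError:", exc)
print(list(Complete().parse_response({"results": [{"id": 1}]})))  # [{'id': 1}]
```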