
# Base Stream Classes

Foundation classes for extending and customizing HubSpot stream functionality. These abstract base classes provide common patterns and interfaces that can be extended to create custom streams or modify existing behavior.

## Capabilities

### Core Base Classes

Foundation classes that provide common HTTP and streaming functionality.

```python { .api }
class BaseStream(HttpStream, ABC):
    """
    Abstract base class for all HubSpot streams.

    Provides common functionality including:
    - HTTP client configuration and authentication
    - Error handling and retry logic
    - Scope validation and permissions checking
    - Common request headers and parameters
    - Response parsing and transformation

    Abstract methods to implement:
    - path(): API endpoint path
    - parse_response(): Response parsing logic
    """

    @abstractmethod
    def path(self, **kwargs) -> str:
        """Return the API endpoint path for this stream."""

    @abstractmethod
    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping[str, Any]]:
        """Parse HTTP response into stream records."""

    def scope_is_granted(self, granted_scopes: Set[str]) -> bool:
        """Check if required OAuth scopes are granted."""

    def properties_scope_is_granted(self) -> bool:
        """Check if property schema scopes are granted."""
```
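
The listing above does not show how `scope_is_granted()` decides. A minimal sketch, assuming a stream declares its required OAuth scopes in a `scopes` set (as the custom CRM stream example below does) and that the check passes when no scopes are required or every required scope was granted. `ScopedStreamSketch` is a hypothetical stand-in, not the connector's class:

```python
from typing import Set


class ScopedStreamSketch:
    """Hypothetical stand-in illustrating an OAuth scope check."""

    def __init__(self, scopes: Set[str]) -> None:
        # Assumption: an empty set means the stream has no scope requirements
        self.scopes = scopes

    def scope_is_granted(self, granted_scopes: Set[str]) -> bool:
        # Granted when nothing is required, or every required scope is present
        return not self.scopes or self.scopes.issubset(granted_scopes)


contacts_like = ScopedStreamSketch({"crm.objects.contacts.read"})
print(contacts_like.scope_is_granted({"crm.objects.contacts.read", "oauth"}))  # True
print(contacts_like.scope_is_granted({"oauth"}))  # False
```

A subset check keeps the rule simple: extra granted scopes never block a stream, while any single missing scope does.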

### Incremental Sync Classes

Classes that support incremental data synchronization with cursor-based state management.

```python { .api }
class IncrementalStream(BaseStream, ABC):
    """
    Abstract base class for streams supporting incremental sync.

    Provides incremental sync functionality including:
    - Cursor-based state management
    - Automatic filtering by update timestamps
    - State persistence and recovery
    - Lookback window handling
    - Duplicate record detection

    Abstract methods to implement:
    - cursor_field: Field name for cursor tracking
    - get_updated_state(): State update logic
    """

    @property
    @abstractmethod
    def cursor_field(self) -> str:
        """Field name used for incremental sync cursor."""

    @abstractmethod
    def get_updated_state(
        self,
        current_stream_state: MutableMapping[str, Any],
        latest_record: Mapping[str, Any]
    ) -> MutableMapping[str, Any]:
        """Update stream state with latest cursor value."""


class ClientSideIncrementalStream(BaseStream, CheckpointMixin):
    """
    Base class for client-side incremental streams.

    Handles incremental sync logic on the client side when
    the API doesn't support server-side filtering.

    Features:
    - Client-side record filtering by timestamp
    - Checkpoint-based state management
    - Memory-efficient processing
    - Automatic deduplication
    """
```
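
The docstrings list lookback windows and duplicate detection without further detail. One way to combine them, sketched under assumptions: the cursor is an ISO 8601 timestamp, the lookback is a fixed `timedelta` (the 5-minute default is arbitrary), and records carry an `id` used for deduplication. `filter_incremental` and `_parse_ts` are hypothetical helpers, not connector APIs:

```python
from datetime import datetime, timedelta
from typing import Any, Iterable, Iterator, Mapping, Optional, Set


def _parse_ts(value: str) -> datetime:
    # fromisoformat() before Python 3.11 does not accept a trailing "Z"
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def filter_incremental(
    records: Iterable[Mapping[str, Any]],
    cursor_field: str,
    saved_cursor: Optional[str],
    lookback: timedelta = timedelta(minutes=5),
) -> Iterator[Mapping[str, Any]]:
    """Yield records at or after (saved_cursor - lookback), dropping repeated ids."""
    # Re-read a short window before the saved cursor to catch late-arriving updates
    threshold = _parse_ts(saved_cursor) - lookback if saved_cursor else None
    seen_ids: Set[Any] = set()
    for record in records:
        if threshold is not None:
            value = record.get(cursor_field)
            # A record without a cursor value cannot be compared, so skip it
            if value is None or _parse_ts(value) < threshold:
                continue
        # Drop repeats within this batch; repeats across syncs need primary-key dedup downstream
        if record["id"] in seen_ids:
            continue
        seen_ids.add(record["id"])
        yield record


rows = [
    {"id": 1, "updatedAt": "2024-01-01T00:00:00Z"},
    {"id": 2, "updatedAt": "2024-01-02T00:00:00Z"},
    {"id": 2, "updatedAt": "2024-01-02T00:00:00Z"},
    {"id": 3, "updatedAt": "2024-01-01T23:58:00Z"},
]
print([r["id"] for r in filter_incremental(rows, "updatedAt", "2024-01-02T00:00:00Z")])  # [2, 3]
```

Record 3 is two minutes older than the saved cursor but still inside the lookback window, so it is re-emitted; that overlap is exactly why deduplication has to accompany a lookback.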

### CRM-Specific Classes

Specialized classes for streaming HubSpot CRM objects, through either the search API or the basic list endpoints.

```python { .api }
class CRMSearchStream(IncrementalStream, ABC):
    """
    Base class for CRM objects using HubSpot's search API.

    Provides CRM-specific functionality including:
    - Search API integration with filters
    - Association loading
    - Property schema discovery
    - Bulk property handling
    - Archived record handling

    Features:
    - Automatic property discovery from HubSpot schemas
    - Support for custom properties
    - Association data loading
    - Search-based pagination
    - Incremental sync with search filters
    """

    @property
    @abstractmethod
    def entity(self) -> str:
        """CRM entity name (e.g., 'contact', 'company', 'deal')."""


class CRMObjectStream(BaseStream):
    """
    Base class for simple CRM object streams.

    For CRM objects that use basic list endpoints
    rather than the search API.

    Features:
    - Simple pagination
    - Property filtering
    - Basic error handling
    - Standard CRM object patterns
    """


class CRMObjectIncrementalStream(CRMObjectStream, IncrementalStream):
    """
    Incremental version of CRMObjectStream.

    Combines simple CRM object access with incremental
    sync capabilities for objects that support timestamp filtering.

    Features:
    - Timestamp-based incremental sync
    - Simple CRM object endpoints
    - Efficient state management
    - Property-based filtering
    """
```
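
For orientation, HubSpot's CRM search endpoint (`POST /crm/v3/objects/{objectType}/search`) takes a JSON body with `filterGroups`, `sorts`, `properties`, `limit`, and an `after` paging token. A minimal sketch of how an incremental search request could be assembled; the helper name, the `GTE` filter on a last-modified property, and the epoch-millisecond value are assumptions, and the body `CRMSearchStream` actually sends may differ:

```python
from typing import Any, Dict, List, Optional


def build_search_body(
    cursor_property: str,
    since_ms: int,
    properties: List[str],
    after: Optional[str] = None,
    limit: int = 100,
) -> Dict[str, Any]:
    """Hypothetical helper: a search body filtered and sorted by a last-modified property."""
    body: Dict[str, Any] = {
        # One filter group: records modified at or after the cursor
        "filterGroups": [
            {"filters": [{"propertyName": cursor_property, "operator": "GTE", "value": str(since_ms)}]}
        ],
        # Ascending order lets the cursor advance monotonically as pages are read
        "sorts": [{"propertyName": cursor_property, "direction": "ASCENDING"}],
        "properties": properties,
        "limit": limit,
    }
    if after is not None:
        # Paging token taken from the previous response's paging.next.after
        body["after"] = after
    return body


first_page = build_search_body("hs_lastmodifieddate", 1700000000000, ["name", "domain"])
next_page = build_search_body("hs_lastmodifieddate", 1700000000000, ["name", "domain"], after="100")
```

Sorting on the same property used in the filter is the design choice that matters: if a sync stops midway, the highest cursor seen so far is a safe restart point.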

### Specialized Pattern Classes

Classes for specific HubSpot API patterns, such as loading associations between objects.

```python { .api }
class AssociationsStream(BaseStream):
    """
    Base class for loading object associations.

    Handles HubSpot's association API patterns including:
    - Association type management
    - Bidirectional relationship handling
    - Association metadata
    - Bulk association loading

    Features:
    - Multiple association type support
    - Pagination for large association sets
    - Association metadata inclusion
    - Performance optimizations
    """

    def __init__(
        self,
        parent_stream: Stream,
        identifiers: Iterable[Union[int, str]],
        *args,
        **kwargs
    ):
        """
        Initialize associations stream.

        Parameters:
        - parent_stream: Source stream for object IDs
        - identifiers: Object IDs to load associations for
        """
```
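
`AssociationsStream` receives an iterable of `identifiers`; bulk loading usually means splitting those IDs into fixed-size batches before calling a batch endpoint. A minimal sketch, assuming a batch size of 100 and a request body shaped like HubSpot's batch association read (`{"inputs": [{"id": ...}]}`). The helper is hypothetical and does not reproduce the connector's implementation:

```python
from itertools import islice
from typing import Dict, Iterable, Iterator, List, Union


def batch_association_inputs(
    identifiers: Iterable[Union[int, str]], batch_size: int = 100
) -> Iterator[Dict[str, List[Dict[str, str]]]]:
    """Yield request bodies of the form {"inputs": [{"id": "<id>"}, ...]}."""
    if batch_size < 1:
        raise ValueError("batch_size must be positive")
    iterator = iter(identifiers)
    while True:
        # Pull the next chunk lazily so a large ID stream is never fully materialized
        chunk = list(islice(iterator, batch_size))
        if not chunk:
            return
        yield {"inputs": [{"id": str(identifier)} for identifier in chunk]}


bodies = list(batch_association_inputs(range(1, 251)))
print([len(b["inputs"]) for b in bodies])  # [100, 100, 50]
```

Because `identifiers` may itself be a generator over the parent stream, consuming it lazily with `islice` keeps memory bounded by one batch.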

## Usage Examples

### Custom CRM Stream

```python
from typing import Any, Mapping

from source_hubspot.streams import CRMSearchStream


class CustomObjectStream(CRMSearchStream):
    """Custom stream for a specific HubSpot custom object."""

    entity = "my_custom_object"  # HubSpot custom object name
    scopes = {"crm.objects.custom.read", "crm.schemas.custom.read"}

    def path(self, **kwargs) -> str:
        return f"/crm/v3/objects/{self.entity}"

    def get_json_schema(self) -> Mapping[str, Any]:
        # Define a static schema here, or rely on dynamic property discovery
        return {
            "type": "object",
            "properties": {
                "id": {"type": "string"},
                "properties": {"type": "object"},
                "createdAt": {"type": "string", "format": "date-time"},
                "updatedAt": {"type": "string", "format": "date-time"},
            },
        }


# Usage: `api` and `credentials` are assumed to exist already
# (an authenticated HubSpot API client and the connector's credential config)
custom_stream = CustomObjectStream(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials,
)
```
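
The schema above is hard-coded; the alternative is dynamic discovery. HubSpot's properties endpoint (`GET /crm/v3/properties/{objectType}`) returns property definitions with a `name` and a `type` such as `string`, `number`, `bool`, `enumeration`, `date`, or `datetime`. A minimal sketch of turning such definitions into a JSON schema, assuming the nullable type mapping below; the connector's own mapping may differ, and `TYPE_MAP` and `properties_to_json_schema` are hypothetical:

```python
from typing import Any, Dict, Iterable, Mapping

# Assumed mapping from HubSpot property types to nullable JSON schema types
TYPE_MAP: Dict[str, Dict[str, Any]] = {
    "string": {"type": ["null", "string"]},
    "enumeration": {"type": ["null", "string"]},
    "number": {"type": ["null", "number"]},
    "bool": {"type": ["null", "boolean"]},
    "date": {"type": ["null", "string"], "format": "date"},
    "datetime": {"type": ["null", "string"], "format": "date-time"},
}
FALLBACK: Dict[str, Any] = {"type": ["null", "string"]}


def properties_to_json_schema(definitions: Iterable[Mapping[str, Any]]) -> Dict[str, Any]:
    """Build a record schema whose nested `properties` object mirrors the discovered fields."""
    discovered = {
        # Unknown types fall back to nullable strings instead of failing discovery
        d["name"]: dict(TYPE_MAP.get(d.get("type", ""), FALLBACK))
        for d in definitions
    }
    return {
        "type": "object",
        "properties": {
            "id": {"type": "string"},
            "properties": {"type": "object", "properties": discovered},
            "createdAt": {"type": "string", "format": "date-time"},
            "updatedAt": {"type": "string", "format": "date-time"},
        },
    }


schema = properties_to_json_schema([{"name": "amount", "type": "number"}, {"name": "stage", "type": "enumeration"}])
```

Making every discovered field nullable is deliberate: HubSpot omits or nulls properties that were never set on a record.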

### Custom Incremental Stream

```python
from typing import Any, Iterable, Mapping, MutableMapping, Optional

import requests

from source_hubspot.streams import IncrementalStream


class CustomIncrementalStream(IncrementalStream):
    """Custom stream with incremental sync."""

    primary_key = "id"
    cursor_field = "updatedAt"

    def path(self, **kwargs) -> str:
        return "/custom/api/endpoint"

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        params = {"limit": 100}

        # Only request records updated since the saved cursor
        if stream_state and self.cursor_field in stream_state:
            params["since"] = stream_state[self.cursor_field]

        # Continue from the previous page, if any
        if next_page_token:
            params["offset"] = next_page_token["offset"]

        return params

    def parse_response(
        self,
        response: requests.Response,
        **kwargs,
    ) -> Iterable[Mapping[str, Any]]:
        data = response.json()
        yield from data.get("results", [])

    def get_updated_state(
        self,
        current_stream_state: MutableMapping[str, Any],
        latest_record: Mapping[str, Any],
    ) -> MutableMapping[str, Any]:
        current_cursor = (current_stream_state or {}).get(self.cursor_field)
        latest_cursor = latest_record.get(self.cursor_field)

        # A record without a cursor value cannot advance the state
        if latest_cursor is None:
            return current_stream_state
        # ISO 8601 strings in the same format and time zone compare correctly as strings
        if current_cursor is None or latest_cursor > current_cursor:
            return {self.cursor_field: latest_cursor}
        return current_stream_state
```

### Client-Side Incremental Stream

```python
from typing import Any, Iterable, Mapping

import requests

from source_hubspot.streams import ClientSideIncrementalStream


class CustomClientSideStream(ClientSideIncrementalStream):
    """Stream that handles incremental sync client-side."""

    primary_key = "id"
    cursor_field = "modified_date"

    def path(self, **kwargs) -> str:
        return "/api/all-records"  # This endpoint offers no server-side date filter

    def parse_response(
        self,
        response: requests.Response,
        **kwargs,
    ) -> Iterable[Mapping[str, Any]]:
        data = response.json()
        records = data.get("items", [])

        # Filter client-side against the saved cursor; stream_state may be missing or None
        stream_state = kwargs.get("stream_state") or {}
        last_modified = stream_state.get(self.cursor_field)

        for record in records:
            record_cursor = record.get(self.cursor_field)
            # Emit everything on the first sync, afterwards only records newer than the cursor
            if not last_modified or (record_cursor is not None and record_cursor > last_modified):
                yield record
```

## Extension Patterns

### Adding Custom Properties

```python
from typing import Any, Iterable, Mapping

import requests

from source_hubspot.streams import Contacts


class EnhancedContactsStream(Contacts):
    """Contacts stream with additional custom processing."""

    def parse_response(
        self,
        response: requests.Response,
        **kwargs,
    ) -> Iterable[Mapping[str, Any]]:
        # Get the records produced by the base Contacts stream
        for record in super().parse_response(response, **kwargs):
            # Attach a derived field computed from the HubSpot properties
            if "properties" in record:
                record["computed_score"] = self._calculate_score(record["properties"])
            yield record

    def _calculate_score(self, properties: Mapping[str, Any]) -> int:
        """Custom scoring logic."""
        score = 0
        if properties.get("email"):
            score += 10
        if properties.get("company"):
            score += 20
        return score
```

### Custom Error Handling

```python
from typing import Any, Iterable, Mapping

import requests

from source_hubspot.streams import BaseStream


class RobustStream(BaseStream):
    """Stream with enhanced error handling (path() must still be implemented)."""

    def parse_response(
        self,
        response: requests.Response,
        **kwargs,
    ) -> Iterable[Mapping[str, Any]]:
        try:
            data = response.json()
        except ValueError:
            # Covers json.JSONDecodeError and requests' JSONDecodeError, which both subclass ValueError
            self.logger.warning(f"Invalid JSON response: {response.text}")
            return

        # Log API-reported errors and emit no records for this response
        if "errors" in data:
            for error in data["errors"]:
                self.logger.error(f"API Error: {error}")
            return

        yield from data.get("results", [])
```

## Abstract Method Requirements

When extending base classes, you must implement these abstract methods:

### BaseStream Requirements

- `path()`: Return the API endpoint path
- `parse_response()`: Parse the HTTP response into records

### IncrementalStream Additional Requirements

- `cursor_field`: Property name used for cursor tracking
- `get_updated_state()`: Update the state with the latest cursor value

### CRMSearchStream Additional Requirements

- `entity`: CRM object type name

Python raises a `TypeError` when you instantiate a subclass that leaves any required abstract method or property unimplemented.
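
Python's `abc` module performs this check when a class is instantiated, not when it is defined. A self-contained sketch with hypothetical classes that mirror the `BaseStream` requirements; a plain dict stands in for the HTTP response:

```python
from abc import ABC, abstractmethod
from typing import Any, Iterable, Mapping


class BaseStreamSketch(ABC):
    """Hypothetical stand-in with the same two abstract methods as BaseStream."""

    @abstractmethod
    def path(self, **kwargs) -> str: ...

    @abstractmethod
    def parse_response(self, response: Any, **kwargs) -> Iterable[Mapping[str, Any]]: ...


class Incomplete(BaseStreamSketch):
    # parse_response() is missing, so this class is still abstract (defining it is fine)
    def path(self, **kwargs) -> str:
        return "/crm/v3/objects/contacts"


class Complete(Incomplete):
    def parse_response(self, response: Any, **kwargs) -> Iterable[Mapping[str, Any]]:
        yield from response.get("results", [])


try:
    Incomplete()  # fails here, at instantiation
except TypeError as exc:
    print(f"TypeError: {exc}")

print(list(Complete().parse_response({"results": [{"id": "1"}]})))  # [{'id': '1'}]
```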