# CRM Data Streams

Stream classes for core HubSpot CRM objects including contacts, companies, deals, and tickets. These streams provide access to the primary business objects in HubSpot with incremental sync capabilities and comprehensive property support.

## Capabilities

### Base CRM Stream Classes

Foundation classes that provide common functionality for CRM object streams.

```python { .api }
class BaseStream(HttpStream, ABC):
    """
    Abstract base class for all HubSpot streams.

    Properties:
    - url_base: Base URL for API requests
    - primary_key: Primary key field(s) for the stream
    - properties: Available properties for the object type
    """

    def scope_is_granted(self, granted_scopes: Set[str]) -> bool:
        """Check if required scopes are granted for this stream."""

    def properties_scope_is_granted(self) -> bool:
        """Check if properties-related scopes are granted."""

class IncrementalStream(BaseStream, ABC):
    """
    Abstract base class for incremental streams.

    Properties:
    - state_checkpoint_interval: Checkpoint interval for state management
    - cursor_field: Field used for incremental sync
    """

    def get_updated_state(self, current_stream_state, latest_record) -> MutableMapping[str, Any]:
        """Update stream state with latest record information."""

class CRMSearchStream(IncrementalStream, ABC):
    """
    Base class for CRM object streams using HubSpot's search API.

    Properties:
    - search_url: Search endpoint URL
    - associations: Object associations to include
    - query_params: Query parameters for search
    """
```
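To make the idea of a search-based incremental stream more concrete, here is a minimal sketch of the kind of request body such a stream might send. The helper `build_search_payload` is hypothetical and not part of the connector. The field names (`filterGroups`, `sorts`, `properties`, `limit`, `after`) are assumed from HubSpot's public CRM search API, and the payload the connector actually builds may differ.

```python
from typing import Any, Dict, List, Optional


def build_search_payload(
    cursor_property: str,
    since_ms: int,
    properties: List[str],
    limit: int = 100,
    after: Optional[str] = None,
) -> Dict[str, Any]:
    """Build a search body that asks for records modified at or after `since_ms`.

    Hypothetical helper: the parameter names are illustrative, and the body
    layout is an assumption based on HubSpot's public search API.
    """
    payload: Dict[str, Any] = {
        # Only return records whose cursor property is >= the given timestamp.
        "filterGroups": [
            {
                "filters": [
                    {
                        "propertyName": cursor_property,
                        "operator": "GTE",
                        "value": str(since_ms),
                    }
                ]
            }
        ],
        # Sort ascending so the newest record seen so far is always the last one.
        "sorts": [{"propertyName": cursor_property, "direction": "ASCENDING"}],
        "properties": properties,
        "limit": limit,
    }
    if after is not None:
        # Paging token returned by the previous page, if any.
        payload["after"] = after
    return payload


payload = build_search_payload("hs_lastmodifieddate", 1672531200000, ["name", "domain"])
```

Sorting by the cursor property in ascending order is a design choice that keeps state checkpoints safe: if a sync is interrupted, everything before the last checkpoint has already been emitted.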

### Contact Streams

Access to HubSpot contact data with full property support and incremental sync.

```python { .api }
class Contacts(CRMSearchStream):
    """
    Stream for HubSpot contact records.

    Provides access to contact data including:
    - Basic contact information (name, email, phone)
    - Custom contact properties
    - Lifecycle stage information
    - Contact associations
    - Property history (if scopes granted)
    """
```

### Company Streams

Access to HubSpot company data with comprehensive business information.

```python { .api }
class Companies(CRMSearchStream):
    """
    Stream for HubSpot company records.

    Provides access to company data including:
    - Company information (name, domain, industry)
    - Custom company properties
    - Company associations with contacts and deals
    - Property history (if scopes granted)
    """
```

### Deal Streams

Access to HubSpot deal data including pipeline and sales information.

```python { .api }
class Deals(CRMSearchStream):
    """
    Stream for HubSpot deal records.

    Provides access to deal data including:
    - Deal information (name, amount, stage)
    - Pipeline and deal stage data
    - Custom deal properties
    - Deal associations with contacts and companies
    - Property history (if scopes granted)
    """

class DealsArchived(ClientSideIncrementalStream):
    """
    Stream for archived HubSpot deal records.

    Provides access to deals that have been archived,
    with client-side incremental sync capabilities.
    """

class DealSplits(CRMSearchStream):
    """
    Stream for HubSpot deal split records.

    Provides access to deal revenue splits and
    commission tracking data.
    """

class Leads(CRMSearchStream):
    """
    Stream for HubSpot lead records.

    Provides access to lead data including:
    - Lead qualification status
    - Lead source and campaign attribution
    - Lead scoring and ranking
    - Conversion tracking
    - Lead lifecycle stage progression
    """
```
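"Client-side incremental sync", as mentioned for `DealsArchived`, generally means that the API returns all records and the client discards those it has already seen. The following is a minimal sketch of that idea, not the connector's implementation. The function `filter_newer` and the `archivedAt` cursor field are illustrative assumptions.

```python
from datetime import datetime
from typing import Any, Dict, Iterable, Iterator, Optional


def parse_ts(value: str) -> datetime:
    """Parse an ISO 8601 UTC timestamp such as 2023-01-01T00:00:00Z."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def filter_newer(
    records: Iterable[Dict[str, Any]],
    cursor_field: str,
    state_value: Optional[str],
) -> Iterator[Dict[str, Any]]:
    """Yield only records whose cursor value is newer than the saved state.

    With no saved state (first sync), every record is yielded.
    """
    if state_value is None:
        yield from records
        return
    threshold = parse_ts(state_value)
    for record in records:
        if parse_ts(record[cursor_field]) > threshold:
            yield record


# Sample data with a hypothetical cursor field name.
archived = [
    {"id": "1", "archivedAt": "2023-01-01T00:00:00Z"},
    {"id": "2", "archivedAt": "2023-03-01T00:00:00Z"},
]
new_records = list(filter_newer(archived, "archivedAt", "2023-02-01T00:00:00Z"))
```

The trade-off is that every sync still downloads the full data set. Only the downstream volume is reduced.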

### Ticket Streams

Access to HubSpot service ticket data for customer support workflows.

```python { .api }
class Tickets(CRMSearchStream):
    """
    Stream for HubSpot ticket records.

    Provides access to support ticket data including:
    - Ticket information (subject, status, priority)
    - Ticket pipeline and stage data
    - Custom ticket properties
    - Ticket associations with contacts and companies
    """
```

## Usage Examples

### Basic Contact Data Access

```python
from source_hubspot.streams import Contacts, API

# Setup API client
credentials = {
    "credentials_title": "OAuth Credentials",
    "client_id": "your_client_id",
    "client_secret": "your_client_secret",
    "refresh_token": "your_refresh_token"
}
api = API(credentials)

# Create contacts stream
contacts = Contacts(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials
)

# Read contact data
for record in contacts.read_records(sync_mode="full_refresh"):
    print(f"Contact: {record['properties']['email']}")
    print(f"Name: {record['properties'].get('firstname', '')} {record['properties'].get('lastname', '')}")
```

### Incremental Company Sync

```python
from source_hubspot.streams import Companies
from airbyte_cdk.models import SyncMode

# `api` and `credentials` are created as in the basic contact example

# Create companies stream
companies = Companies(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials
)

# Read with incremental sync
stream_state = {}
for record in companies.read_records(
    sync_mode=SyncMode.incremental,
    stream_state=stream_state
):
    print(f"Company: {record['properties']['name']}")
    print(f"Domain: {record['properties'].get('domain', 'N/A')}")

    # Update state
    stream_state = companies.get_updated_state(stream_state, record)
```
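The state update in the loop above can be pictured as "keep the largest cursor value seen so far". The sketch below illustrates that idea on plain dictionaries. It is a hedged approximation: the function `updated_state` is hypothetical, the cursor field name `updatedAt` is taken from the Stream Properties section, and the connector's real `get_updated_state` may store state differently.

```python
from typing import Any, Dict, Mapping


def updated_state(
    current_state: Mapping[str, Any],
    latest_record: Mapping[str, Any],
    cursor_field: str = "updatedAt",
) -> Dict[str, Any]:
    """Return a new state holding the newest cursor value seen so far.

    ISO 8601 UTC timestamps in the same format sort correctly as strings,
    so a plain string comparison is enough for this illustration.
    """
    latest = latest_record.get(cursor_field)
    current = current_state.get(cursor_field)
    if latest is None:
        # Record carries no cursor value: leave the state unchanged.
        return dict(current_state)
    if current is None or latest > current:
        return {**current_state, cursor_field: latest}
    # Never move the cursor backwards for an older record.
    return dict(current_state)


state: Dict[str, Any] = {}
for rec in [
    {"id": "1", "updatedAt": "2023-02-01T00:00:00Z"},
    {"id": "2", "updatedAt": "2023-01-15T00:00:00Z"},
]:
    state = updated_state(state, rec)
```

Persisting this state between runs is what lets the next incremental sync start from the saved timestamp instead of from `start_date`.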

### Deal Pipeline Analysis

```python
from source_hubspot.streams import Deals

# `api` and `credentials` are created as in the basic contact example
deals = Deals(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials
)

# Analyze deals by pipeline stage
deal_stages = {}
for record in deals.read_records(sync_mode="full_refresh"):
    stage = record['properties'].get('dealstage', 'Unknown')
    # `amount` may be missing or empty, so fall back to 0
    amount = float(record['properties'].get('amount', 0) or 0)

    if stage not in deal_stages:
        deal_stages[stage] = {'count': 0, 'total_amount': 0}

    deal_stages[stage]['count'] += 1
    deal_stages[stage]['total_amount'] += amount

for stage, data in deal_stages.items():
    print(f"{stage}: {data['count']} deals, ${data['total_amount']:,.2f}")
```

### Support Ticket Stream

```python
from source_hubspot.streams import Tickets

# `api` and `credentials` are created as in the basic contact example
tickets = Tickets(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials
)

# Process support tickets
for record in tickets.read_records(sync_mode="full_refresh"):
    ticket_id = record['id']
    subject = record['properties'].get('subject', 'No subject')
    status = record['properties'].get('hs_ticket_status', 'Unknown')
    priority = record['properties'].get('hs_ticket_priority', 'Normal')

    print(f"Ticket {ticket_id}: {subject}")
    print(f"Status: {status}, Priority: {priority}")
```

### Working with Associations

```python
from source_hubspot.streams import Contacts

# CRM streams support associations between objects
# `api` and `credentials` are created as in the basic contact example
contacts = Contacts(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials
)

for record in contacts.read_records(sync_mode="full_refresh"):
    contact_id = record['id']
    email = record['properties']['email']

    # Check for associated companies
    associations = record.get('associations', {})
    if 'companies' in associations:
        company_ids = [assoc['id'] for assoc in associations['companies']['results']]
        print(f"Contact {email} associated with companies: {company_ids}")
```
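The association lookup above can be wrapped in a small helper that tolerates records with no associations at all. This is a sketch on plain dictionaries. The helper name `associated_ids` is hypothetical, and the record layout (`associations[object_type]['results']`) is assumed from the example above.

```python
from typing import Any, List, Mapping


def associated_ids(record: Mapping[str, Any], object_type: str) -> List[str]:
    """Return the IDs of associated objects of one type, or [] if there are none."""
    associations = record.get("associations") or {}
    results = associations.get(object_type, {}).get("results", [])
    return [item["id"] for item in results if "id" in item]


# Sample record shaped like the one read in the example above.
sample_record = {
    "id": "101",
    "properties": {"email": "ada@example.com"},
    "associations": {"companies": {"results": [{"id": "9001"}, {"id": "9002"}]}},
}

company_ids = associated_ids(sample_record, "companies")
deal_ids = associated_ids(sample_record, "deals")
```

Returning an empty list rather than raising keeps downstream loops simple when a contact has no linked companies or deals.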

## Stream Configuration

### Required Parameters

All CRM streams require these common parameters:

- **api**: API client instance with authentication
- **start_date**: ISO 8601 datetime string for incremental sync starting point
- **credentials**: Authentication credentials object
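Because `start_date` is passed as a string, it can be worth validating it before constructing a stream. The following sketch checks the `YYYY-MM-DDTHH:MM:SSZ` form used in the examples in this document. The helper `parse_start_date` is hypothetical, and the connector may accept other ISO 8601 variants.

```python
from datetime import datetime, timezone


def parse_start_date(value: str) -> datetime:
    """Parse a start_date such as 2023-01-01T00:00:00Z into an aware UTC datetime.

    Raises ValueError with a readable message when the format does not match.
    """
    try:
        parsed = datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ")
    except ValueError as exc:
        raise ValueError(
            f"start_date must look like 2023-01-01T00:00:00Z, got {value!r}"
        ) from exc
    # The trailing Z means UTC, so attach the timezone explicitly.
    return parsed.replace(tzinfo=timezone.utc)


start = parse_start_date("2023-01-01T00:00:00Z")
```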

### Optional Parameters

- **acceptance_test_config**: Configuration for acceptance testing (used internally)

### Stream Properties

Each CRM stream provides access to:
- **name**: Stream name (e.g., "contacts", "companies")
- **primary_key**: Primary key field (typically "id")
- **cursor_field**: Field used for incremental sync (typically "updatedAt")
- **properties**: Available object properties based on HubSpot schema

## OAuth Scopes

CRM streams require specific OAuth scopes:

- **Contacts**: `crm.objects.contacts.read`
- **Companies**: `crm.objects.contacts.read`, `crm.objects.companies.read`
- **Deals**: `contacts`, `crm.objects.deals.read`
- **Tickets**: `tickets`

Additional scopes for property history:
- **Contacts**: `crm.schemas.contacts.read`
- **Companies**: `crm.schemas.companies.read`
- **Deals**: `crm.schemas.deals.read`
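The scope lists in this section can be turned into a simple pre-flight check that reports which streams a given token can read. This is a sketch only: the `REQUIRED_SCOPES` table copies the base scopes listed above, the helper names are hypothetical, and it assumes every listed scope must be present, which may not match how `scope_is_granted` evaluates scopes in the connector.

```python
from typing import Dict, Iterable, List, Set

# Base scopes per stream, copied from the list above.
REQUIRED_SCOPES: Dict[str, Set[str]] = {
    "contacts": {"crm.objects.contacts.read"},
    "companies": {"crm.objects.contacts.read", "crm.objects.companies.read"},
    "deals": {"contacts", "crm.objects.deals.read"},
    "tickets": {"tickets"},
}


def missing_scopes(stream_name: str, granted: Iterable[str]) -> Set[str]:
    """Return the scopes a stream needs that the token does not have."""
    return REQUIRED_SCOPES[stream_name] - set(granted)


def available_streams(granted: Iterable[str]) -> List[str]:
    """Return the streams whose required scopes are all granted, sorted by name."""
    granted_set = set(granted)
    return sorted(
        name for name in REQUIRED_SCOPES if not missing_scopes(name, granted_set)
    )


granted_scopes = {"crm.objects.contacts.read", "tickets"}
readable = available_streams(granted_scopes)
deal_gaps = missing_scopes("deals", granted_scopes)
```

Running a check like this before a sync gives a clearer error than a permission failure partway through reading a stream.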