# Data Streams

Access to 21 different Xero data streams organized into transactional data (with incremental sync) and reference data (snapshot sync). Each stream provides structured access to specific Xero accounting entities with appropriate sync strategies.

## Capabilities

### Transactional Streams (Incremental Sync)

Streams that support incremental synchronization using the UpdatedDateUTC cursor field for efficient data replication.

```python { .api }
# Stream configuration for incremental sync
TransactionalStreams = {
    "bank_transactions": {
        "primary_key": "BankTransactionID",
        "path": "/BankTransactions",
        "cursor_field": "UpdatedDateUTC",
        "supports_incremental": True
    },
    "contacts": {
        "primary_key": "ContactID",
        "path": "/Contacts",
        "cursor_field": "UpdatedDateUTC",
        "supports_incremental": True
    },
    "credit_notes": {
        "primary_key": "CreditNoteID",
        "path": "/CreditNotes",
        "cursor_field": "UpdatedDateUTC",
        "supports_incremental": True
    },
    "invoices": {
        "primary_key": "InvoiceID",
        "path": "/Invoices",
        "cursor_field": "UpdatedDateUTC",
        "supports_incremental": True
    },
    "manual_journals": {
        "primary_key": "ManualJournalID",
        "path": "/ManualJournals",
        "cursor_field": "UpdatedDateUTC",
        "supports_incremental": True
    },
    "overpayments": {
        "primary_key": "OverpaymentID",
        "path": "/Overpayments",
        "cursor_field": "UpdatedDateUTC",
        "supports_incremental": True
    },
    "prepayments": {
        "primary_key": "PrepaymentID",
        "path": "/Prepayments",
        "cursor_field": "UpdatedDateUTC",
        "supports_incremental": True
    },
    "purchase_orders": {
        "primary_key": "PurchaseOrderID",
        "path": "/PurchaseOrders",
        "cursor_field": "UpdatedDateUTC",
        "supports_incremental": True
    },
    "payments": {
        "primary_key": "PaymentID",
        "path": "/Payments",
        "cursor_field": "UpdatedDateUTC",
        "supports_incremental": True
    }
}
"""
Nine transactional streams with incremental sync capabilities.

These streams track business transactions and frequently changing data:
- All use UpdatedDateUTC as the cursor field for incremental sync
- Support a configurable start_date for initial sync boundaries
- Page-based pagination with a default of 100 records per page
- Automatic date format conversion from Xero .NET JSON to ISO 8601
"""
```
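
For orientation, here is how one of the entries above could map onto an actual HTTP request. This is an illustrative sketch, not connector code: the base URL and the If-Modified-Since header follow the public Xero Accounting API, while `build_request` itself is a hypothetical helper.

```python
from typing import Optional

# Base URL of the public Xero Accounting API
BASE_URL = "https://api.xero.com/api.xro/2.0"

def build_request(stream_config: dict, access_token: str, tenant_id: str,
                  last_cursor: Optional[str] = None) -> dict:
    """Assemble the URL and headers for one request against a stream's endpoint."""
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Xero-Tenant-Id": tenant_id,
        "Accept": "application/json",
    }
    # Incremental sync: ask only for records updated since the stored cursor
    if last_cursor and stream_config.get("supports_incremental"):
        headers["If-Modified-Since"] = last_cursor
    return {"url": BASE_URL + stream_config["path"], "headers": headers}

request = build_request(
    {"path": "/Invoices", "supports_incremental": True},
    access_token="your_token",
    tenant_id="your_tenant",
    last_cursor="2023-01-01T00:00:00",
)
print(request["url"])  # https://api.xero.com/api.xro/2.0/Invoices
```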

### Reference Data Streams (Snapshot Sync)

Streams that perform full refresh synchronization for relatively static reference data.

```python { .api }
# Stream configuration for snapshot sync
ReferenceStreams = {
    "accounts": {
        "primary_key": "AccountID",
        "path": "/Accounts",
        "supports_incremental": False,
        "description": "Chart of accounts and account structure"
    },
    "bank_transfers": {
        "primary_key": "BankTransferID",
        "path": "/BankTransfers",
        "supports_incremental": False,
        "description": "Bank transfer records between accounts"
    },
    "employees": {
        "primary_key": "EmployeeID",
        "path": "/Employees",
        "supports_incremental": False,
        "description": "Employee information and details"
    },
    "items": {
        "primary_key": "ItemID",
        "path": "/Items",
        "supports_incremental": False,
        "description": "Inventory items and product catalog"
    },
    "users": {
        "primary_key": "UserID",
        "path": "/Users",
        "supports_incremental": False,
        "description": "User accounts and access permissions"
    },
    "branding_themes": {
        "primary_key": "BrandingThemeID",
        "path": "/BrandingThemes",
        "supports_incremental": False,
        "description": "Invoice branding and theme configurations"
    },
    "contact_groups": {
        "primary_key": "ContactGroupID",
        "path": "/ContactGroups",
        "supports_incremental": False,
        "description": "Contact groupings and categories"
    },
    "currencies": {
        "primary_key": "Code",
        "path": "/Currencies",
        "supports_incremental": False,
        "description": "Currency definitions and exchange rates"
    },
    "organisations": {
        "primary_key": "OrganisationID",
        "path": "/Organisation",
        "supports_incremental": False,
        "description": "Organization details and settings"
    },
    "repeating_invoices": {
        "primary_key": "RepeatingInvoiceID",
        "path": "/RepeatingInvoices",
        "supports_incremental": False,
        "description": "Recurring invoice templates and schedules"
    },
    "tax_rates": {
        "primary_key": "TaxType",
        "path": "/TaxRates",
        "supports_incremental": False,
        "description": "Tax rate configurations and rules"
    },
    "tracking_categories": {
        "primary_key": "TrackingCategoryID",
        "path": "/TrackingCategories",
        "supports_incremental": False,
        "description": "Tracking category definitions for reporting"
    }
}
"""
Twelve reference data streams with snapshot sync.

These streams contain relatively static configuration and reference data:
- Full refresh synchronization on each sync
- No cursor field or incremental capabilities
- Generally smaller datasets that change infrequently
- Provide lookup data and configuration for transactional streams
"""
```

### Stream Access Patterns

Common patterns for accessing and working with stream data from the connector.

```python { .api }
def get_stream_by_name(source: SourceXero, config: dict, stream_name: str):
    """
    Retrieve a specific stream by name from the connector.

    Args:
        source: Initialized SourceXero connector instance
        config: Valid configuration dictionary
        stream_name: Name of the stream to retrieve

    Returns:
        Stream object or None if not found
    """

def list_all_streams(source: SourceXero, config: dict) -> list[dict]:
    """
    Get information about all available streams.

    Args:
        source: Initialized SourceXero connector instance
        config: Valid configuration dictionary

    Returns:
        List of stream information dictionaries containing:
        - name: Stream name
        - primary_key: Primary key field(s)
        - supports_incremental: Boolean incremental sync support
        - cursor_field: Cursor field name (if incremental)
    """

def get_stream_schema(stream) -> dict:
    """
    Retrieve the JSON schema for a specific stream.

    Args:
        stream: Stream object from connector

    Returns:
        JSON schema dictionary defining the stream's data structure
    """
```
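
One possible implementation of these helpers, sketched against the standard Airbyte Python CDK stream interface (`stream.name`, `primary_key`, `cursor_field`, `get_json_schema()`); the connector's streams may expose these attributes slightly differently.

```python
# Sketch implementations of the access-pattern helpers above, assuming the
# generic Airbyte Python CDK stream attributes. Illustrative only.

def get_stream_by_name(source, config: dict, stream_name: str):
    """Retrieve a specific stream by name, or None if not found."""
    for stream in source.streams(config):
        if stream.name == stream_name:
            return stream
    return None

def list_all_streams(source, config: dict) -> list:
    """Summarize every available stream as a list of dictionaries."""
    summaries = []
    for stream in source.streams(config):
        cursor = getattr(stream, "cursor_field", None)
        summaries.append({
            "name": stream.name,
            "primary_key": getattr(stream, "primary_key", None),
            # A non-empty cursor field implies incremental support
            "supports_incremental": bool(cursor),
            "cursor_field": cursor,
        })
    return summaries

def get_stream_schema(stream) -> dict:
    """Retrieve the JSON schema for a stream."""
    return stream.get_json_schema()
```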

## Usage Examples

### Stream Discovery and Information

```python
from source_xero import SourceXero

def explore_available_streams():
    """Discover and examine available streams."""
    source = SourceXero()
    config = {
        "access_token": "your_token",
        "tenant_id": "your_tenant",
        "start_date": "2023-01-01T00:00:00Z"
    }

    # Get all streams
    streams = source.streams(config)

    # Categorize streams by sync type
    incremental_streams = []
    snapshot_streams = []

    for stream in streams:
        stream_info = {
            "name": stream.name,
            "primary_key": getattr(stream, 'primary_key', None),
            "supports_incremental": hasattr(stream, 'incremental_sync')
        }

        if stream_info["supports_incremental"]:
            incremental_streams.append(stream_info)
        else:
            snapshot_streams.append(stream_info)

    print(f"Incremental streams: {len(incremental_streams)}")
    for stream in incremental_streams:
        print(f"  - {stream['name']} (key: {stream['primary_key']})")

    print(f"Snapshot streams: {len(snapshot_streams)}")
    for stream in snapshot_streams:
        print(f"  - {stream['name']} (key: {stream['primary_key']})")

# Run discovery
explore_available_streams()
```

### Working with Specific Streams

```python
from source_xero import SourceXero

def examine_stream_details(stream_name: str):
    """Get detailed information about a specific stream."""
    source = SourceXero()
    config = {
        "access_token": "your_token",
        "tenant_id": "your_tenant",
        "start_date": "2023-01-01T00:00:00Z"
    }

    # Find the specific stream
    streams = source.streams(config)
    target_stream = None

    for stream in streams:
        if stream.name == stream_name:
            target_stream = stream
            break

    if target_stream:
        print(f"Stream: {target_stream.name}")
        print(f"Primary Key: {getattr(target_stream, 'primary_key', 'None')}")
        print(f"Incremental: {hasattr(target_stream, 'incremental_sync')}")

        # Get schema information
        try:
            catalog = source.discover(None, config)
            for stream_catalog in catalog.streams:
                if stream_catalog.stream.name == stream_name:
                    schema = stream_catalog.stream.json_schema
                    properties = schema.get('properties', {})
                    print(f"Fields: {len(properties)}")
                    print("Key fields:")
                    for field_name, field_def in list(properties.items())[:5]:
                        field_type = field_def.get('type', 'unknown')
                        print(f"  - {field_name}: {field_type}")
                    break
        except Exception as e:
            print(f"Schema discovery failed: {e}")
    else:
        print(f"Stream '{stream_name}' not found")

# Example usage
examine_stream_details("invoices")
examine_stream_details("accounts")
```

### Stream Configuration for Airbyte

```python
import json

def create_catalog_for_streams(stream_names: list[str]) -> dict:
    """Create Airbyte catalog configuration for specific streams."""
    catalog = {
        "streams": []
    }

    # Stream sync configurations
    stream_configs = {
        # Incremental streams
        "bank_transactions": {
            "sync_mode": "incremental",
            "destination_sync_mode": "append_dedup",
            "cursor_field": ["UpdatedDateUTC"]
        },
        "contacts": {
            "sync_mode": "incremental",
            "destination_sync_mode": "append_dedup",
            "cursor_field": ["UpdatedDateUTC"]
        },
        "invoices": {
            "sync_mode": "incremental",
            "destination_sync_mode": "append_dedup",
            "cursor_field": ["UpdatedDateUTC"]
        },
        # Snapshot streams
        "accounts": {
            "sync_mode": "full_refresh",
            "destination_sync_mode": "overwrite"
        },
        "currencies": {
            "sync_mode": "full_refresh",
            "destination_sync_mode": "overwrite"
        }
    }

    for stream_name in stream_names:
        if stream_name in stream_configs:
            sync_mode = stream_configs[stream_name]["sync_mode"]
            stream_config = {
                "stream": {
                    "name": stream_name,
                    "supported_sync_modes": (
                        ["full_refresh", "incremental"]
                        if sync_mode == "incremental"
                        else ["full_refresh"]
                    )
                },
                "config": stream_configs[stream_name]
            }
            catalog["streams"].append(stream_config)

    return catalog

# Create catalog for selected streams
selected_streams = ["invoices", "contacts", "accounts", "currencies"]
catalog_config = create_catalog_for_streams(selected_streams)
print(json.dumps(catalog_config, indent=2))
```

## Data Processing Features

### Automatic Date Conversion

All streams automatically convert Xero's .NET JSON date format to ISO 8601:

- **Input**: `"/Date(1419937200000+0000)/"`
- **Output**: `"2014-12-30T11:00:00+00:00"`

This conversion happens transparently for all date fields in all streams using the CustomExtractor component.
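
A minimal sketch of the conversion, assuming the `/Date(millis±offset)/` shape shown above. The connector performs the equivalent inside CustomExtractor, so this standalone helper is purely illustrative:

```python
import re
from datetime import datetime, timezone, timedelta

# Illustrative conversion of Xero's .NET JSON date format to ISO 8601.
# The millisecond value is a UTC epoch; the optional ±HHMM suffix sets
# the display offset.
_NET_DATE = re.compile(r"/Date\((\d+)([+-]\d{4})?\)/")

def net_json_date_to_iso(value: str) -> str:
    """Convert e.g. '/Date(1419937200000+0000)/' to an ISO 8601 string."""
    match = _NET_DATE.fullmatch(value)
    if not match:
        return value  # not a .NET date; pass through unchanged
    millis = int(match.group(1))
    offset = match.group(2) or "+0000"
    tz = timezone(timedelta(hours=int(offset[:3]),
                            minutes=int(offset[0] + offset[3:])))
    return datetime.fromtimestamp(millis / 1000, tz=tz).isoformat()

print(net_json_date_to_iso("/Date(1419937200000+0000)/"))
# 2014-12-30T11:00:00+00:00
```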

### Pagination Support

Streams support page-based pagination:

- **Default page size**: 100 records
- **Configurable**: Can be adjusted via the page_size parameter
- **Automatic**: Handled by the Airbyte CDK DefaultPaginator
- **Progress tracking**: Automatic state management for large datasets
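
The pagination behavior can be pictured as the loop below. This is a sketch, not the CDK's implementation: `fetch_page` is a hypothetical stand-in for the HTTP layer, and a short page signals the end of the dataset.

```python
# Illustrative page-based pagination loop. In the connector this is handled
# by the Airbyte CDK DefaultPaginator.
def read_all_records(fetch_page, page_size: int = 100):
    """Yield every record across pages, stopping on a short or empty page."""
    page = 1
    while True:
        records = fetch_page(page=page, page_size=page_size)
        yield from records
        if len(records) < page_size:  # last page reached
            break
        page += 1

# Usage with a fake three-page dataset of 250 records
dataset = [{"id": i} for i in range(250)]

def fake_fetch(page: int, page_size: int):
    start = (page - 1) * page_size
    return dataset[start:start + page_size]

print(len(list(read_all_records(fake_fetch))))  # 250
```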

### Incremental Sync Behavior

For streams with incremental sync support:

- **Cursor field**: UpdatedDateUTC (automatically managed)
- **State management**: Automatic checkpoint storage and recovery
- **Boundary filtering**: Records filtered by UpdatedDateUTC >= start_time
- **Timezone handling**: All dates normalized to UTC for consistency
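
The boundary filtering and checkpoint advance can be sketched as follows. State storage is actually handled by the Airbyte CDK; `incremental_pass` is a hypothetical illustration of the cursor logic only:

```python
# Illustrative incremental-sync pass: keep records at or after the stored
# cursor, then advance the checkpoint to the newest cursor value seen.
def incremental_pass(records, state: dict, cursor_field: str = "UpdatedDateUTC"):
    """Return records at/after the checkpoint and the updated state."""
    start = state.get("cursor", "1970-01-01T00:00:00+00:00")
    # ISO 8601 strings normalized to UTC compare correctly as plain strings
    fresh = [r for r in records if r[cursor_field] >= start]
    if fresh:
        state = {"cursor": max(r[cursor_field] for r in fresh)}
    return fresh, state

records = [
    {"InvoiceID": "a", "UpdatedDateUTC": "2023-01-05T00:00:00+00:00"},
    {"InvoiceID": "b", "UpdatedDateUTC": "2023-03-01T00:00:00+00:00"},
]
fresh, state = incremental_pass(records, {"cursor": "2023-02-01T00:00:00+00:00"})
print([r["InvoiceID"] for r in fresh], state["cursor"])
```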

### Error Handling per Stream

Each stream inherits the connector's error handling configuration:

- **401 responses**: Sync fails with an authentication error
- **403 responses**: Individual records skipped; sync continues
- **429 responses**: Automatic retry after a 30-second delay
- **Network errors**: Standard retry logic with exponential backoff
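
A sketch of retry behavior matching the responses listed above; `fetch_with_error_handling` and `do_request` are hypothetical names, and the real handling lives in the connector's configuration:

```python
import time

# Illustrative wrapper: fail fast on 401, skip on 403, wait 30 s and
# retry on 429. Mirrors the behavior described above.
def fetch_with_error_handling(do_request, max_retries: int = 3,
                              retry_delay: float = 30.0):
    """Return the response body, None for a skipped record, or raise."""
    for attempt in range(max_retries + 1):
        status, body = do_request()
        if status == 401:
            raise PermissionError("authentication failed; sync aborts")
        if status == 403:
            return None  # record skipped, sync continues
        if status == 429 and attempt < max_retries:
            time.sleep(retry_delay)  # rate limited: back off, then retry
            continue
        return body

# Simulated endpoint: rate-limited once, then succeeds
responses = iter([(429, None), (200, {"ok": True})])
result = fetch_with_error_handling(lambda: next(responses), retry_delay=0.0)
print(result)  # {'ok': True}
```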