# Incremental Data Streams

Fourteen incremental streams sync only records that are new or updated since the last sync. They cover most of Xero's transactional and customer data and use cursor-based incremental updates.

## Capabilities

### Base Incremental Stream

The foundation class for all incremental streams, providing cursor-based synchronization and state management.

```python { .api }
class IncrementalXeroStream(XeroStream, ABC):
    """
    Abstract base class for incremental Xero data streams.

    Provides cursor-based incremental synchronization using Xero's
    UpdatedDateUTC field to track and sync only changed records.
    """

    cursor_field: str = "UpdatedDateUTC"  # Default cursor field
    state_checkpoint_interval: int = 100  # Records between state saves

    def __init__(self, start_date: datetime, **kwargs):
        """
        Initialize incremental stream with start date.

        Parameters:
        - start_date: datetime object for initial sync start point
        - **kwargs: Additional arguments passed to parent XeroStream
        """

    def request_headers(self, stream_state, stream_slice=None, next_page_token=None) -> Mapping[str, Any]:
        """
        Build request headers including If-Modified-Since for incremental sync.

        Parameters:
        - stream_state: Current sync state with cursor value
        - stream_slice: Stream partition (unused in Xero streams)
        - next_page_token: Pagination token for continued requests

        Returns:
        Headers mapping with If-Modified-Since header for incremental requests
        """

    def get_updated_state(self, current_stream_state: Mapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Update stream state based on latest record's cursor value.

        Parameters:
        - current_stream_state: Current state mapping
        - latest_record: Most recent record from API response

        Returns:
        Updated state mapping with new cursor value
        """
```
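
As a rough illustration of how the `If-Modified-Since` header described by `request_headers` might be derived, the sketch below uses the saved cursor when one exists and falls back to `start_date` otherwise. The helper name `build_incremental_headers`, the header's timestamp format, and the treatment of naive datetimes as UTC are assumptions made for illustration, not details taken from the connector's source.

```python
from datetime import datetime, timezone
from typing import Any, Mapping, Optional


def build_incremental_headers(
    stream_state: Optional[Mapping[str, Any]],
    start_date: datetime,
    cursor_field: str = "UpdatedDateUTC",
) -> Mapping[str, str]:
    """Hypothetical helper: choose the If-Modified-Since value for a request."""
    saved_cursor = (stream_state or {}).get(cursor_field)
    if saved_cursor:
        # Assumes the saved cursor is an ISO 8601 string; a trailing "Z" is
        # normalized so datetime.fromisoformat accepts it on older Pythons.
        since = datetime.fromisoformat(str(saved_cursor).replace("Z", "+00:00"))
    else:
        since = start_date
    if since.tzinfo is None:
        # Treat naive datetimes as UTC (an assumption of this sketch).
        since = since.replace(tzinfo=timezone.utc)
    since_utc = since.astimezone(timezone.utc)
    return {"If-Modified-Since": since_utc.strftime("%Y-%m-%dT%H:%M:%S")}


start = datetime(2023, 1, 1, tzinfo=timezone.utc)
first_sync = build_incremental_headers({}, start)
resumed = build_incremental_headers({"UpdatedDateUTC": "2023-06-01T00:00:00Z"}, start)
print(first_sync)
print(resumed)
```

On a first sync the header carries the start date; once state exists, the saved cursor takes over.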

### Financial Transaction Streams

Core financial transaction streams for accounting data synchronization.

#### Bank Transactions

```python { .api }
class BankTransactions(IncrementalXeroStream):
    """
    Bank transaction records including deposits, withdrawals, and transfers.

    Primary Key: BankTransactionID
    Cursor Field: UpdatedDateUTC
    Pagination: Enabled (large datasets)
    """

    primary_key = "BankTransactionID"
    pagination = True
```

#### Invoices

```python { .api }
class Invoices(IncrementalXeroStream):
    """
    Sales and purchase invoices with line items and payment status.

    Primary Key: InvoiceID
    Cursor Field: UpdatedDateUTC
    Pagination: Enabled (large datasets)
    """

    primary_key = "InvoiceID"
    pagination = True
```

#### Credit Notes

```python { .api }
class CreditNotes(IncrementalXeroStream):
    """
    Credit notes for invoice adjustments and refunds.

    Primary Key: CreditNoteID
    Cursor Field: UpdatedDateUTC
    Pagination: Enabled
    """

    primary_key = "CreditNoteID"
    pagination = True
```

#### Payments

```python { .api }
class Payments(IncrementalXeroStream):
    """
    Payment records linking invoices to bank transactions.

    Primary Key: PaymentID
    Cursor Field: UpdatedDateUTC
    Pagination: Enabled
    """

    primary_key = "PaymentID"
    pagination = True
```

#### Manual Journals

```python { .api }
class ManualJournals(IncrementalXeroStream):
    """
    Manual journal entries for accounting adjustments.

    Primary Key: ManualJournalID
    Cursor Field: UpdatedDateUTC
    Pagination: Enabled
    """

    primary_key = "ManualJournalID"
    pagination = True
```

#### Purchase Orders

```python { .api }
class PurchaseOrders(IncrementalXeroStream):
    """
    Purchase orders for tracking supplier orders and deliveries.

    Primary Key: PurchaseOrderID
    Cursor Field: UpdatedDateUTC
    Pagination: Enabled
    """

    primary_key = "PurchaseOrderID"
    pagination = True
```

#### Overpayments

```python { .api }
class Overpayments(IncrementalXeroStream):
    """
    Overpayment records for excess customer payments.

    Primary Key: OverpaymentID
    Cursor Field: UpdatedDateUTC
    Pagination: Enabled
    """

    primary_key = "OverpaymentID"
    pagination = True
```

#### Prepayments

```python { .api }
class Prepayments(IncrementalXeroStream):
    """
    Prepayment records for advance customer payments.

    Primary Key: PrepaymentID
    Cursor Field: UpdatedDateUTC
    Pagination: Enabled
    """

    primary_key = "PrepaymentID"
    pagination = True
```

### Special Case: Bank Transfers

Bank transfers use `CreatedDateUTC` as the cursor field instead of the default `UpdatedDateUTC`.

```python { .api }
class BankTransfers(IncrementalXeroStream):
    """
    Bank transfer records between accounts.

    Primary Key: BankTransferID
    Cursor Field: CreatedDateUTC (overridden from default)
    Pagination: Enabled
    """

    primary_key = "BankTransferID"
    cursor_field = "CreatedDateUTC"  # Uses creation date instead of update date
    pagination = True
```

### Master Data Streams

Core business entity streams with less frequent updates.

#### Contacts

```python { .api }
class Contacts(IncrementalXeroStream):
    """
    Customer and supplier contact information.

    Primary Key: ContactID
    Cursor Field: UpdatedDateUTC
    Pagination: Enabled (large datasets)
    """

    primary_key = "ContactID"
    pagination = True
```

#### Accounts

```python { .api }
class Accounts(IncrementalXeroStream):
    """
    Chart of accounts for financial categorization.

    Primary Key: AccountID
    Cursor Field: UpdatedDateUTC
    Pagination: Disabled (manageable dataset size)
    """

    primary_key = "AccountID"
    pagination = False
```

#### Items

```python { .api }
class Items(IncrementalXeroStream):
    """
    Product and service items for invoicing.

    Primary Key: ItemID
    Cursor Field: UpdatedDateUTC
    Pagination: Disabled (manageable dataset size)
    """

    primary_key = "ItemID"
    pagination = False
```

#### Employees

```python { .api }
class Employees(IncrementalXeroStream):
    """
    Employee records for payroll and expense management.

    Primary Key: EmployeeID
    Cursor Field: UpdatedDateUTC
    Pagination: Enabled
    """

    primary_key = "EmployeeID"
    pagination = True
```

#### Users

```python { .api }
class Users(IncrementalXeroStream):
    """
    Xero user accounts with system access permissions.

    Primary Key: UserID
    Cursor Field: UpdatedDateUTC
    Pagination: Disabled (small dataset)
    """

    primary_key = "UserID"
    pagination = False
```

## Usage Examples

### Reading Incremental Stream

```python
from datetime import datetime

# SyncMode is assumed to come from the Airbyte CDK
from airbyte_cdk.models import SyncMode
from source_xero.streams import BankTransactions

# Initialize stream with start date.
# datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11,
# so an explicit UTC offset is used here.
start_date = datetime.fromisoformat("2023-01-01T00:00:00+00:00")
stream = BankTransactions(
    tenant_id="your-tenant-id",
    start_date=start_date,
    authenticator=authenticator,  # assumed to be created beforehand
)

# Read records with state management
stream_state = {"UpdatedDateUTC": "2023-06-01T00:00:00Z"}
records = []

for record in stream.read_records(
    sync_mode=SyncMode.incremental,
    stream_state=stream_state
):
    records.append(record)

    # Update state periodically (every 100 records)
    if len(records) % 100 == 0:
        stream_state = stream.get_updated_state(stream_state, record)

# Capture the cursor of the final, partial batch
if records:
    stream_state = stream.get_updated_state(stream_state, records[-1])

print(f"Synced {len(records)} records")
print(f"New state: {stream_state}")
```

### State Management

```python
# Initial state for first sync
initial_state = {}

# State after partial sync
partial_state = {
    "UpdatedDateUTC": "2023-08-15T14:30:25Z"
}

# State comparison and updates
current_state = stream.get_updated_state(
    current_stream_state=partial_state,
    latest_record={"UpdatedDateUTC": "2023-08-15T16:45:30Z"}
)

# Result: {"UpdatedDateUTC": "2023-08-15T16:45:30Z"}
```
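
The result shown above is consistent with a "keep the later timestamp" rule. A minimal sketch of such a rule follows; `advance_cursor` is a hypothetical stand-in for `get_updated_state`, and it assumes cursor values are ISO 8601 strings. The connector's actual comparison logic may differ.

```python
from datetime import datetime
from typing import Any, Dict, Mapping


def _parse_utc(value: str) -> datetime:
    # Normalize a trailing "Z" so datetime.fromisoformat accepts it.
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def advance_cursor(
    current_state: Mapping[str, Any],
    latest_record: Mapping[str, Any],
    cursor_field: str = "UpdatedDateUTC",
) -> Dict[str, Any]:
    """Hypothetical helper: return state holding the later of two cursor values."""
    record_value = latest_record.get(cursor_field)
    state_value = current_state.get(cursor_field)
    if record_value is None:
        return dict(current_state)
    if state_value is None or _parse_utc(record_value) > _parse_utc(state_value):
        return {cursor_field: record_value}
    return {cursor_field: state_value}


state = {"UpdatedDateUTC": "2023-08-15T14:30:25Z"}
newer = advance_cursor(state, {"UpdatedDateUTC": "2023-08-15T16:45:30Z"})
older = advance_cursor(state, {"UpdatedDateUTC": "2023-08-01T00:00:00Z"})
print(newer)
print(older)
```

A newer record moves the cursor forward, while an older one leaves the state unchanged, so the cursor never moves backwards. Passing `cursor_field="CreatedDateUTC"` would apply the same rule to a stream such as `BankTransfers`.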

### Custom Date Range Sync

```python
from datetime import datetime, timedelta, timezone

from source_xero.streams import Invoices

# Sync last 30 days of transactions (timezone-aware UTC timestamp)
thirty_days_ago = datetime.now(timezone.utc) - timedelta(days=30)
stream = Invoices(
    tenant_id="your-tenant-id",
    start_date=thirty_days_ago,
    authenticator=authenticator,  # assumed to be created beforehand
)

# The stream will automatically use If-Modified-Since headers
# to request only records updated since the start_date
```

## Performance Considerations

### Pagination Settings

Pagination is enabled or disabled per stream, depending on typical data volume:

```python
# Large dataset streams (enabled pagination)
LargeDatasetStreams = [
    "BankTransactions",  # High transaction volume
    "Invoices",          # High invoice volume
    "Contacts",          # Large customer bases
    "Payments",          # High payment volume
    "CreditNotes",       # Moderate volume
    "ManualJournals",    # Moderate volume
    "PurchaseOrders",    # Moderate volume
    "Overpayments",      # Moderate volume
    "Prepayments",       # Moderate volume
    "BankTransfers",     # Moderate volume
    "Employees"          # Depends on organization size
]

# Small dataset streams (disabled pagination)
SmallDatasetStreams = [
    "Accounts",  # Limited by chart of accounts
    "Items",     # Product catalog size
    "Users"      # System users only
]
```
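
Where `pagination = True`, a stream presumably requests successive pages until one comes back empty. The sketch below shows that loop in isolation; `read_all_pages` and `fetch_page` are hypothetical stand-ins, and 1-based page numbering is an assumption rather than something this page states.

```python
from typing import Any, Callable, Iterator, List, Mapping

Record = Mapping[str, Any]


def read_all_pages(
    fetch_page: Callable[[int], List[Record]],
    paginated: bool = True,
) -> Iterator[Record]:
    """Hypothetical reader: yield records page by page until an empty page."""
    page = 1  # assumed 1-based page numbering
    while True:
        batch = fetch_page(page)
        yield from batch
        if not paginated or not batch:
            return
        page += 1


# Fake two-page endpoint used only to exercise the loop.
pages = {1: [{"InvoiceID": "a"}, {"InvoiceID": "b"}], 2: [{"InvoiceID": "c"}]}
calls: List[int] = []


def fake_fetch(page: int) -> List[Record]:
    calls.append(page)
    return pages.get(page, [])


all_records = list(read_all_pages(fake_fetch))
print(len(all_records), calls)
```

With `paginated=False` the same function issues a single request, which mirrors the small-dataset streams listed above.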

### State Checkpointing

The `state_checkpoint_interval` of 100 records balances three concerns:

- **Performance**: Reduces state update overhead
- **Reliability**: Limits data re-processing on failures
- **Memory Usage**: Prevents excessive state accumulation
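
To make the trade-off concrete, here is a minimal sketch of a read loop that persists the cursor every `checkpoint_interval` records. The function `sync_with_checkpoints` and the `save_state` callback are hypothetical, and the sketch assumes records arrive in ascending cursor order with timestamps in one fixed ISO 8601 format.

```python
from typing import Any, Callable, Dict, Iterable, List, Mapping


def sync_with_checkpoints(
    records: Iterable[Mapping[str, Any]],
    save_state: Callable[[Dict[str, Any]], None],
    cursor_field: str = "UpdatedDateUTC",
    checkpoint_interval: int = 100,
) -> int:
    """Hypothetical loop: persist the cursor every `checkpoint_interval` records."""
    count = 0
    latest = None
    for record in records:
        count += 1
        value = record[cursor_field]
        # Same-format ISO 8601 UTC strings compare correctly as text.
        if latest is None or value > latest:
            latest = value
        if count % checkpoint_interval == 0:
            save_state({cursor_field: latest})
    if latest is not None and count % checkpoint_interval != 0:
        # Final checkpoint so the tail of the sync is not re-read next time.
        save_state({cursor_field: latest})
    return count


saved: List[Dict[str, Any]] = []
fake_records = [
    {"UpdatedDateUTC": f"2023-01-01T00:{i // 60:02d}:{i % 60:02d}Z"} for i in range(250)
]
total = sync_with_checkpoints(fake_records, saved.append)
print(total, len(saved))
```

For 250 records this saves state three times (after records 100, 200, and 250), so a failure would force at most 100 records to be re-read.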

### Cursor Field Selection

- **UpdatedDateUTC**: Used by 13 of 14 incremental streams
- **CreatedDateUTC**: Used only by BankTransfers (immutable records)

In both cases the cursor stores the most recent timestamp seen, which lets the next sync skip records that have not changed since.

## Error Handling

### Date Parsing Errors

Incremental streams handle various date formats from Xero:

```python
# Supported date formats:
# - ISO 8601: "2023-08-15T14:30:25Z"
# - .NET JSON: "/Date(1419937200000+0000)/"
# - Partial dates: "2023-08-15"
```
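
One way to handle all three formats is a small parser that checks for the .NET JSON pattern first and otherwise falls back to ISO 8601 parsing. The sketch below is illustrative only: `parse_xero_date` is a hypothetical name, the offset suffix in the .NET form is ignored, and values without an offset are assumed to be UTC.

```python
import re
from datetime import datetime, timedelta, timezone

_DOTNET_DATE = re.compile(r"^/Date\((-?\d+)([+-]\d{4})?\)/$")


def parse_xero_date(value: str) -> datetime:
    """Hypothetical parser covering the three formats listed above; returns UTC."""
    match = _DOTNET_DATE.match(value)
    if match:
        # The leading integer is milliseconds since the Unix epoch (UTC);
        # the optional offset suffix is ignored in this sketch.
        millis = int(match.group(1))
        return datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(milliseconds=millis)
    # ISO 8601 and date-only strings; normalize "Z" for older Pythons.
    parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)  # assume UTC when no offset
    return parsed.astimezone(timezone.utc)


for sample in ("2023-08-15T14:30:25Z", "/Date(1419937200000+0000)/", "2023-08-15"):
    print(sample, "->", parse_xero_date(sample).isoformat())
```

Any string matching none of the formats raises `ValueError` from `datetime.fromisoformat`, which a caller could catch and report as a date parsing error.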

### State Recovery

If a sync fails mid-stream:

1. **State Preservation**: Last checkpointed state is maintained
2. **Resumption**: Next sync resumes from last successful cursor position
3. **Deduplication**: Records are identified by primary key to prevent duplicates
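
Because a resumed sync restarts from the last checkpoint, some records can be read twice. The deduplication step can be sketched as follows; `deduplicate` is a hypothetical helper that keeps the most recently updated copy per primary key, and where this step actually runs (in the connector or in the destination) is not stated here.

```python
from typing import Any, Dict, Iterable, List, Mapping


def deduplicate(
    records: Iterable[Mapping[str, Any]],
    primary_key: str,
    cursor_field: str = "UpdatedDateUTC",
) -> List[Mapping[str, Any]]:
    """Hypothetical helper: keep the most recently updated copy of each record."""
    latest_by_key: Dict[Any, Mapping[str, Any]] = {}
    for record in records:
        key = record[primary_key]
        existing = latest_by_key.get(key)
        # Same-format ISO 8601 UTC strings compare correctly as text.
        if existing is None or record[cursor_field] >= existing[cursor_field]:
            latest_by_key[key] = record
    return list(latest_by_key.values())


batch = [
    {"InvoiceID": "A", "UpdatedDateUTC": "2023-08-15T14:30:25Z", "Status": "DRAFT"},
    {"InvoiceID": "B", "UpdatedDateUTC": "2023-08-15T15:00:00Z", "Status": "PAID"},
    {"InvoiceID": "A", "UpdatedDateUTC": "2023-08-15T16:45:30Z", "Status": "AUTHORISED"},
]
unique = deduplicate(batch, primary_key="InvoiceID")
print(unique)
```

Invoice `A` appears twice in the input but only once in the output, with its later status.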

### API Rate Limiting

Incremental streams implement rate limiting protection:

- **Backoff Strategy**: Exponential backoff for 429 responses
- **Retry Logic**: Automatic retry for transient failures
- **Request Spacing**: Built-in delays between paginated requests
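
The backoff strategy above can be illustrated with a generic retry wrapper. This is a sketch, not the connector's implementation: `call_with_backoff`, its default retry count, and its base delay are all assumed values, and the `sleep` parameter exists only so the example can run without waiting.

```python
import time
from typing import Callable, List, Tuple

Response = Tuple[int, str]  # (HTTP status code, body)


def call_with_backoff(
    send: Callable[[], Response],
    max_retries: int = 5,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Response:
    """Hypothetical wrapper: retry on HTTP 429 with exponentially growing delays."""
    for attempt in range(max_retries + 1):
        status, body = send()
        if status != 429:
            return status, body
        if attempt < max_retries:
            # Delay doubles each attempt: base, 2*base, 4*base, ...
            sleep(base_delay * (2 ** attempt))
    raise RuntimeError(f"Still rate limited after {max_retries} retries")


# Simulated endpoint: two 429 responses, then success.
responses = iter([(429, ""), (429, ""), (200, "ok")])
delays: List[float] = []
result = call_with_backoff(lambda: next(responses), sleep=delays.append)
print(result, delays)
```

A production version would typically also add random jitter to the delay and honor a `Retry-After` header when the server sends one.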