# Incremental Data Streams

Fourteen incremental synchronization streams sync only records created or updated since the last sync. These streams cover most of Xero's transactional and customer data using cursor-based incremental updates.

## Capabilities

### Base Incremental Stream

The foundation class for all incremental streams, providing cursor-based synchronization and state management.

```python { .api }
class IncrementalXeroStream(XeroStream, ABC):
    """
    Abstract base class for incremental Xero data streams.

    Provides cursor-based incremental synchronization using Xero's
    UpdatedDateUTC field to track and sync only changed records.
    """

    cursor_field: str = "UpdatedDateUTC"  # Default cursor field
    state_checkpoint_interval: int = 100  # Records between state saves

    def __init__(self, start_date: datetime, **kwargs):
        """
        Initialize incremental stream with start date.

        Parameters:
        - start_date: datetime object for initial sync start point
        - **kwargs: Additional arguments passed to parent XeroStream
        """

    def request_headers(self, stream_state, stream_slice=None, next_page_token=None) -> Mapping[str, Any]:
        """
        Build request headers including If-Modified-Since for incremental sync.

        Parameters:
        - stream_state: Current sync state with cursor value
        - stream_slice: Stream partition (unused in Xero streams)
        - next_page_token: Pagination token for continued requests

        Returns:
        Headers mapping with If-Modified-Since header for incremental requests
        """

    def get_updated_state(self, current_stream_state: Mapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Update stream state based on latest record's cursor value.

        Parameters:
        - current_stream_state: Current state mapping
        - latest_record: Most recent record from API response

        Returns:
        Updated state mapping with new cursor value
        """
```
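To make the cursor mechanics concrete, here is a minimal standalone sketch of what `request_headers` and `get_updated_state` might do. It is not the connector's actual code: the helper names `build_if_modified_since` and `advance_cursor` are hypothetical, the header timestamp format is an assumption, and cursor values are assumed to be same-format ISO 8601 UTC strings.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Mapping

CURSOR_FIELD = "UpdatedDateUTC"


def build_if_modified_since(stream_state: Mapping[str, Any], start_date: datetime) -> Dict[str, str]:
    """Build a header asking only for records changed since the cursor (hypothetical helper)."""
    cursor = stream_state.get(CURSOR_FIELD)
    if cursor:
        # Older Python versions reject a trailing "Z" in fromisoformat().
        since = datetime.fromisoformat(cursor.replace("Z", "+00:00"))
    else:
        # First sync: no saved cursor yet, so fall back to the configured start date.
        since = start_date
    # Assumed header format: a UTC timestamp without an offset suffix.
    return {"If-Modified-Since": since.strftime("%Y-%m-%dT%H:%M:%S")}


def advance_cursor(current_state: Mapping[str, Any], latest_record: Mapping[str, Any]) -> Dict[str, Any]:
    """Return state holding the later of the saved cursor and the record's cursor (hypothetical helper)."""
    latest = latest_record[CURSOR_FIELD]
    current = current_state.get(CURSOR_FIELD, latest)
    # Same-format ISO 8601 UTC strings sort chronologically, so max() is enough here.
    return {CURSOR_FIELD: max(current, latest)}
```

Taking the maximum rather than blindly overwriting keeps the cursor from moving backwards if records arrive out of order.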

### Financial Transaction Streams

Core financial transaction streams for accounting data synchronization.

#### Bank Transactions

```python { .api }
class BankTransactions(IncrementalXeroStream):
    """
    Bank transaction records including deposits, withdrawals, and transfers.

    Primary Key: BankTransactionID
    Cursor Field: UpdatedDateUTC
    Pagination: Enabled (large datasets)
    """

    primary_key = "BankTransactionID"
    pagination = True
```

#### Invoices

```python { .api }
class Invoices(IncrementalXeroStream):
    """
    Sales and purchase invoices with line items and payment status.

    Primary Key: InvoiceID
    Cursor Field: UpdatedDateUTC
    Pagination: Enabled (large datasets)
    """

    primary_key = "InvoiceID"
    pagination = True
```

#### Credit Notes

```python { .api }
class CreditNotes(IncrementalXeroStream):
    """
    Credit notes for invoice adjustments and refunds.

    Primary Key: CreditNoteID
    Cursor Field: UpdatedDateUTC
    Pagination: Enabled
    """

    primary_key = "CreditNoteID"
    pagination = True
```

#### Payments

```python { .api }
class Payments(IncrementalXeroStream):
    """
    Payment records linking invoices to bank transactions.

    Primary Key: PaymentID
    Cursor Field: UpdatedDateUTC
    Pagination: Enabled
    """

    primary_key = "PaymentID"
    pagination = True
```

#### Manual Journals

```python { .api }
class ManualJournals(IncrementalXeroStream):
    """
    Manual journal entries for accounting adjustments.

    Primary Key: ManualJournalID
    Cursor Field: UpdatedDateUTC
    Pagination: Enabled
    """

    primary_key = "ManualJournalID"
    pagination = True
```

#### Purchase Orders

```python { .api }
class PurchaseOrders(IncrementalXeroStream):
    """
    Purchase orders for tracking supplier orders and deliveries.

    Primary Key: PurchaseOrderID
    Cursor Field: UpdatedDateUTC
    Pagination: Enabled
    """

    primary_key = "PurchaseOrderID"
    pagination = True
```

#### Overpayments

```python { .api }
class Overpayments(IncrementalXeroStream):
    """
    Overpayment records for excess customer payments.

    Primary Key: OverpaymentID
    Cursor Field: UpdatedDateUTC
    Pagination: Enabled
    """

    primary_key = "OverpaymentID"
    pagination = True
```

#### Prepayments

```python { .api }
class Prepayments(IncrementalXeroStream):
    """
    Prepayment records for advance customer payments.

    Primary Key: PrepaymentID
    Cursor Field: UpdatedDateUTC
    Pagination: Enabled
    """

    primary_key = "PrepaymentID"
    pagination = True
```

### Special Case: Bank Transfers

Bank transfers use a different cursor field than other streams.

```python { .api }
class BankTransfers(IncrementalXeroStream):
    """
    Bank transfer records between accounts.

    Primary Key: BankTransferID
    Cursor Field: CreatedDateUTC (overridden from default)
    Pagination: Enabled
    """

    primary_key = "BankTransferID"
    cursor_field = "CreatedDateUTC"  # Uses creation date instead of update date
    pagination = True
```

### Master Data Streams

Core business entity streams with less frequent updates.

#### Contacts

```python { .api }
class Contacts(IncrementalXeroStream):
    """
    Customer and supplier contact information.

    Primary Key: ContactID
    Cursor Field: UpdatedDateUTC
    Pagination: Enabled (large datasets)
    """

    primary_key = "ContactID"
    pagination = True
```

#### Accounts

```python { .api }
class Accounts(IncrementalXeroStream):
    """
    Chart of accounts for financial categorization.

    Primary Key: AccountID
    Cursor Field: UpdatedDateUTC
    Pagination: Disabled (manageable dataset size)
    """

    primary_key = "AccountID"
    pagination = False
```

#### Items

```python { .api }
class Items(IncrementalXeroStream):
    """
    Product and service items for invoicing.

    Primary Key: ItemID
    Cursor Field: UpdatedDateUTC
    Pagination: Disabled (manageable dataset size)
    """

    primary_key = "ItemID"
    pagination = False
```

#### Employees

```python { .api }
class Employees(IncrementalXeroStream):
    """
    Employee records for payroll and expense management.

    Primary Key: EmployeeID
    Cursor Field: UpdatedDateUTC
    Pagination: Enabled
    """

    primary_key = "EmployeeID"
    pagination = True
```

#### Users

```python { .api }
class Users(IncrementalXeroStream):
    """
    Xero user accounts with system access permissions.

    Primary Key: UserID
    Cursor Field: UpdatedDateUTC
    Pagination: Disabled (small dataset)
    """

    primary_key = "UserID"
    pagination = False
```

## Usage Examples

### Reading Incremental Stream

```python
from datetime import datetime, timezone

from airbyte_cdk.models import SyncMode
from source_xero.streams import BankTransactions

# Initialize stream with start date.
# `authenticator` is assumed to be an already-configured Xero authenticator.
# An explicit UTC datetime avoids fromisoformat() rejecting "Z" before Python 3.11.
start_date = datetime(2023, 1, 1, tzinfo=timezone.utc)
stream = BankTransactions(
    tenant_id="your-tenant-id",
    start_date=start_date,
    authenticator=authenticator,
)

# Read records with state management
stream_state = {"UpdatedDateUTC": "2023-06-01T00:00:00Z"}
records = []

for record in stream.read_records(
    sync_mode=SyncMode.incremental,
    stream_state=stream_state,
):
    records.append(record)

    # Update state periodically (every 100 records)
    if len(records) % 100 == 0:
        stream_state = stream.get_updated_state(stream_state, record)

# Capture the cursor of the final, partial batch as well
if records:
    stream_state = stream.get_updated_state(stream_state, records[-1])

print(f"Synced {len(records)} records")
print(f"New state: {stream_state}")
```

### State Management

```python
# `stream` is an incremental stream instance, created as in the previous example.

# Initial state for first sync
initial_state = {}

# State after partial sync
partial_state = {
    "UpdatedDateUTC": "2023-08-15T14:30:25Z"
}

# State comparison and updates
current_state = stream.get_updated_state(
    current_stream_state=partial_state,
    latest_record={"UpdatedDateUTC": "2023-08-15T16:45:30Z"}
)

# Result: {"UpdatedDateUTC": "2023-08-15T16:45:30Z"}
```

### Custom Date Range Sync

```python
from datetime import datetime, timedelta, timezone

from source_xero.streams import Invoices

# Sync last 30 days of transactions.
# datetime.utcnow() is deprecated since Python 3.12; use an aware UTC datetime.
thirty_days_ago = datetime.now(timezone.utc) - timedelta(days=30)
stream = Invoices(
    tenant_id="your-tenant-id",
    start_date=thirty_days_ago,
    authenticator=authenticator,  # assumed to be configured elsewhere
)

# The stream will automatically use If-Modified-Since headers
# to request only records updated since the start_date
```

## Performance Considerations

### Pagination Settings

Different streams use different pagination strategies based on typical data volumes:

```python
# Large dataset streams (enabled pagination)
LargeDatasetStreams = [
    "BankTransactions",  # High transaction volume
    "Invoices",          # High invoice volume
    "Contacts",          # Large customer bases
    "Payments",          # High payment volume
    "CreditNotes",       # Moderate volume
    "ManualJournals",    # Moderate volume
    "PurchaseOrders",    # Moderate volume
    "Overpayments",      # Moderate volume
    "Prepayments",       # Moderate volume
    "BankTransfers",     # Moderate volume
    "Employees"          # Depends on organization size
]

# Small dataset streams (disabled pagination)
SmallDatasetStreams = [
    "Accounts",  # Limited by chart of accounts
    "Items",     # Product catalog size
    "Users"      # System users only
]
```
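For streams with pagination enabled, reading typically means requesting successive pages until one comes back short. The sketch below illustrates that loop under stated assumptions: `fetch_page` is a hypothetical callable standing in for the HTTP request, and the page size of 100 is an assumption rather than something this document specifies.

```python
from typing import Any, Callable, Dict, Iterator, List

Record = Dict[str, Any]


def read_all_pages(fetch_page: Callable[[int], List[Record]], page_size: int = 100) -> Iterator[Record]:
    """Yield records page by page until a page is shorter than page_size."""
    page = 1
    while True:
        batch = fetch_page(page)  # hypothetical: returns the records on this page
        yield from batch
        if len(batch) < page_size:
            break  # a short (or empty) page means there is nothing left
        page += 1
```

Streams with pagination disabled would skip this loop and issue a single request.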

### State Checkpointing

The `state_checkpoint_interval` of 100 records balances between:

- **Performance**: Reduces state update overhead
- **Reliability**: Limits data re-processing on failures
- **Memory Usage**: Prevents excessive state accumulation
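
The trade-off above can be sketched as a loop that records a state snapshot every `interval` records plus one final snapshot. This is an illustration only, not the framework's checkpointing code: `sync_with_checkpoints` is a hypothetical helper, and cursor values are assumed to be same-format ISO 8601 UTC strings.

```python
from typing import Any, Dict, Iterable, List

Record = Dict[str, Any]


def sync_with_checkpoints(
    records: Iterable[Record],
    interval: int = 100,
    cursor_field: str = "UpdatedDateUTC",
) -> List[Dict[str, str]]:
    """Return the state snapshots that would be saved during a sync."""
    state: Dict[str, str] = {}
    checkpoints: List[Dict[str, str]] = []
    count = 0
    for count, record in enumerate(records, start=1):
        value = record[cursor_field]
        # Only ever move the cursor forward.
        if value > state.get(cursor_field, ""):
            state = {cursor_field: value}
        if count % interval == 0:
            checkpoints.append(dict(state))  # periodic checkpoint
    if count % interval != 0:
        checkpoints.append(dict(state))  # final checkpoint for the partial batch
    return checkpoints
```

With an interval of 100, a failure re-processes at most the records read since the last snapshot, at the cost of one state write per 100 records.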

### Cursor Field Selection

- **UpdatedDateUTC**: Used by 13 of 14 incremental streams
- **CreatedDateUTC**: Used only by BankTransfers (immutable records)

Tracking the most recent change timestamp (or, for BankTransfers, the creation timestamp) lets each sync request only records newer than the stored cursor.

## Error Handling

### Date Parsing Errors

Incremental streams handle various date formats from Xero:

```python
# Supported date formats:
# - ISO 8601: "2023-08-15T14:30:25Z"
# - .NET JSON: "/Date(1419937200000+0000)/"
# - Partial dates: "2023-08-15"
```
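One way to normalize the three formats listed above into timezone-aware datetimes is sketched below. `parse_xero_date` is a hypothetical helper, not the connector's own parser, and it assumes values without an explicit offset are in UTC.

```python
import re
from datetime import datetime, timedelta, timezone

# Matches .NET JSON dates such as "/Date(1419937200000+0000)/".
_DOTNET_DATE = re.compile(r"^/Date\((-?\d+)([+-]\d{4})?\)/$")
_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def parse_xero_date(value: str) -> datetime:
    """Parse ISO 8601, .NET JSON, or date-only strings into an aware UTC datetime."""
    match = _DOTNET_DATE.match(value)
    if match:
        # The number is milliseconds since the Unix epoch in UTC,
        # so the optional offset suffix does not change the instant.
        return _EPOCH + timedelta(milliseconds=int(match.group(1)))
    if value.endswith("Z"):
        # Older Python versions reject a trailing "Z" in fromisoformat().
        value = value[:-1] + "+00:00"
    parsed = datetime.fromisoformat(value)  # also accepts date-only strings
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)  # assumption: naive values are UTC
    return parsed
```

A value that matches none of the formats raises `ValueError` from `datetime.fromisoformat`, which the caller can catch and report.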

### State Recovery

If sync fails mid-stream:

1. **State Preservation**: Last checkpointed state is maintained
2. **Resumption**: Next sync resumes from last successful cursor position
3. **Deduplication**: Records are identified by primary key to prevent duplicates
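
Because a resumed sync can re-read records from after the last checkpoint, the deduplication step can be sketched as keeping one record per primary key. `deduplicate` is a hypothetical helper for illustration; where deduplication actually happens (connector or destination) is not specified here.

```python
from typing import Any, Dict, Iterable, List

Record = Dict[str, Any]


def deduplicate(
    records: Iterable[Record],
    primary_key: str,
    cursor_field: str = "UpdatedDateUTC",
) -> List[Record]:
    """Keep one record per primary key, preferring the newest cursor value."""
    latest: Dict[Any, Record] = {}
    for record in records:
        key = record[primary_key]
        existing = latest.get(key)
        # Assumes cursor values are same-format ISO 8601 strings, which sort chronologically.
        if existing is None or record[cursor_field] >= existing[cursor_field]:
            latest[key] = record
    return list(latest.values())
```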

### API Rate Limiting

Incremental streams implement rate limiting protection:

- **Backoff Strategy**: Exponential backoff for 429 responses
- **Retry Logic**: Automatic retry for transient failures
- **Request Spacing**: Built-in delays between paginated requests
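
The backoff strategy can be illustrated with a small retry loop. This is a sketch under assumptions, not the connector's implementation: `call_with_backoff` and `RateLimitError` are hypothetical names, `request` is any callable returning a `(status_code, body)` tuple, and the retry count and base delay are arbitrary example values.

```python
import time
from typing import Any, Callable, Tuple


class RateLimitError(Exception):
    """Raised when the request is still rate limited after all retries."""


def call_with_backoff(
    request: Callable[[], Tuple[int, Any]],
    max_retries: int = 5,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Call request(), doubling the wait after each 429 response."""
    for attempt in range(max_retries + 1):
        status, body = request()
        if status != 429:
            return body
        if attempt < max_retries:
            sleep(base_delay * 2 ** attempt)  # waits of 1s, 2s, 4s, ... with the defaults
    raise RateLimitError(f"still rate limited after {max_retries} retries")
```

Passing `sleep` as a parameter keeps the loop testable without real delays.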