# Data Streams
Access to 21 Xero data streams, organized into transactional data (with incremental sync) and reference data (with snapshot sync). Each stream provides structured access to a specific Xero accounting entity with an appropriate sync strategy.
## Capabilities
### Transactional Streams (Incremental Sync)
Streams that support incremental synchronization using the UpdatedDateUTC cursor field for efficient data replication.
```python { .api }
# Stream configuration for incremental sync
TransactionalStreams = {
    "bank_transactions": {
        "primary_key": "BankTransactionID",
        "path": "/BankTransactions",
        "cursor_field": "UpdatedDateUTC",
        "supports_incremental": True
    },
    "contacts": {
        "primary_key": "ContactID",
        "path": "/Contacts",
        "cursor_field": "UpdatedDateUTC",
        "supports_incremental": True
    },
    "credit_notes": {
        "primary_key": "CreditNoteID",
        "path": "/CreditNotes",
        "cursor_field": "UpdatedDateUTC",
        "supports_incremental": True
    },
    "invoices": {
        "primary_key": "InvoiceID",
        "path": "/Invoices",
        "cursor_field": "UpdatedDateUTC",
        "supports_incremental": True
    },
    "manual_journals": {
        "primary_key": "ManualJournalID",
        "path": "/ManualJournals",
        "cursor_field": "UpdatedDateUTC",
        "supports_incremental": True
    },
    "overpayments": {
        "primary_key": "OverpaymentID",
        "path": "/Overpayments",
        "cursor_field": "UpdatedDateUTC",
        "supports_incremental": True
    },
    "prepayments": {
        "primary_key": "PrepaymentID",
        "path": "/Prepayments",
        "cursor_field": "UpdatedDateUTC",
        "supports_incremental": True
    },
    "purchase_orders": {
        "primary_key": "PurchaseOrderID",
        "path": "/PurchaseOrders",
        "cursor_field": "UpdatedDateUTC",
        "supports_incremental": True
    },
    "payments": {
        "primary_key": "PaymentID",
        "path": "/Payments",
        "cursor_field": "UpdatedDateUTC",
        "supports_incremental": True
    }
}
"""
Nine transactional streams with incremental sync capabilities.

These streams track business transactions and frequently changing data:
- All use UpdatedDateUTC as the cursor field for incremental sync
- Support a configurable start_date for initial sync boundaries
- Page-based pagination, defaulting to 100 records per page
- Automatic date format conversion from Xero .NET JSON to ISO 8601
"""
```
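As a rough sketch of how such a stream configuration translates into an API request: the Xero Accounting API filters records by modification time via the `If-Modified-Since` request header, which is how a cursor value like `UpdatedDateUTC` is typically applied. The `build_incremental_request` helper and `BASE_URL` constant below are illustrative, not part of the connector.

```python
from datetime import datetime, timezone

BASE_URL = "https://api.xero.com/api.xro/2.0"

def build_incremental_request(stream_config: dict, last_cursor: datetime) -> tuple[str, dict]:
    """Build the URL and headers for an incremental pull of one stream (hypothetical helper)."""
    url = BASE_URL + stream_config["path"]
    headers = {
        # Xero filters by modification time using a UTC timestamp in this header
        "If-Modified-Since": last_cursor.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S"),
    }
    return url, headers

url, headers = build_incremental_request(
    {"path": "/Invoices", "cursor_field": "UpdatedDateUTC"},
    datetime(2023, 1, 1, tzinfo=timezone.utc),
)
print(url)  # https://api.xero.com/api.xro/2.0/Invoices
```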
### Reference Data Streams (Snapshot Sync)
Streams that perform full refresh synchronization for relatively static reference data.
```python { .api }
# Stream configuration for snapshot sync
ReferenceStreams = {
    "accounts": {
        "primary_key": "AccountID",
        "path": "/Accounts",
        "supports_incremental": False,
        "description": "Chart of accounts and account structure"
    },
    "bank_transfers": {
        "primary_key": "BankTransferID",
        "path": "/BankTransfers",
        "supports_incremental": False,
        "description": "Bank transfer records between accounts"
    },
    "employees": {
        "primary_key": "EmployeeID",
        "path": "/Employees",
        "supports_incremental": False,
        "description": "Employee information and details"
    },
    "items": {
        "primary_key": "ItemID",
        "path": "/Items",
        "supports_incremental": False,
        "description": "Inventory items and product catalog"
    },
    "users": {
        "primary_key": "UserID",
        "path": "/Users",
        "supports_incremental": False,
        "description": "User accounts and access permissions"
    },
    "branding_themes": {
        "primary_key": "BrandingThemeID",
        "path": "/BrandingThemes",
        "supports_incremental": False,
        "description": "Invoice branding and theme configurations"
    },
    "contact_groups": {
        "primary_key": "ContactGroupID",
        "path": "/ContactGroups",
        "supports_incremental": False,
        "description": "Contact groupings and categories"
    },
    "currencies": {
        "primary_key": "Code",
        "path": "/Currencies",
        "supports_incremental": False,
        "description": "Currency definitions and exchange rates"
    },
    "organisations": {
        "primary_key": "OrganisationID",
        "path": "/Organisation",
        "supports_incremental": False,
        "description": "Organization details and settings"
    },
    "repeating_invoices": {
        "primary_key": "RepeatingInvoiceID",
        "path": "/RepeatingInvoices",
        "supports_incremental": False,
        "description": "Recurring invoice templates and schedules"
    },
    "tax_rates": {
        "primary_key": "TaxType",
        "path": "/TaxRates",
        "supports_incremental": False,
        "description": "Tax rate configurations and rules"
    },
    "tracking_categories": {
        "primary_key": "TrackingCategoryID",
        "path": "/TrackingCategories",
        "supports_incremental": False,
        "description": "Tracking category definitions for reporting"
    }
}
"""
Twelve reference data streams with snapshot sync.

These streams contain relatively static configuration and reference data:
- Full refresh synchronization on each sync
- No cursor field or incremental capabilities
- Generally smaller datasets that change infrequently
- Provide lookup data and configuration for the transactional streams
"""
```
### Stream Access Patterns
Common patterns for accessing and working with stream data from the connector.
```python { .api }
def get_stream_by_name(source: SourceXero, config: dict, stream_name: str):
    """
    Retrieve a specific stream by name from the connector.

    Args:
        source: Initialized SourceXero connector instance
        config: Valid configuration dictionary
        stream_name: Name of the stream to retrieve

    Returns:
        Stream object, or None if not found
    """

def list_all_streams(source: SourceXero, config: dict) -> list[dict]:
    """
    Get information about all available streams.

    Args:
        source: Initialized SourceXero connector instance
        config: Valid configuration dictionary

    Returns:
        List of stream information dictionaries containing:
        - name: Stream name
        - primary_key: Primary key field(s)
        - supports_incremental: Boolean incremental sync support
        - cursor_field: Cursor field name (if incremental)
    """

def get_stream_schema(stream) -> dict:
    """
    Retrieve the JSON schema for a specific stream.

    Args:
        stream: Stream object from the connector

    Returns:
        JSON schema dictionary defining the stream's data structure
    """
```
## Usage Examples
### Stream Discovery and Information
```python
from source_xero import SourceXero

def explore_available_streams():
    """Discover and examine available streams."""
    source = SourceXero()
    config = {
        "access_token": "your_token",
        "tenant_id": "your_tenant",
        "start_date": "2023-01-01T00:00:00Z"
    }

    # Get all streams
    streams = source.streams(config)

    # Categorize streams by sync type
    incremental_streams = []
    snapshot_streams = []

    for stream in streams:
        stream_info = {
            "name": stream.name,
            "primary_key": getattr(stream, "primary_key", None),
            # A non-empty cursor_field indicates incremental sync support
            "supports_incremental": bool(getattr(stream, "cursor_field", None))
        }

        if stream_info["supports_incremental"]:
            incremental_streams.append(stream_info)
        else:
            snapshot_streams.append(stream_info)

    print(f"Incremental streams: {len(incremental_streams)}")
    for stream in incremental_streams:
        print(f"  - {stream['name']} (key: {stream['primary_key']})")

    print(f"Snapshot streams: {len(snapshot_streams)}")
    for stream in snapshot_streams:
        print(f"  - {stream['name']} (key: {stream['primary_key']})")

# Run discovery
explore_available_streams()
```
### Working with Specific Streams
```python
import logging

from source_xero import SourceXero

def examine_stream_details(stream_name: str):
    """Get detailed information about a specific stream."""
    source = SourceXero()
    config = {
        "access_token": "your_token",
        "tenant_id": "your_tenant",
        "start_date": "2023-01-01T00:00:00Z"
    }

    # Find the specific stream
    streams = source.streams(config)
    target_stream = None

    for stream in streams:
        if stream.name == stream_name:
            target_stream = stream
            break

    if target_stream is None:
        print(f"Stream '{stream_name}' not found")
        return

    print(f"Stream: {target_stream.name}")
    print(f"Primary Key: {getattr(target_stream, 'primary_key', 'None')}")
    # A non-empty cursor_field indicates incremental sync support
    print(f"Incremental: {bool(getattr(target_stream, 'cursor_field', None))}")

    # Get schema information
    try:
        catalog = source.discover(logging.getLogger("airbyte"), config)
        for stream_catalog in catalog.streams:
            if stream_catalog.stream.name == stream_name:
                schema = stream_catalog.stream.json_schema
                properties = schema.get("properties", {})
                print(f"Fields: {len(properties)}")
                print("Key fields:")
                for field_name, field_def in list(properties.items())[:5]:
                    field_type = field_def.get("type", "unknown")
                    print(f"  - {field_name}: {field_type}")
                break
    except Exception as e:
        print(f"Schema discovery failed: {e}")

# Example usage
examine_stream_details("invoices")
examine_stream_details("accounts")
```
### Stream Configuration for Airbyte
```python
import json

def create_catalog_for_streams(stream_names: list[str]) -> dict:
    """Create an Airbyte catalog configuration for specific streams."""
    catalog = {
        "streams": []
    }

    # Stream sync configurations
    stream_configs = {
        # Incremental streams
        "bank_transactions": {
            "sync_mode": "incremental",
            "destination_sync_mode": "append_dedup",
            "cursor_field": ["UpdatedDateUTC"]
        },
        "contacts": {
            "sync_mode": "incremental",
            "destination_sync_mode": "append_dedup",
            "cursor_field": ["UpdatedDateUTC"]
        },
        "invoices": {
            "sync_mode": "incremental",
            "destination_sync_mode": "append_dedup",
            "cursor_field": ["UpdatedDateUTC"]
        },
        # Snapshot streams
        "accounts": {
            "sync_mode": "full_refresh",
            "destination_sync_mode": "overwrite"
        },
        "currencies": {
            "sync_mode": "full_refresh",
            "destination_sync_mode": "overwrite"
        }
    }

    for stream_name in stream_names:
        if stream_name in stream_configs:
            is_incremental = stream_configs[stream_name]["sync_mode"] == "incremental"
            stream_config = {
                "stream": {
                    "name": stream_name,
                    "supported_sync_modes": (
                        ["full_refresh", "incremental"] if is_incremental
                        else ["full_refresh"]
                    )
                },
                "config": stream_configs[stream_name]
            }
            catalog["streams"].append(stream_config)

    return catalog

# Create a catalog for selected streams
selected_streams = ["invoices", "contacts", "accounts", "currencies"]
catalog_config = create_catalog_for_streams(selected_streams)
print(json.dumps(catalog_config, indent=2))
```
## Data Processing Features

### Automatic Date Conversion

All streams automatically convert Xero's .NET JSON date format to ISO 8601:

- **Input**: `"/Date(1419937200000+0000)/"`
- **Output**: `"2014-12-30T11:00:00+00:00"`

This conversion happens transparently for all date fields in all streams via the CustomExtractor component.
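The transformation can be sketched in a few lines; this is an illustration of the format, not the connector's actual CustomExtractor. The millisecond value inside `/Date(...)/` is a UTC epoch timestamp, and any trailing `+0000`-style offset is display-only.

```python
import re
from datetime import datetime, timezone

def dotnet_json_date_to_iso(value: str) -> str:
    """Convert a .NET JSON date string to an ISO 8601 UTC timestamp (hypothetical helper)."""
    match = re.fullmatch(r"/Date\((-?\d+)(?:[+-]\d{4})?\)/", value)
    if match is None:
        raise ValueError(f"not a .NET JSON date: {value!r}")
    millis = int(match.group(1))  # epoch milliseconds; the offset suffix is ignored
    return datetime.fromtimestamp(millis / 1000, tz=timezone.utc).isoformat()

print(dotnet_json_date_to_iso("/Date(1672531200000+0000)/"))  # 2023-01-01T00:00:00+00:00
```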
### Pagination Support

Streams support page-based pagination:

- **Default page size**: 100 records
- **Configurable**: Can be adjusted via the page_size parameter
- **Automatic**: Handled by the Airbyte CDK DefaultPaginator
- **Progress tracking**: Automatic state management for large datasets
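The page loop above can be sketched generically; `fetch_page` is a hypothetical stand-in for one API call returning a page of records, not a connector function. A page shorter than `page_size` signals the end of the data.

```python
from typing import Callable, Iterator

def paginate(fetch_page: Callable[[int], list[dict]], page_size: int = 100) -> Iterator[dict]:
    """Yield records page by page until a short (or empty) page ends the loop."""
    page = 1
    while True:
        records = fetch_page(page)
        yield from records
        if len(records) < page_size:  # short page: no more data
            return
        page += 1

# Usage with a fake in-memory data source of 250 records
data = [{"id": i} for i in range(250)]
fetched = list(paginate(lambda page: data[(page - 1) * 100 : page * 100]))
print(len(fetched))  # 250
```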
### Incremental Sync Behavior

For streams with incremental sync support:

- **Cursor field**: UpdatedDateUTC (automatically managed)
- **State management**: Automatic checkpoint storage and recovery
- **Boundary filtering**: Records are filtered by UpdatedDateUTC >= start_time
- **Timezone handling**: All dates normalized to UTC for consistency
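The boundary filtering and checkpoint advance can be sketched as follows; the record shapes and flat state dict are illustrative, not the CDK's actual state objects. Because the dates are normalized ISO 8601 UTC strings, they compare correctly as plain strings.

```python
def sync_incremental(records: list[dict], state: dict,
                     cursor_field: str = "UpdatedDateUTC") -> tuple[list[dict], dict]:
    """Filter records at or after the stored cursor, then advance the cursor (sketch)."""
    start = state.get(cursor_field, "1970-01-01T00:00:00+00:00")
    new_records = [r for r in records if r[cursor_field] >= start]
    if new_records:
        # Checkpoint: remember the newest cursor value we have seen
        state[cursor_field] = max(r[cursor_field] for r in new_records)
    return new_records, state

records = [
    {"InvoiceID": "a", "UpdatedDateUTC": "2023-01-05T00:00:00+00:00"},
    {"InvoiceID": "b", "UpdatedDateUTC": "2023-03-01T00:00:00+00:00"},
]
synced, state = sync_incremental(records, {"UpdatedDateUTC": "2023-02-01T00:00:00+00:00"})
print([r["InvoiceID"] for r in synced])  # ['b']
```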
### Error Handling per Stream

Each stream inherits the connector's error-handling configuration:

- **401 responses**: Sync fails with an authentication error
- **403 responses**: Individual records are skipped and the sync continues
- **429 responses**: Automatic retry after a 30-second delay
- **Network errors**: Standard retry logic with exponential backoff
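The policy table above reduces to a small status-code dispatcher; this is a sketch of the behavior described, not the connector's actual error handler.

```python
def classify_response(status_code: int) -> str:
    """Map an HTTP status code to the sync action described above (sketch)."""
    if status_code == 401:
        return "fail"             # authentication error aborts the sync
    if status_code == 403:
        return "skip"             # record is skipped, sync continues
    if status_code == 429:
        return "retry_after_30s"  # rate limited: retry after a fixed delay
    if status_code >= 500:
        return "retry_backoff"    # transient server error: exponential backoff
    return "ok"

print(classify_response(429))  # retry_after_30s
```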