Airbyte source connector for extracting data from the Xero accounting API with support for 21 data streams and incremental sync capabilities
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Access to 21 different Xero data streams organized into transactional data (with incremental sync) and reference data (snapshot sync). Each stream provides structured access to specific Xero accounting entities with appropriate sync strategies.
Streams that support incremental synchronization using UpdatedDateUTC cursor field for efficient data replication.
# Stream configuration for incremental sync
TransactionalStreams = {
"bank_transactions": {
"primary_key": "BankTransactionID",
"path": "/BankTransactions",
"cursor_field": "UpdatedDateUTC",
"supports_incremental": True
},
"contacts": {
"primary_key": "ContactID",
"path": "/Contacts",
"cursor_field": "UpdatedDateUTC",
"supports_incremental": True
},
"credit_notes": {
"primary_key": "CreditNoteID",
"path": "/CreditNotes",
"cursor_field": "UpdatedDateUTC",
"supports_incremental": True
},
"invoices": {
"primary_key": "InvoiceID",
"path": "/Invoices",
"cursor_field": "UpdatedDateUTC",
"supports_incremental": True
},
"manual_journals": {
"primary_key": "ManualJournalID",
"path": "/ManualJournals",
"cursor_field": "UpdatedDateUTC",
"supports_incremental": True
},
"overpayments": {
"primary_key": "OverpaymentID",
"path": "/Overpayments",
"cursor_field": "UpdatedDateUTC",
"supports_incremental": True
},
"prepayments": {
"primary_key": "PrepaymentID",
"path": "/Prepayments",
"cursor_field": "UpdatedDateUTC",
"supports_incremental": True
},
"purchase_orders": {
"primary_key": "PurchaseOrderID",
"path": "/PurchaseOrders",
"cursor_field": "UpdatedDateUTC",
"supports_incremental": True
},
"payments": {
"primary_key": "PaymentID",
"path": "/Payments",
"cursor_field": "UpdatedDateUTC",
"supports_incremental": True
}
}
"""
Nine transactional streams with incremental sync capabilities.
These streams track business transactions and frequently changing data:
- All use UpdatedDateUTC as cursor field for incremental sync
- Support configurable start_date for initial sync boundaries
- Page-based pagination with 100 records per page default
- Automatic date format conversion from Xero .NET JSON to ISO 8601
"""Streams that perform full refresh synchronization for relatively static reference data.
# Stream configuration for snapshot sync
ReferenceStreams = {
"accounts": {
"primary_key": "AccountID",
"path": "/Accounts",
"supports_incremental": False,
"description": "Chart of accounts and account structure"
},
"bank_transfers": {
"primary_key": "BankTransferID",
"path": "/BankTransfers",
"supports_incremental": False,
"description": "Bank transfer records between accounts"
},
"employees": {
"primary_key": "EmployeeID",
"path": "/Employees",
"supports_incremental": False,
"description": "Employee information and details"
},
"items": {
"primary_key": "ItemID",
"path": "/Items",
"supports_incremental": False,
"description": "Inventory items and product catalog"
},
"users": {
"primary_key": "UserID",
"path": "/Users",
"supports_incremental": False,
"description": "User accounts and access permissions"
},
"branding_themes": {
"primary_key": "BrandingThemeID",
"path": "/BrandingThemes",
"supports_incremental": False,
"description": "Invoice branding and theme configurations"
},
"contact_groups": {
"primary_key": "ContactGroupID",
"path": "/ContactGroups",
"supports_incremental": False,
"description": "Contact groupings and categories"
},
"currencies": {
"primary_key": "Code",
"path": "/Currencies",
"supports_incremental": False,
"description": "Currency definitions and exchange rates"
},
"organisations": {
"primary_key": "OrganisationID",
"path": "/Organisation",
"supports_incremental": False,
"description": "Organization details and settings"
},
"repeating_invoices": {
"primary_key": "RepeatingInvoiceID",
"path": "/RepeatingInvoices",
"supports_incremental": False,
"description": "Recurring invoice templates and schedules"
},
"tax_rates": {
"primary_key": "TaxType",
"path": "/TaxRates",
"supports_incremental": False,
"description": "Tax rate configurations and rules"
},
"tracking_categories": {
"primary_key": "TrackingCategoryID",
"path": "/TrackingCategories",
"supports_incremental": False,
"description": "Tracking category definitions for reporting"
}
}
"""
Twelve reference data streams with snapshot sync.
These streams contain relatively static configuration and reference data:
- Full refresh synchronization on each sync
- No cursor field or incremental capabilities
- Generally smaller datasets that change infrequently
- Provide lookup data and configuration for transactional streams
"""Common patterns for accessing and working with stream data from the connector.
def get_stream_by_name(source: SourceXero, config: dict, stream_name: str):
"""
Retrieve a specific stream by name from the connector.
Args:
source: Initialized SourceXero connector instance
config: Valid configuration dictionary
stream_name: Name of the stream to retrieve
Returns:
Stream object or None if not found
"""
def list_all_streams(source: SourceXero, config: dict) -> list[dict]:
"""
Get information about all available streams.
Args:
source: Initialized SourceXero connector instance
config: Valid configuration dictionary
Returns:
List of stream information dictionaries containing:
- name: Stream name
- primary_key: Primary key field(s)
- supports_incremental: Boolean incremental sync support
- cursor_field: Cursor field name (if incremental)
"""
def get_stream_schema(stream) -> dict:
"""
Retrieve the JSON schema for a specific stream.
Args:
stream: Stream object from connector
Returns:
JSON schema dictionary defining the stream's data structure
"""from source_xero import SourceXero
def explore_available_streams():
"""Discover and examine available streams."""
source = SourceXero()
config = {
"access_token": "your_token",
"tenant_id": "your_tenant",
"start_date": "2023-01-01T00:00:00Z"
}
# Get all streams
streams = source.streams(config)
# Categorize streams by sync type
incremental_streams = []
snapshot_streams = []
for stream in streams:
stream_info = {
"name": stream.name,
"primary_key": getattr(stream, 'primary_key', None),
"supports_incremental": hasattr(stream, 'incremental_sync')
}
if stream_info["supports_incremental"]:
incremental_streams.append(stream_info)
else:
snapshot_streams.append(stream_info)
print(f"Incremental streams: {len(incremental_streams)}")
for stream in incremental_streams:
print(f" - {stream['name']} (key: {stream['primary_key']})")
print(f"Snapshot streams: {len(snapshot_streams)}")
for stream in snapshot_streams:
print(f" - {stream['name']} (key: {stream['primary_key']})")
# Run discovery
explore_available_streams()from source_xero import SourceXero
import json
def examine_stream_details(stream_name: str):
"""Get detailed information about a specific stream."""
source = SourceXero()
config = {
"access_token": "your_token",
"tenant_id": "your_tenant",
"start_date": "2023-01-01T00:00:00Z"
}
# Find the specific stream
streams = source.streams(config)
target_stream = None
for stream in streams:
if stream.name == stream_name:
target_stream = stream
break
if target_stream:
print(f"Stream: {target_stream.name}")
print(f"Primary Key: {getattr(target_stream, 'primary_key', 'None')}")
print(f"Incremental: {hasattr(target_stream, 'incremental_sync')}")
# Get schema information
try:
catalog = source.discover(None, config)
for stream_catalog in catalog.streams:
if stream_catalog.stream.name == stream_name:
schema = stream_catalog.stream.json_schema
properties = schema.get('properties', {})
print(f"Fields: {len(properties)}")
print("Key fields:")
for field_name, field_def in list(properties.items())[:5]:
field_type = field_def.get('type', 'unknown')
print(f" - {field_name}: {field_type}")
break
except Exception as e:
print(f"Schema discovery failed: {e}")
else:
print(f"Stream '{stream_name}' not found")
# Example usage
examine_stream_details("invoices")
examine_stream_details("accounts")def create_catalog_for_streams(stream_names: list[str]) -> dict:
"""Create Airbyte catalog configuration for specific streams."""
catalog = {
"streams": []
}
# Stream sync configurations
stream_configs = {
# Incremental streams
"bank_transactions": {
"sync_mode": "incremental",
"destination_sync_mode": "append_dedup",
"cursor_field": ["UpdatedDateUTC"]
},
"contacts": {
"sync_mode": "incremental",
"destination_sync_mode": "append_dedup",
"cursor_field": ["UpdatedDateUTC"]
},
"invoices": {
"sync_mode": "incremental",
"destination_sync_mode": "append_dedup",
"cursor_field": ["UpdatedDateUTC"]
},
# Snapshot streams
"accounts": {
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
"currencies": {
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
}
for stream_name in stream_names:
if stream_name in stream_configs:
stream_config = {
"stream": {
"name": stream_name,
"supported_sync_modes": ["full_refresh", "incremental"] if stream_configs[stream_name]["sync_mode"] == "incremental" else ["full_refresh"]
},
"config": stream_configs[stream_name]
}
catalog["streams"].append(stream_config)
return catalog
# Create catalog for selected streams
selected_streams = ["invoices", "contacts", "accounts", "currencies"]
catalog_config = create_catalog_for_streams(selected_streams)
print(json.dumps(catalog_config, indent=2))All streams automatically convert Xero's .NET JSON date format to ISO 8601:
"/Date(1419937200000+0000)/""2014-12-30T07:00:00+00:00"This conversion happens transparently for all date fields in all streams using the CustomExtractor component.
Streams support page-based pagination:
For streams with incremental sync support:
Each stream inherits the connector's error handling configuration:
Install with Tessl CLI
npx tessl i tessl/pypi-airbyte-source-xero