tessl/pypi-airbyte-source-xero

Airbyte source connector for extracting data from the Xero accounting API with support for 21 data streams and incremental sync capabilities


Data Streams

The connector provides access to 21 Xero data streams, organized into transactional data (incremental sync) and reference data (snapshot sync). Each stream gives structured access to a specific Xero accounting entity with an appropriate sync strategy.

Capabilities

Transactional Streams (Incremental Sync)

Streams that support incremental synchronization using the UpdatedDateUTC cursor field for efficient data replication.

# Stream configuration for incremental sync
TransactionalStreams = {
    "bank_transactions": {
        "primary_key": "BankTransactionID",
        "path": "/BankTransactions",
        "cursor_field": "UpdatedDateUTC",
        "supports_incremental": True
    },
    "contacts": {
        "primary_key": "ContactID", 
        "path": "/Contacts",
        "cursor_field": "UpdatedDateUTC",
        "supports_incremental": True
    },
    "credit_notes": {
        "primary_key": "CreditNoteID",
        "path": "/CreditNotes", 
        "cursor_field": "UpdatedDateUTC",
        "supports_incremental": True
    },
    "invoices": {
        "primary_key": "InvoiceID",
        "path": "/Invoices",
        "cursor_field": "UpdatedDateUTC", 
        "supports_incremental": True
    },
    "manual_journals": {
        "primary_key": "ManualJournalID",
        "path": "/ManualJournals",
        "cursor_field": "UpdatedDateUTC",
        "supports_incremental": True
    },
    "overpayments": {
        "primary_key": "OverpaymentID",
        "path": "/Overpayments",
        "cursor_field": "UpdatedDateUTC",
        "supports_incremental": True
    },
    "prepayments": {
        "primary_key": "PrepaymentID", 
        "path": "/Prepayments",
        "cursor_field": "UpdatedDateUTC",
        "supports_incremental": True
    },
    "purchase_orders": {
        "primary_key": "PurchaseOrderID",
        "path": "/PurchaseOrders",
        "cursor_field": "UpdatedDateUTC",
        "supports_incremental": True
    },
    "payments": {
        "primary_key": "PaymentID",
        "path": "/Payments", 
        "cursor_field": "UpdatedDateUTC",
        "supports_incremental": True
    }
}
"""
Nine transactional streams with incremental sync capabilities.

These streams track business transactions and frequently changing data:
- All use UpdatedDateUTC as cursor field for incremental sync
- Support configurable start_date for initial sync boundaries
- Page-based pagination with 100 records per page default
- Automatic date format conversion from Xero .NET JSON to ISO 8601
"""

Reference Data Streams (Snapshot Sync)

Streams that perform full refresh synchronization for relatively static reference data.

# Stream configuration for snapshot sync
ReferenceStreams = {
    "accounts": {
        "primary_key": "AccountID",
        "path": "/Accounts",
        "supports_incremental": False,
        "description": "Chart of accounts and account structure"
    },
    "bank_transfers": {
        "primary_key": "BankTransferID", 
        "path": "/BankTransfers",
        "supports_incremental": False,
        "description": "Bank transfer records between accounts"
    },
    "employees": {
        "primary_key": "EmployeeID",
        "path": "/Employees", 
        "supports_incremental": False,
        "description": "Employee information and details"
    },
    "items": {
        "primary_key": "ItemID",
        "path": "/Items",
        "supports_incremental": False,
        "description": "Inventory items and product catalog"
    },
    "users": {
        "primary_key": "UserID",
        "path": "/Users",
        "supports_incremental": False,
        "description": "User accounts and access permissions"  
    },
    "branding_themes": {
        "primary_key": "BrandingThemeID",
        "path": "/BrandingThemes",
        "supports_incremental": False,
        "description": "Invoice branding and theme configurations"
    },
    "contact_groups": {
        "primary_key": "ContactGroupID",
        "path": "/ContactGroups", 
        "supports_incremental": False,
        "description": "Contact groupings and categories"
    },
    "currencies": {
        "primary_key": "Code",
        "path": "/Currencies",
        "supports_incremental": False,
        "description": "Currency definitions and exchange rates"
    },
    "organisations": {
        "primary_key": "OrganisationID",
        "path": "/Organisation",
        "supports_incremental": False,
        "description": "Organization details and settings"
    },
    "repeating_invoices": {
        "primary_key": "RepeatingInvoiceID",
        "path": "/RepeatingInvoices",
        "supports_incremental": False, 
        "description": "Recurring invoice templates and schedules"
    },
    "tax_rates": {
        "primary_key": "TaxType",
        "path": "/TaxRates",
        "supports_incremental": False,
        "description": "Tax rate configurations and rules"
    },
    "tracking_categories": {
        "primary_key": "TrackingCategoryID",
        "path": "/TrackingCategories",
        "supports_incremental": False,
        "description": "Tracking category definitions for reporting"
    }
}
"""
Twelve reference data streams with snapshot sync.

These streams contain relatively static configuration and reference data:
- Full refresh synchronization on each sync
- No cursor field or incremental capabilities  
- Generally smaller datasets that change infrequently
- Provide lookup data and configuration for transactional streams
"""

Stream Access Patterns

Common patterns for accessing and working with stream data from the connector.

def get_stream_by_name(source: SourceXero, config: dict, stream_name: str):
    """
    Retrieve a specific stream by name from the connector.
    
    Args:
        source: Initialized SourceXero connector instance
        config: Valid configuration dictionary
        stream_name: Name of the stream to retrieve
        
    Returns:
        Stream object or None if not found
    """

def list_all_streams(source: SourceXero, config: dict) -> list[dict]:
    """
    Get information about all available streams.
    
    Args:
        source: Initialized SourceXero connector instance  
        config: Valid configuration dictionary
        
    Returns:
        List of stream information dictionaries containing:
        - name: Stream name
        - primary_key: Primary key field(s)
        - supports_incremental: Boolean incremental sync support
        - cursor_field: Cursor field name (if incremental)
    """

def get_stream_schema(stream) -> dict:
    """
    Retrieve the JSON schema for a specific stream.
    
    Args:
        stream: Stream object from connector
        
    Returns:
        JSON schema dictionary defining the stream's data structure
    """

Usage Examples

Stream Discovery and Information

from source_xero import SourceXero

def explore_available_streams():
    """Discover and examine available streams."""
    source = SourceXero()
    config = {
        "access_token": "your_token",
        "tenant_id": "your_tenant",
        "start_date": "2023-01-01T00:00:00Z"
    }
    
    # Get all streams
    streams = source.streams(config)
    
    # Categorize streams by sync type
    incremental_streams = []
    snapshot_streams = []
    
    for stream in streams:
        stream_info = {
            "name": stream.name,
            "primary_key": getattr(stream, 'primary_key', None),
            "supports_incremental": hasattr(stream, 'incremental_sync')
        }
        
        if stream_info["supports_incremental"]:
            incremental_streams.append(stream_info)
        else:
            snapshot_streams.append(stream_info)
    
    print(f"Incremental streams: {len(incremental_streams)}")
    for stream in incremental_streams:
        print(f"  - {stream['name']} (key: {stream['primary_key']})")
    
    print(f"Snapshot streams: {len(snapshot_streams)}")
    for stream in snapshot_streams:
        print(f"  - {stream['name']} (key: {stream['primary_key']})")

# Run discovery
explore_available_streams()

Working with Specific Streams

from source_xero import SourceXero
import json

def examine_stream_details(stream_name: str):
    """Get detailed information about a specific stream."""
    source = SourceXero()
    config = {
        "access_token": "your_token",
        "tenant_id": "your_tenant", 
        "start_date": "2023-01-01T00:00:00Z"
    }
    
    # Find the specific stream
    streams = source.streams(config)
    target_stream = None
    
    for stream in streams:
        if stream.name == stream_name:
            target_stream = stream
            break
    
    if target_stream:
        print(f"Stream: {target_stream.name}")
        print(f"Primary Key: {getattr(target_stream, 'primary_key', 'None')}")
        print(f"Incremental: {hasattr(target_stream, 'incremental_sync')}")
        
        # Get schema information
        try:
            import logging  # discover() expects a logger instance
            catalog = source.discover(logging.getLogger("airbyte"), config)
            for stream_catalog in catalog.streams:
                if stream_catalog.stream.name == stream_name:
                    schema = stream_catalog.stream.json_schema
                    properties = schema.get('properties', {})
                    print(f"Fields: {len(properties)}")
                    print("Key fields:")
                    for field_name, field_def in list(properties.items())[:5]:
                        field_type = field_def.get('type', 'unknown')
                        print(f"  - {field_name}: {field_type}")
                    break
        except Exception as e:
            print(f"Schema discovery failed: {e}")
    else:
        print(f"Stream '{stream_name}' not found")

# Example usage
examine_stream_details("invoices")
examine_stream_details("accounts")

Stream Configuration for Airbyte

import json

def create_catalog_for_streams(stream_names: list[str]) -> dict:
    """Create Airbyte catalog configuration for specific streams."""
    catalog = {
        "streams": []
    }
    
    # Stream sync configurations
    stream_configs = {
        # Incremental streams
        "bank_transactions": {
            "sync_mode": "incremental",
            "destination_sync_mode": "append_dedup",
            "cursor_field": ["UpdatedDateUTC"]
        },
        "contacts": {
            "sync_mode": "incremental", 
            "destination_sync_mode": "append_dedup",
            "cursor_field": ["UpdatedDateUTC"]
        },
        "invoices": {
            "sync_mode": "incremental",
            "destination_sync_mode": "append_dedup", 
            "cursor_field": ["UpdatedDateUTC"]
        },
        # Snapshot streams
        "accounts": {
            "sync_mode": "full_refresh",
            "destination_sync_mode": "overwrite"
        },
        "currencies": {
            "sync_mode": "full_refresh",
            "destination_sync_mode": "overwrite"
        }
    }
    
    for stream_name in stream_names:
        if stream_name in stream_configs:
            sync_config = stream_configs[stream_name]
            supported_modes = (
                ["full_refresh", "incremental"]
                if sync_config["sync_mode"] == "incremental"
                else ["full_refresh"]
            )
            stream_config = {
                "stream": {
                    "name": stream_name,
                    "supported_sync_modes": supported_modes
                },
                "config": sync_config
            }
            catalog["streams"].append(stream_config)
    
    return catalog

# Create catalog for selected streams
selected_streams = ["invoices", "contacts", "accounts", "currencies"]
catalog_config = create_catalog_for_streams(selected_streams)
print(json.dumps(catalog_config, indent=2))

Data Processing Features

Automatic Date Conversion

All streams automatically convert Xero's .NET JSON date format to ISO 8601:

  • Input: "/Date(1419937200000+0000)/"
  • Output: "2014-12-30T11:00:00+00:00"

This conversion happens transparently for all date fields in all streams using the CustomExtractor component.
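
The conversion can be sketched as a small helper. This is an illustration of the format, not the connector's actual CustomExtractor code; note that the epoch milliseconds in a .NET JSON date are already UTC, so the trailing offset can be ignored for normalization.

```python
import re
from datetime import datetime, timezone

# Matches '/Date(<millis><optional offset>)/', e.g. '/Date(1419937200000+0000)/'
MS_DATE = re.compile(r"/Date\((-?\d+)([+-]\d{4})?\)/")

def convert_dotnet_date(value: str) -> str:
    """Convert a .NET JSON date string to ISO 8601; pass other values through."""
    match = MS_DATE.fullmatch(value)
    if not match:
        return value
    millis = int(match.group(1))
    # Milliseconds are relative to the Unix epoch in UTC
    return datetime.fromtimestamp(millis / 1000, tz=timezone.utc).isoformat()

print(convert_dotnet_date("/Date(1419937200000+0000)/"))
```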

Pagination Support

Streams support page-based pagination:

  • Default page size: 100 records
  • Configurable: Can be adjusted via page_size parameter
  • Automatic: Handled by Airbyte CDK DefaultPaginator
  • Progress tracking: Automatic state management for large datasets
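
The page-based scheme above can be sketched as a generator. Here `fetch_page` is a hypothetical stand-in for the HTTP call that the CDK's DefaultPaginator issues (Xero endpoints take a 1-based `page` query parameter); the in-memory "API" exists only to make the sketch runnable.

```python
from typing import Callable, Iterator

def paginate(fetch_page: Callable[[int, int], list[dict]],
             page_size: int = 100) -> Iterator[dict]:
    """Yield records page by page until a short (or empty) page signals the end."""
    page = 1
    while True:
        records = fetch_page(page, page_size)
        yield from records
        if len(records) < page_size:
            break
        page += 1

# Demo with an in-memory "API" of 250 records and the default page size of 100.
data = [{"InvoiceID": i} for i in range(250)]
fake_fetch = lambda page, size: data[(page - 1) * size : page * size]
print(sum(1 for _ in paginate(fake_fetch)))  # → 250, fetched across 3 pages
```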

Incremental Sync Behavior

For streams with incremental sync support:

  • Cursor field: UpdatedDateUTC (automatically managed)
  • State management: Automatic checkpoint storage and recovery
  • Boundary filtering: Records filtered by UpdatedDateUTC >= start_time
  • Timezone handling: All dates normalized to UTC for consistency
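
The boundary filtering and checkpointing can be sketched as follows. This is a minimal illustration; the real connector delegates state management to the Airbyte CDK, and `filter_and_advance` is a hypothetical helper. ISO 8601 strings in a uniform UTC format compare correctly as plain strings.

```python
def filter_and_advance(records: list[dict], state: dict,
                       cursor_field: str = "UpdatedDateUTC"):
    """Emit records at or after the stored cursor, then advance the cursor."""
    start = state.get(cursor_field, "1970-01-01T00:00:00+00:00")
    emitted = [r for r in records if r[cursor_field] >= start]
    if emitted:
        # Checkpoint: remember the newest cursor value seen
        state[cursor_field] = max(r[cursor_field] for r in emitted)
    return emitted, state

records = [
    {"InvoiceID": "a", "UpdatedDateUTC": "2023-05-01T00:00:00+00:00"},
    {"InvoiceID": "b", "UpdatedDateUTC": "2023-06-01T00:00:00+00:00"},
]
state = {"UpdatedDateUTC": "2023-05-15T00:00:00+00:00"}
emitted, state = filter_and_advance(records, state)
print([r["InvoiceID"] for r in emitted])  # → ['b']
```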

Error Handling per Stream

Each stream inherits the connector's error handling configuration:

  • 401 responses: Sync fails with authentication error
  • 403 responses: Individual records skipped, sync continues
  • 429 responses: Automatic retry after 30-second delay
  • Network errors: Standard retry logic with exponential backoff
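
The per-status behavior can be summarized as a small dispatch function. This is a simplified sketch of the rules above, not the connector's actual CDK error-handler configuration.

```python
RATE_LIMIT_DELAY = 30  # seconds, per the connector's 429 handling

def handle_response(status: int) -> str:
    """Map an HTTP status code to the stream-level action described above."""
    if status == 401:
        return "fail"                # authentication error aborts the sync
    if status == 403:
        return "skip"                # skip the record, continue the sync
    if status == 429:
        return "retry_after_delay"   # wait RATE_LIMIT_DELAY, then retry
    if status >= 500:
        return "retry_backoff"       # exponential backoff for transient errors
    return "ok"

print(handle_response(429))  # → retry_after_delay
```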

Install with Tessl CLI

npx tessl i tessl/pypi-airbyte-source-xero
