Airbyte source connector for extracting financial and accounting data from Xero's cloud-based accounting platform.
npx @tessl/cli install tessl/pypi-source-xero@0.2.0An Airbyte source connector for extracting financial and accounting data from Xero's cloud-based accounting platform. This connector implements the Airbyte CDK framework to enable data synchronization from Xero's API endpoints including accounts, contacts, invoices, and other financial data with OAuth 2.0 authentication support.
airbyte-cdk~=0.40from source_xero import SourceXeroFor individual stream classes:
from source_xero.source import SourceXero
from source_xero.streams import Accounts, BankTransactions, Contacts, Invoices
from source_xero.oauth import XeroSingleUseRefreshTokenOauth2Authenticatorfrom source_xero import SourceXero
import logging
# Configure logger
logger = logging.getLogger("airbyte")
# Configuration with OAuth2 credentials
config = {
"authentication": {
"client_id": "your-xero-client-id",
"client_secret": "your-xero-client-secret",
"refresh_token": "your-refresh-token",
"access_token": "your-access-token",
"token_expiry_date": "2024-12-31T23:59:59Z"
},
"tenant_id": "your-xero-tenant-id",
"start_date": "2023-01-01T00:00:00Z"
}
# Initialize the source
source = SourceXero()
# Test connection
is_connected, error = source.check_connection(logger, config)
if is_connected:
# Get available streams
streams = source.streams(config)
print(f"Available streams: {len(streams)}")
else:
print(f"Connection failed: {error}")The connector follows Airbyte's architecture pattern with distinct layers:
SourceXero class implementing AbstractSource interfaceThe connector supports both incremental and full-refresh synchronization modes, with incremental streams using UpdatedDateUTC (or CreatedDateUTC for bank transfers) as cursor fields.
Core source configuration and connection validation functionality. Handles OAuth2 authentication setup, connection testing, and stream discovery for the Xero API.
class SourceXero(AbstractSource):
def check_connection(self, logger, config) -> Tuple[bool, any]: ...
def streams(self, config: Mapping[str, Any]) -> List[Stream]: ...
@staticmethod
def get_authenticator(config: Mapping[str, Any]) -> XeroSingleUseRefreshTokenOauth2Authenticator: ...OAuth2 authentication implementation with automatic token refresh, rate limiting handling, and secure credential management for Xero API access.
class XeroSingleUseRefreshTokenOauth2Authenticator(SingleUseRefreshTokenOauth2Authenticator):
def build_refresh_request_body(self) -> Mapping[str, Any]: ...
def build_refresh_request_headers(self) -> Mapping[str, Any]: ...Fourteen incremental sync streams that synchronize only new or updated records since the last sync. These streams handle financial transactions, customer data, and accounting records with efficient incremental updates.
class IncrementalXeroStream(XeroStream, ABC):
cursor_field: str = "UpdatedDateUTC"
state_checkpoint_interval: int = 100
def get_updated_state(self, current_stream_state, latest_record) -> Mapping[str, Any]: ...Seven full-refresh streams that synchronize complete datasets on each sync. These streams handle reference data like currencies, tax rates, and organizational settings that require complete snapshots.
class XeroStream(HttpStream, ABC):
url_base: str = "https://api.xero.com/api.xro/2.0/"
page_size: int = 100
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: ...Utility functions for parsing Xero's custom date formats and JSON data structures, ensuring proper data type conversion and RFC3339 compliance.
def parse_date(value: str) -> Optional[datetime]: ...
def _json_load_object_hook(_dict: dict) -> dict: ...The connector requires specific configuration for OAuth2 authentication and tenant identification:
ConfigSpec = {
"authentication": {
"client_id": str, # Required: Xero application Client ID
"client_secret": str, # Required: Xero application Client Secret (secret)
"refresh_token": str, # Required: OAuth refresh token (secret)
"access_token": str, # Required: OAuth access token (secret)
"token_expiry_date": str # Required: Token expiry date-time
},
"tenant_id": str, # Required: Xero organization Tenant ID (secret)
"start_date": str # Required: UTC date in YYYY-MM-DDTHH-mm-ssZ format
}from typing import Any, List, Mapping, Tuple, Optional, Iterable, MutableMapping
from datetime import datetime
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.requests_native_auth import SingleUseRefreshTokenOauth2Authenticator
import requests
# Core type aliases
ConfigMapping = Mapping[str, Any]
StreamState = MutableMapping[str, Any]
RecordMapping = Mapping[str, Any]
# Authentication types
class XeroSingleUseRefreshTokenOauth2Authenticator(SingleUseRefreshTokenOauth2Authenticator):
"""OAuth2 authenticator with Xero-specific token refresh handling."""
pass
# Stream base classes
class XeroStream(HttpStream):
"""Base class for all Xero API streams."""
url_base: str
page_size: int
current_page: int
pagination: bool
primary_key: str
tenant_id: str
class IncrementalXeroStream(XeroStream):
"""Base class for incremental sync streams."""
cursor_field: str
state_checkpoint_interval: int
start_date: datetimeThe connector implements comprehensive error handling for common Xero API scenarios: