Airbyte source connector for extracting financial and accounting data from Xero's cloud-based accounting platform.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Core source configuration and connection validation functionality for the Airbyte Source Xero connector. This module handles OAuth2 authentication setup, connection testing, and stream discovery.
from typing import Any, List, Mapping, Tuple
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from source_xero.oauth import XeroSingleUseRefreshTokenOauth2AuthenticatorThe main SourceXero class that implements the Airbyte source interface for connecting to and extracting data from Xero.
class SourceXero(AbstractSource):
"""
Airbyte source connector for Xero accounting data.
Inherits from airbyte_cdk.sources.AbstractSource and implements
the required methods for connection testing and stream discovery.
"""
def check_connection(self, logger, config) -> Tuple[bool, any]:
"""
Test connection to Xero API by attempting to fetch Organizations.
Parameters:
- logger: Logger instance for connection status reporting
- config: Configuration dict with authentication and connection details
Returns:
Tuple of (success: bool, error_message: any)
- success: True if connection succeeds, False otherwise
- error_message: None on success, error details on failure
"""
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
"""
Discover and return all available Xero data streams.
Parameters:
- config: Configuration mapping with authentication details
Returns:
List of 21 stream instances (14 incremental, 7 full-refresh):
- Incremental: Accounts, BankTransactions, BankTransfers, Contacts,
CreditNotes, Employees, Invoices, Items, ManualJournals,
Overpayments, Payments, Prepayments, PurchaseOrders, Users
- Full-refresh: BrandingThemes, ContactGroups, Currencies,
Organisations, RepeatingInvoices, TaxRates, TrackingCategories
"""
@staticmethod
def get_authenticator(config: Mapping[str, Any]) -> XeroSingleUseRefreshTokenOauth2Authenticator:
"""
Create and configure OAuth2 authenticator for Xero API.
Parameters:
- config: Configuration mapping containing OAuth2 credentials
Returns:
XeroSingleUseRefreshTokenOauth2Authenticator instance configured with:
- token_refresh_endpoint: "https://identity.xero.com/connect/token"
- client_id: From config["authentication"]["client_id"]
- client_secret: From config["authentication"]["client_secret"]
- access_token_config_path: ["authentication", "access_token"]
- refresh_token_config_path: ["authentication", "refresh_token"]
- token_expiry_date_config_path: ["authentication", "token_expiry_date"]
"""Internal method for validating and transforming configuration parameters.
def _validate_and_transform(self, config: Mapping[str, Any]) -> Mapping[str, Any]:
"""
Validate start_date format and transform configuration.
Parameters:
- config: Raw configuration mapping
Returns:
Validated and transformed configuration mapping
Raises:
ValueError: If start_date format is invalid
"""AuthenticationConfig = {
"client_id": str, # Xero application Client ID
"client_secret": str, # Xero application Client Secret
"refresh_token": str, # OAuth2 refresh token
"access_token": str, # OAuth2 access token
"token_expiry_date": str # Token expiry in ISO format
}ConnectionConfig = {
"authentication": AuthenticationConfig, # OAuth2 credentials
"tenant_id": str, # Xero organization tenant ID
"start_date": str # UTC date in YYYY-MM-DDTHH:mm:ssZ
}from source_xero import SourceXero
import logging
logger = logging.getLogger("airbyte")
config = {
"authentication": {
"client_id": "your-client-id",
"client_secret": "your-client-secret",
"refresh_token": "your-refresh-token",
"access_token": "your-access-token",
"token_expiry_date": "2024-12-31T23:59:59Z"
},
"tenant_id": "your-tenant-id",
"start_date": "2023-01-01T00:00:00Z"
}
source = SourceXero()
is_connected, error = source.check_connection(logger, config)
if is_connected:
print("Successfully connected to Xero")
streams = source.streams(config)
print(f"Found {len(streams)} available streams")
else:
print(f"Connection failed: {error}")source = SourceXero()
streams = source.streams(config)
# List all stream names
stream_names = [stream.name for stream in streams]
print("Available streams:", stream_names)
# Separate incremental and full-refresh streams
incremental_streams = [s for s in streams if hasattr(s, 'cursor_field')]
full_refresh_streams = [s for s in streams if not hasattr(s, 'cursor_field')]
print(f"Incremental streams: {len(incremental_streams)}")
print(f"Full-refresh streams: {len(full_refresh_streams)}")# Get configured authenticator
authenticator = SourceXero.get_authenticator(config)
# The authenticator handles:
# - OAuth2 token refresh
# - Rate limiting and retries
# - Request header management
# - Error handling for 401/429/500+ responsesThe check_connection method catches and reports various connection issues:
The _validate_and_transform method validates:
start_date must be valid ISO 8601 formatThis source connector integrates with the Airbyte platform through:
AbstractSource for platform compatibilityInstall with Tessl CLI
npx tessl i tessl/pypi-source-xero