Airbyte source connector for extracting financial and accounting data from Xero's cloud-based accounting platform.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
The connector provides fourteen incremental synchronization streams that fetch only records created or updated since the last sync. These streams handle the majority of Xero's transactional and customer data with cursor-based incremental updates.
The foundation class for all incremental streams, providing cursor-based synchronization and state management.
class IncrementalXeroStream(XeroStream, ABC):
"""
Abstract base class for incremental Xero data streams.
Provides cursor-based incremental synchronization using Xero's
UpdatedDateUTC field to track and sync only changed records.
"""
cursor_field: str = "UpdatedDateUTC" # Default cursor field
state_checkpoint_interval: int = 100 # Records between state saves
def __init__(self, start_date: datetime, **kwargs):
"""
Initialize incremental stream with start date.
Parameters:
- start_date: datetime object for initial sync start point
- **kwargs: Additional arguments passed to parent XeroStream
"""
def request_headers(self, stream_state, stream_slice=None, next_page_token=None) -> Mapping[str, Any]:
"""
Build request headers including If-Modified-Since for incremental sync.
Parameters:
- stream_state: Current sync state with cursor value
- stream_slice: Stream partition (unused in Xero streams)
- next_page_token: Pagination token for continued requests
Returns:
Headers mapping with If-Modified-Since header for incremental requests
"""
def get_updated_state(self, current_stream_state: Mapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
"""
Update stream state based on latest record's cursor value.
Parameters:
- current_stream_state: Current state mapping
- latest_record: Most recent record from API response
Returns:
Updated state mapping with new cursor value
"""Core financial transaction streams for accounting data synchronization.
class BankTransactions(IncrementalXeroStream):
"""
Bank transaction records including deposits, withdrawals, and transfers.
Primary Key: BankTransactionID
Cursor Field: UpdatedDateUTC
Pagination: Enabled (large datasets)
"""
primary_key = "BankTransactionID"
pagination = Trueclass Invoices(IncrementalXeroStream):
"""
Sales and purchase invoices with line items and payment status.
Primary Key: InvoiceID
Cursor Field: UpdatedDateUTC
Pagination: Enabled (large datasets)
"""
primary_key = "InvoiceID"
pagination = Trueclass CreditNotes(IncrementalXeroStream):
"""
Credit notes for invoice adjustments and refunds.
Primary Key: CreditNoteID
Cursor Field: UpdatedDateUTC
Pagination: Enabled
"""
primary_key = "CreditNoteID"
pagination = Trueclass Payments(IncrementalXeroStream):
"""
Payment records linking invoices to bank transactions.
Primary Key: PaymentID
Cursor Field: UpdatedDateUTC
Pagination: Enabled
"""
primary_key = "PaymentID"
pagination = Trueclass ManualJournals(IncrementalXeroStream):
"""
Manual journal entries for accounting adjustments.
Primary Key: ManualJournalID
Cursor Field: UpdatedDateUTC
Pagination: Enabled
"""
primary_key = "ManualJournalID"
pagination = Trueclass PurchaseOrders(IncrementalXeroStream):
"""
Purchase orders for tracking supplier orders and deliveries.
Primary Key: PurchaseOrderID
Cursor Field: UpdatedDateUTC
Pagination: Enabled
"""
primary_key = "PurchaseOrderID"
pagination = Trueclass Overpayments(IncrementalXeroStream):
"""
Overpayment records for excess customer payments.
Primary Key: OverpaymentID
Cursor Field: UpdatedDateUTC
Pagination: Enabled
"""
primary_key = "OverpaymentID"
pagination = Trueclass Prepayments(IncrementalXeroStream):
"""
Prepayment records for advance customer payments.
Primary Key: PrepaymentID
Cursor Field: UpdatedDateUTC
Pagination: Enabled
"""
primary_key = "PrepaymentID"
pagination = TrueBank transfers use a different cursor field than other streams.
class BankTransfers(IncrementalXeroStream):
"""
Bank transfer records between accounts.
Primary Key: BankTransferID
Cursor Field: CreatedDateUTC (overridden from default)
Pagination: Enabled
"""
primary_key = "BankTransferID"
cursor_field = "CreatedDateUTC" # Uses creation date instead of update date
pagination = TrueCore business entity streams with less frequent updates.
class Contacts(IncrementalXeroStream):
    """
    Customer and supplier contact information.

    Primary Key: ContactID
    Cursor Field: UpdatedDateUTC
    Pagination: Enabled (large datasets)
    """

    primary_key = "ContactID"
    pagination = True


class Accounts(IncrementalXeroStream):
    """
    Chart of accounts for financial categorization.

    Primary Key: AccountID
    Cursor Field: UpdatedDateUTC
    Pagination: Disabled (manageable dataset size)
    """

    primary_key = "AccountID"
    pagination = False


class Items(IncrementalXeroStream):
    """
    Product and service items for invoicing.

    Primary Key: ItemID
    Cursor Field: UpdatedDateUTC
    Pagination: Disabled (manageable dataset size)
    """

    primary_key = "ItemID"
    pagination = False


class Employees(IncrementalXeroStream):
    """
    Employee records for payroll and expense management.

    Primary Key: EmployeeID
    Cursor Field: UpdatedDateUTC
    Pagination: Enabled
    """

    primary_key = "EmployeeID"
    pagination = True


class Users(IncrementalXeroStream):
    """
    Xero user accounts with system access permissions.

    Primary Key: UserID
    Cursor Field: UpdatedDateUTC
    Pagination: Disabled (small dataset)
    """

    primary_key = "UserID"
    pagination = False

from source_xero.streams import BankTransactions
from datetime import datetime

from airbyte_cdk.models import SyncMode

# Initialize stream with start date.
# An explicit "+00:00" offset is used because datetime.fromisoformat() only
# accepts a trailing "Z" on Python 3.11 and later.
start_date = datetime.fromisoformat("2023-01-01T00:00:00+00:00")
stream = BankTransactions(
    tenant_id="your-tenant-id",
    start_date=start_date,
    authenticator=authenticator,  # must be created beforehand (not shown here)
)

# Read records with state management
stream_state = {"UpdatedDateUTC": "2023-06-01T00:00:00Z"}
records = []
for record in stream.read_records(
    sync_mode=SyncMode.incremental,
    stream_state=stream_state,
):
    records.append(record)
    # Update state periodically (every 100 records)
    if len(records) % 100 == 0:
        stream_state = stream.get_updated_state(stream_state, record)
        print(f"Synced {len(records)} records")

# Fold in the final record so that records read after the last full batch of
# 100 are reflected in the reported state.
if records:
    stream_state = stream.get_updated_state(stream_state, records[-1])

print(f"New state: {stream_state}")

# Initial state for first sync
initial_state = {}

# State after partial sync
partial_state = {
    "UpdatedDateUTC": "2023-08-15T14:30:25Z",
}

# State comparison and updates
current_state = stream.get_updated_state(
    current_stream_state=partial_state,
    latest_record={"UpdatedDateUTC": "2023-08-15T16:45:30Z"},
)
# Result: {"UpdatedDateUTC": "2023-08-15T16:45:30Z"}

from datetime import datetime, timedelta
# Sync last 30 days of transactions
thirty_days_ago = datetime.utcnow() - timedelta(days=30)
stream = Invoices(
tenant_id="your-tenant-id",
start_date=thirty_days_ago,
authenticator=authenticator
)
# The stream will automatically use If-Modified-Since headers
# to request only records updated since the start_dateDifferent streams use different pagination strategies based on typical data volumes:
# Large dataset streams (enabled pagination)
LargeDatasetStreams = [
"BankTransactions", # High transaction volume
"Invoices", # High invoice volume
"Contacts", # Large customer bases
"Payments", # High payment volume
"CreditNotes", # Moderate volume
"ManualJournals", # Moderate volume
"PurchaseOrders", # Moderate volume
"Overpayments", # Moderate volume
"Prepayments", # Moderate volume
"BankTransfers", # Moderate volume
"Employees" # Depends on organization size
]
# Small dataset streams (disabled pagination)
SmallDatasetStreams = [
"Accounts", # Limited by chart of accounts
"Items", # Product catalog size
"Users" # System users only
]The state_checkpoint_interval of 100 records balances between:
This ensures efficient incremental sync by tracking the most recent change timestamp.
Incremental streams handle various date formats from Xero:
# Supported date formats:
# - ISO 8601: "2023-08-15T14:30:25Z"
# - .NET JSON: "/Date(1419937200000+0000)/"
# - Partial dates: "2023-08-15"

If sync fails mid-stream:
Incremental streams implement rate limiting protection:
Install with Tessl CLI
npx tessl i tessl/pypi-source-xero