tessl/pypi-source-xero

Airbyte source connector for extracting financial and accounting data from Xero's cloud-based accounting platform.


Incremental Data Streams

Fourteen incremental streams sync only the records created or updated since the last sync. They cover most of Xero's transactional and customer data and track progress with a timestamp cursor.

Capabilities

Base Incremental Stream

The foundation class for all incremental streams, providing cursor-based synchronization and state management.

class IncrementalXeroStream(XeroStream, ABC):
    """
    Abstract base class for incremental Xero data streams.
    
    Provides cursor-based incremental synchronization using Xero's
    UpdatedDateUTC field to track and sync only changed records.
    """
    
    cursor_field: str = "UpdatedDateUTC"        # Default cursor field
    state_checkpoint_interval: int = 100        # Records between state saves
    
    def __init__(self, start_date: datetime, **kwargs):
        """
        Initialize incremental stream with start date.
        
        Parameters:
        - start_date: datetime object for initial sync start point
        - **kwargs: Additional arguments passed to parent XeroStream
        """
    
    def request_headers(self, stream_state, stream_slice=None, next_page_token=None) -> Mapping[str, Any]:
        """
        Build request headers including If-Modified-Since for incremental sync.
        
        Parameters:
        - stream_state: Current sync state with cursor value
        - stream_slice: Stream partition (unused in Xero streams)
        - next_page_token: Pagination token for continued requests
        
        Returns:
        Headers mapping with If-Modified-Since header for incremental requests
        """
    
    def get_updated_state(self, current_stream_state: Mapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Update stream state based on latest record's cursor value.
        
        Parameters:
        - current_stream_state: Current state mapping
        - latest_record: Most recent record from API response
        
        Returns:
        Updated state mapping with new cursor value
        """
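
To make the role of the cursor in `request_headers` more concrete, the following is a minimal sketch of how an `If-Modified-Since` header could be derived from the saved state, falling back to `start_date` on a first sync. The helper name `build_incremental_headers` is hypothetical, and both the fallback behavior and the header timestamp format are assumptions rather than the connector's confirmed implementation.

```python
from datetime import datetime, timezone
from typing import Any, Mapping, Optional


def build_incremental_headers(
    stream_state: Optional[Mapping[str, Any]],
    start_date: datetime,
    cursor_field: str = "UpdatedDateUTC",
) -> Mapping[str, str]:
    """Hypothetical helper: derive If-Modified-Since from state or start_date."""
    cursor_value = (stream_state or {}).get(cursor_field)
    if cursor_value:
        # Assumes the state stores an ISO 8601 string such as "2023-06-01T00:00:00Z".
        since = datetime.fromisoformat(cursor_value.replace("Z", "+00:00"))
    else:
        # No saved cursor yet: fall back to the configured start date.
        since = start_date
    if since.tzinfo is None:
        since = since.replace(tzinfo=timezone.utc)  # treat naive values as UTC
    since = since.astimezone(timezone.utc)
    # Assumed header format: a UTC timestamp without an offset suffix.
    return {"If-Modified-Since": since.strftime("%Y-%m-%dT%H:%M:%S")}
```

With a saved cursor the header follows the state; with an empty state it follows `start_date`.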

Financial Transaction Streams

Core financial transaction streams for accounting data synchronization.

Bank Transactions

class BankTransactions(IncrementalXeroStream):
    """
    Bank transaction records including deposits, withdrawals, and transfers.
    
    Primary Key: BankTransactionID
    Cursor Field: UpdatedDateUTC
    Pagination: Enabled (large datasets)
    """
    
    primary_key = "BankTransactionID"
    pagination = True

Invoices

class Invoices(IncrementalXeroStream):
    """
    Sales and purchase invoices with line items and payment status.
    
    Primary Key: InvoiceID
    Cursor Field: UpdatedDateUTC
    Pagination: Enabled (large datasets)
    """
    
    primary_key = "InvoiceID"
    pagination = True

Credit Notes

class CreditNotes(IncrementalXeroStream):
    """
    Credit notes for invoice adjustments and refunds.
    
    Primary Key: CreditNoteID
    Cursor Field: UpdatedDateUTC
    Pagination: Enabled
    """
    
    primary_key = "CreditNoteID"
    pagination = True

Payments

class Payments(IncrementalXeroStream):
    """
    Payment records linking invoices to bank transactions.
    
    Primary Key: PaymentID
    Cursor Field: UpdatedDateUTC
    Pagination: Enabled
    """
    
    primary_key = "PaymentID"
    pagination = True

Manual Journals

class ManualJournals(IncrementalXeroStream):
    """
    Manual journal entries for accounting adjustments.
    
    Primary Key: ManualJournalID
    Cursor Field: UpdatedDateUTC
    Pagination: Enabled
    """
    
    primary_key = "ManualJournalID"
    pagination = True

Purchase Orders

class PurchaseOrders(IncrementalXeroStream):
    """
    Purchase orders for tracking supplier orders and deliveries.
    
    Primary Key: PurchaseOrderID
    Cursor Field: UpdatedDateUTC
    Pagination: Enabled
    """
    
    primary_key = "PurchaseOrderID"
    pagination = True

Overpayments

class Overpayments(IncrementalXeroStream):
    """
    Overpayment records for excess customer payments.
    
    Primary Key: OverpaymentID
    Cursor Field: UpdatedDateUTC
    Pagination: Enabled
    """
    
    primary_key = "OverpaymentID"
    pagination = True

Prepayments

class Prepayments(IncrementalXeroStream):
    """
    Prepayment records for advance customer payments.
    
    Primary Key: PrepaymentID
    Cursor Field: UpdatedDateUTC
    Pagination: Enabled
    """
    
    primary_key = "PrepaymentID"
    pagination = True

Special Case: Bank Transfers

Bank transfers use a different cursor field than other streams.

class BankTransfers(IncrementalXeroStream):
    """
    Bank transfer records between accounts.
    
    Primary Key: BankTransferID
    Cursor Field: CreatedDateUTC (overridden from default)
    Pagination: Enabled
    """
    
    primary_key = "BankTransferID"
    cursor_field = "CreatedDateUTC"  # Uses creation date instead of update date
    pagination = True

Master Data Streams

Core business entity streams with less frequent updates.

Contacts

class Contacts(IncrementalXeroStream):
    """
    Customer and supplier contact information.
    
    Primary Key: ContactID
    Cursor Field: UpdatedDateUTC
    Pagination: Enabled (large datasets)
    """
    
    primary_key = "ContactID"
    pagination = True

Accounts

class Accounts(IncrementalXeroStream):
    """
    Chart of accounts for financial categorization.
    
    Primary Key: AccountID
    Cursor Field: UpdatedDateUTC
    Pagination: Disabled (manageable dataset size)
    """
    
    primary_key = "AccountID"
    pagination = False

Items

class Items(IncrementalXeroStream):
    """
    Product and service items for invoicing.
    
    Primary Key: ItemID
    Cursor Field: UpdatedDateUTC
    Pagination: Disabled (manageable dataset size)
    """
    
    primary_key = "ItemID"
    pagination = False

Employees

class Employees(IncrementalXeroStream):
    """
    Employee records for payroll and expense management.
    
    Primary Key: EmployeeID
    Cursor Field: UpdatedDateUTC
    Pagination: Enabled
    """
    
    primary_key = "EmployeeID"
    pagination = True

Users

class Users(IncrementalXeroStream):
    """
    Xero user accounts with system access permissions.
    
    Primary Key: UserID
    Cursor Field: UpdatedDateUTC
    Pagination: Disabled (small dataset)
    """
    
    primary_key = "UserID"
    pagination = False

Usage Examples

Reading Incremental Stream

from datetime import datetime, timezone

from airbyte_cdk.models import SyncMode
from source_xero.streams import BankTransactions

# `authenticator` is assumed to be an OAuth authenticator configured elsewhere;
# it is not constructed in this example.

# Initialize stream with a timezone-aware start date
start_date = datetime(2023, 1, 1, tzinfo=timezone.utc)
stream = BankTransactions(
    tenant_id="your-tenant-id",
    start_date=start_date,
    authenticator=authenticator
)

# Read records with state management
stream_state = {"UpdatedDateUTC": "2023-06-01T00:00:00Z"}
records = []

for record in stream.read_records(
    sync_mode=SyncMode.incremental,
    stream_state=stream_state
):
    records.append(record)

    # Advance the cursor on every record so the final state also covers
    # records read after the last multiple of 100
    stream_state = stream.get_updated_state(stream_state, record)

print(f"Synced {len(records)} records")
print(f"New state: {stream_state}")

State Management

# Initial state for first sync
initial_state = {}

# State after partial sync
partial_state = {
    "UpdatedDateUTC": "2023-08-15T14:30:25Z"
}

# State comparison and updates
current_state = stream.get_updated_state(
    current_stream_state=partial_state,
    latest_record={"UpdatedDateUTC": "2023-08-15T16:45:30Z"}
)

# Result: {"UpdatedDateUTC": "2023-08-15T16:45:30Z"}
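
The result above is consistent with a "keep the newer timestamp" rule. A minimal sketch of that rule follows; the function name `advance_state` is hypothetical, and comparing parsed datetimes (rather than raw strings) is a choice made for this illustration, not a statement about the connector's code.

```python
from datetime import datetime
from typing import Any, Dict, Mapping


def _parse_utc(value: str) -> datetime:
    # Accepts ISO 8601 strings with a trailing "Z" on any Python 3.7+.
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def advance_state(
    current_state: Mapping[str, Any],
    latest_record: Mapping[str, Any],
    cursor_field: str = "UpdatedDateUTC",
) -> Dict[str, Any]:
    """Hypothetical helper: keep whichever cursor value is newer."""
    record_value = latest_record[cursor_field]
    state_value = current_state.get(cursor_field)
    if state_value is None or _parse_utc(record_value) > _parse_utc(state_value):
        return {cursor_field: record_value}
    return {cursor_field: state_value}
```

An older record never moves the cursor backwards, and passing `cursor_field="CreatedDateUTC"` covers the BankTransfers case.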

Custom Date Range Sync

from datetime import datetime, timedelta, timezone

from source_xero.streams import Invoices

# Sync last 30 days of transactions
# (`authenticator` is assumed to be configured elsewhere)
thirty_days_ago = datetime.now(timezone.utc) - timedelta(days=30)
stream = Invoices(
    tenant_id="your-tenant-id",
    start_date=thirty_days_ago,
    authenticator=authenticator
)

# The stream will automatically use If-Modified-Since headers
# to request only records updated since the start_date

Performance Considerations

Pagination Settings

Each stream either enables or disables pagination, depending on its typical data volume:

# Large dataset streams (enabled pagination)
LargeDatasetStreams = [
    "BankTransactions",    # High transaction volume
    "Invoices",           # High invoice volume  
    "Contacts",           # Large customer bases
    "Payments",           # High payment volume
    "CreditNotes",        # Moderate volume
    "ManualJournals",     # Moderate volume
    "PurchaseOrders",     # Moderate volume
    "Overpayments",       # Moderate volume
    "Prepayments",        # Moderate volume
    "BankTransfers",      # Moderate volume
    "Employees"           # Depends on organization size
]

# Small dataset streams (disabled pagination)
SmallDatasetStreams = [
    "Accounts",           # Limited by chart of accounts
    "Items",              # Product catalog size
    "Users"               # System users only
]
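
As an illustration of what the `pagination` flag could control, the sketch below assumes page-number pagination with a fixed page size: a full page means another page may exist, a short page means the stream is exhausted. The function name, the `page` token key, and the page size of 100 are all assumptions made for this example.

```python
from typing import Any, Dict, Optional, Sequence

PAGE_SIZE = 100  # assumed page size, not taken from the connector


def next_page_token(
    records_on_page: Sequence[Any],
    current_page: int,
    pagination_enabled: bool,
    page_size: int = PAGE_SIZE,
) -> Optional[Dict[str, int]]:
    """Hypothetical helper: decide whether another page should be requested."""
    if not pagination_enabled:
        # Non-paginated streams return everything in a single response.
        return None
    if len(records_on_page) < page_size:
        # A short (or empty) page marks the end of the data.
        return None
    return {"page": current_page + 1}
```

One consequence of this scheme is that a dataset whose size is an exact multiple of the page size costs one extra, empty request.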

State Checkpointing

The state_checkpoint_interval of 100 records is a trade-off between:

  • Performance: Reduces state update overhead
  • Reliability: Limits data re-processing on failures
  • Memory Usage: Prevents excessive state accumulation
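
The trade-off can be seen in a small simulation. The generator below is a simplified stand-in for a sync loop, not the Airbyte CDK's own logic: it emits a state message after every `interval` records and once more at the end, so a failure loses at most one interval of progress. The name `read_with_checkpoints` and the message tuples are hypothetical.

```python
from typing import Any, Dict, Iterable, Iterator, Mapping, Tuple


def read_with_checkpoints(
    records: Iterable[Mapping[str, Any]],
    cursor_field: str = "UpdatedDateUTC",
    interval: int = 100,
) -> Iterator[Tuple[str, Mapping[str, Any]]]:
    """Yield ("record", r) for each record and ("state", s) at checkpoints."""
    state: Dict[str, Any] = {}
    count = 0
    for count, record in enumerate(records, start=1):
        yield ("record", record)
        # Plain string comparison is only safe because the timestamps are
        # assumed to share one fixed-width ISO 8601 format.
        newest = max(state.get(cursor_field, ""), record[cursor_field])
        state = {cursor_field: newest}
        if count % interval == 0:
            yield ("state", dict(state))
    if count % interval != 0:
        # Final checkpoint so the trailing partial interval is not lost.
        yield ("state", dict(state))
```

With 250 records and the default interval, this produces checkpoints after records 100, 200, and 250.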

Cursor Field Selection

  • UpdatedDateUTC: Used by 13 of 14 incremental streams
  • CreatedDateUTC: Used only by BankTransfers (immutable records)

In both cases the cursor holds the most recent timestamp seen so far, so each sync only needs to request records newer than that value.

Error Handling

Date Parsing Errors

Incremental streams handle various date formats from Xero:

# Supported date formats:
# - ISO 8601: "2023-08-15T14:30:25Z"
# - .NET JSON: "/Date(1419937200000+0000)/"
# - Partial dates: "2023-08-15"
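
A parser covering the three formats listed above might look like the following sketch. The function name `parse_xero_date` is hypothetical, and two choices are assumptions of this example: values without a timezone are treated as UTC, and the offset suffix of the .NET format is ignored because the millisecond count is already relative to the Unix epoch.

```python
import re
from datetime import datetime, timedelta, timezone

# Matches "/Date(1419937200000+0000)/" and the variant without an offset.
_DOTNET_DATE = re.compile(r"^/Date\((-?\d+)([+-]\d{4})?\)/$")


def parse_xero_date(value: str) -> datetime:
    """Hypothetical helper: parse a date string into an aware UTC datetime."""
    match = _DOTNET_DATE.match(value)
    if match:
        millis = int(match.group(1))
        epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
        return epoch + timedelta(milliseconds=millis)
    # fromisoformat handles both "2023-08-15T14:30:25+00:00" and "2023-08-15".
    parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)  # assumption: naive means UTC
    return parsed.astimezone(timezone.utc)
```

For instance, `parse_xero_date("/Date(1419937200000+0000)/")` yields 2014-12-30 11:00:00 UTC. Strings in any other format raise `ValueError`.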

State Recovery

If sync fails mid-stream:

  1. State Preservation: Last checkpointed state is maintained
  2. Resumption: Next sync resumes from last successful cursor position
  3. Deduplication: Records are identified by primary key to prevent duplicates
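
Because a resumed sync may re-read records from the last checkpoint onward, the same primary key can appear more than once. The sketch below shows one way such duplicates could be collapsed by keeping the newest version per key. It is only an illustration: the helper name `deduplicate` is hypothetical, and where deduplication actually happens in a real pipeline is not specified here.

```python
from typing import Any, Dict, Iterable, List, Mapping


def deduplicate(
    records: Iterable[Mapping[str, Any]],
    primary_key: str,
    cursor_field: str = "UpdatedDateUTC",
) -> List[Mapping[str, Any]]:
    """Hypothetical helper: keep the newest record for each primary key."""
    latest: Dict[Any, Mapping[str, Any]] = {}
    for record in records:
        key = record[primary_key]
        kept = latest.get(key)
        # String comparison assumes all cursor values share one fixed-width
        # ISO 8601 format; ties are resolved in favor of the later arrival.
        if kept is None or record[cursor_field] >= kept[cursor_field]:
            latest[key] = record
    return list(latest.values())
```

The output preserves the order in which each key was first seen.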

API Rate Limiting

Incremental streams implement rate limiting protection:

  • Backoff Strategy: Exponential backoff for 429 responses
  • Retry Logic: Automatic retry for transient failures
  • Request Spacing: Built-in delays between paginated requests
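
The backoff behavior can be sketched independently of any HTTP library. In the example below, `send` is a hypothetical callable returning a status code and an optional `Retry-After` value (assumed to be given in seconds); the base delay, cap, and attempt limit are illustrative values, not the connector's settings.

```python
import time
from typing import Callable, Optional, Tuple


def backoff_seconds(
    attempt: int,
    retry_after: Optional[str] = None,
    base: float = 1.0,
    cap: float = 60.0,
) -> float:
    """Delay before the next try: server hint if present, else base * 2^attempt."""
    if retry_after is not None:
        return float(retry_after)  # assumes Retry-After is a number of seconds
    return min(base * (2 ** attempt), cap)


def call_with_retries(
    send: Callable[[], Tuple[int, Optional[str]]],
    max_attempts: int = 5,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call `send` until it stops returning 429 or attempts run out."""
    for attempt in range(max_attempts):
        status, retry_after = send()
        if status != 429:
            return status
        if attempt < max_attempts - 1:
            sleep(backoff_seconds(attempt, retry_after))
    raise RuntimeError("rate limit retries exhausted")
```

Injecting `sleep` keeps the function easy to test without real delays.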

Install with Tessl CLI

npx tessl i tessl/pypi-source-xero
