CtrlK
Blog Docs Log in Get started
Tessl Logo

tessl/pypi-airbyte-source-notion

Airbyte source connector for extracting data from Notion workspaces with OAuth2.0 and token authentication support.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/stream-management.md

Stream Management

Base classes and functionality for managing Notion data streams with pagination, error handling, and incremental sync capabilities.

Capabilities

Base Notion Stream

The foundational class for all Notion API streams with common functionality for rate limiting, pagination, and error handling.

class NotionStream(HttpStream, ABC):
    """
    Abstract base class for Notion API streams.

    Provides common functionality including rate limiting, pagination,
    error handling, and Notion-specific API behaviors. Records are taken
    from the ``results`` array of each response, and pagination follows
    the cursor returned by Notion. ``path`` is not defined here; concrete
    subclasses (see the usage examples) supply it.
    """
    
    url_base: str = "https://api.notion.com/v1/"  # Notion REST API root; stream paths are resolved against it
    primary_key: str = "id"  # record field used as the primary key
    page_size: int = 100  # default request page size; should_retry lowers it after a 504 and restores it on success
    raise_on_http_errors: bool = True  # surface HTTP errors instead of silently skipping them
    
    def __init__(self, config: Mapping[str, Any], **kwargs):
        """
        Initialize the stream from the connector configuration.

        Reads ``start_date`` from ``config``. If it is not provided, the
        start date defaults to two years before the current time.
        
        Args:
            config: Connector configuration mapping; ``start_date`` is read from it.
            **kwargs: Additional stream parameters (the usage examples pass
                ``authenticator``); presumably forwarded to ``HttpStream`` — TODO confirm.
        """
    
    @property
    def availability_strategy(self) -> HttpAvailabilityStrategy:
        """Return a NotionAvailabilityStrategy, which supplies Notion-specific error messages for unavailable streams."""
    
    @property
    def retry_factor(self) -> int:
        """Exponential-backoff factor applied between retries (fixed at 5)."""
    
    @property
    def max_retries(self) -> int:
        """Maximum number of retry attempts per request (fixed at 7)."""
    
    @property
    def max_time(self) -> int:
        """Upper bound, in seconds, on total time spent retrying (fixed at 660, i.e. 11 minutes)."""
    
    @staticmethod
    def check_invalid_start_cursor(response: requests.Response) -> Optional[str]:
        """
        Detect Notion's "invalid start cursor" error in a response.
        
        Args:
            response: HTTP response object to inspect.
            
        Returns:
            The error message if the response reports an invalid start
            cursor, otherwise None.
        """
    
    @staticmethod
    def throttle_request_page_size(current_page_size: int) -> int:
        """
        Compute a smaller page size to retry with after a 504 Gateway Timeout.
        
        Args:
            current_page_size: Page size used by the request that timed out.
            
        Returns:
            The reduced page size, never lower than 10.
        """
    
    def backoff_time(self, response: requests.Response) -> Optional[float]:
        """
        Return how long to wait before retrying a failed request.

        Notion rate-limits clients to roughly 3 requests per second. For 429
        (Too Many Requests) responses, the wait is taken from the
        ``retry-after`` header.
        
        Args:
            response: HTTP response that triggered the retry.
            
        Returns:
            Backoff time in seconds. The signature allows None, which
            presumably defers to the default backoff — TODO confirm.
        """
    
    def should_retry(self, response: requests.Response) -> bool:
        """
        Decide whether to retry a request, adjusting ``page_size`` as a side effect.

        On a 504 Gateway Timeout, ``page_size`` is reduced (see
        ``throttle_request_page_size``) so the retry asks for less data.
        After a subsequent success it is restored, presumably to the class
        default of 100 — TODO confirm.
        
        Args:
            response: HTTP response to evaluate.
            
        Returns:
            True if the request should be retried.
        """
    
    def request_headers(self, **kwargs) -> Mapping[str, Any]:
        """
        Add the Notion API version header to every request.
        
        Returns:
            Headers mapping that includes ``Notion-Version: 2022-06-28``.
        """
    
    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """
        Extract the pagination token for the next page from a Notion response.
        
        Args:
            response: HTTP response for the current page.
            
        Returns:
            Mapping describing the next page, or None when there are no more pages.
        """
    
    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """
        Yield the records contained in a Notion API response.
        
        Args:
            response: HTTP response object.
            **kwargs: Additional parsing parameters (unused by this description).
            
        Yields:
            Each record mapping from the response's ``results`` array.
        """

Incremental Sync Stream

Enhanced base class for streams supporting incremental synchronization with cursor-based state management.

class IncrementalNotionStream(NotionStream, CheckpointMixin, ABC):
    """
    Base class for Notion streams with incremental sync capability.

    Queries Notion's ``search`` endpoint with POST requests. It keeps a
    cursor on ``last_edited_time`` and checkpoints state as records are read.
    """
    
    cursor_field: str = "last_edited_time"  # record field used as the incremental cursor
    http_method: str = "POST"  # search is queried with a JSON body (see request_body_json)
    is_finished: bool = True  # sync-completion flag; presumably what StateValueWrapper.value consults — TODO confirm
    
    def __init__(self, obj_type: Optional[str] = None, **kwargs):
        """
        Initialize the incremental stream with an optional object-type filter.
        
        Args:
            obj_type: Notion object type to search for ("page" or "database"),
                or None for no object-type filter.
            **kwargs: Additional stream parameters (e.g. ``config``).
        """
    
    @property
    def state(self) -> MutableMapping[str, Any]:
        """Return the current stream state: a mapping keyed by the cursor field."""
    
    @state.setter
    def state(self, value: MutableMapping[str, Any]):
        """Replace the current stream state mapping."""
    
    def path(self, **kwargs) -> str:
        """
        Return the API path for the search endpoint.
        
        Returns:
            "search" — Notion's search API endpoint.
        """
    
    def request_body_json(self, next_page_token: Optional[Mapping[str, Any]] = None, **kwargs) -> Optional[Mapping]:
        """
        Build the JSON request body for the Notion search API.
        
        Args:
            next_page_token: Pagination token for the next page, or None for
                the first page.
            **kwargs: Additional request parameters.
            
        Returns:
            Request body with sort, filter, and pagination parameters.
        """
    
    def read_records(self, sync_mode: SyncMode, stream_state: Optional[Mapping[str, Any]] = None, **kwargs) -> Iterable[Mapping[str, Any]]:
        """
        Read records, updating the stream state as each record is produced.

        Also handles invalid-start-cursor errors (see
        ``check_invalid_start_cursor``).
        
        Args:
            sync_mode: FULL_REFRESH or INCREMENTAL.
            stream_state: State to resume an incremental sync from, or None.
            **kwargs: Additional read parameters.
            
        Yields:
            Record mappings. ``self.state`` is updated incrementally while
            the records are produced.
        """
    
    def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
        """
        Parse a response, filtering records for incremental sync.

        Only records newer than both the state cursor and ``start_date`` are yielded.
        
        Args:
            response: HTTP response object.
            stream_state: Current stream state used for filtering.
            **kwargs: Additional parsing parameters.
            
        Yields:
            Record mappings that pass the cursor/start_date filter.
        """
    
    def _get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Fold the latest record's cursor value into the stream state.
        
        Args:
            current_stream_state: Current state mapping.
            latest_record: Most recently processed record.
            
        Returns:
            Updated state mapping carrying the new cursor value.
        """

State Management Helper

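Utility model that wraps an incremental-sync cursor value. While a sync is running it reports the state value; once the stream has finished, it reports the maximum cursor time observed.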

class StateValueWrapper(pydantic.BaseModel):
    """
    Wrapper for an incremental stream's cursor state value.

    It reports one value while a sync is in progress and another once the
    owning stream has finished. When serialized, it collapses to that
    current value only.
    """
    
    stream: T  # owning stream whose completion status selects the value; T is defined outside this excerpt (presumably a TypeVar) — TODO confirm
    state_value: str  # cursor value reported while the sync is still in progress
    max_cursor_time: Any = ""  # maximum cursor time observed; reported once the stream is finished (defaults to "")
    
    @property
    def value(self) -> str:
        """
        Return the cursor value appropriate for the stream's status.

        Uses ``max_cursor_time`` when the stream is finished, and
        ``state_value`` while the sync is still running.
        
        Returns:
            Current cursor value as a string.
        """
    
    def dict(self, **kwargs) -> dict:
        """
        Serialize to a dictionary holding only the current value.

        This overrides the pydantic ``BaseModel.dict`` method of the same
        name, so the wrapper serializes as its cursor value rather than as
        its fields.
        
        Returns:
            Dictionary with the root key mapped to ``self.value``.
        """

Availability Strategy

Custom availability strategy for handling Notion-specific error responses.

class NotionAvailabilityStrategy(HttpAvailabilityStrategy):
    """
    Availability strategy with Notion-specific error messages.

    It replaces generic HTTP failure reasons with clearer guidance for
    common problems, such as the integration lacking permission to a
    resource.
    """
    
    def reasons_for_unavailable_status_codes(self, stream: Stream, logger: Logger, source: Source, error: HTTPError) -> Dict[int, str]:
        """
        Return user-facing explanations keyed by HTTP status code.
        
        Args:
            stream: Stream whose availability is being checked.
            logger: Logger instance.
            source: Source instance.
            error: HTTP error raised by the availability check.
            
        Returns:
            Dictionary mapping HTTP status codes to user-friendly messages.
        """

Usage Examples

Basic Stream Implementation

from source_notion.streams import NotionStream

class CustomNotionStream(NotionStream):
    """Minimal NotionStream subclass reading from a single custom endpoint."""

    def path(self, **kwargs) -> str:
        # Endpoint path, resolved against NotionStream.url_base.
        return "custom-endpoint"

    def parse_response(self, response, **kwargs):
        # A response without a "results" key is treated as an empty page.
        yield from response.json().get("results", [])

# Initialize stream
# NOTE: `authenticator` must already exist; this snippet does not create it.
# Presumably an Airbyte CDK HTTP authenticator carrying your Notion token — TODO confirm.
# start_date uses ISO-8601 with milliseconds and a trailing "Z".
config = {"start_date": "2023-01-01T00:00:00.000Z"}
stream = CustomNotionStream(config=config, authenticator=authenticator)

Incremental Stream Implementation

from source_notion.streams import IncrementalNotionStream

class CustomIncrementalStream(IncrementalNotionStream):
    """Incremental stream whose search is restricted to Notion pages."""

    def __init__(self, **kwargs):
        # Pin the object-type filter to "page"; every other argument is forwarded unchanged.
        super(CustomIncrementalStream, self).__init__(obj_type="page", **kwargs)

# Use with state management.
# Assumes `config` and `authenticator` from the basic example above, and
# SyncMode imported from the Airbyte CDK.
stream = CustomIncrementalStream(config=config, authenticator=authenticator)
stream_state = {"last_edited_time": "2023-01-01T00:00:00.000Z"}
records = stream.read_records(
    sync_mode=SyncMode.incremental,
    stream_state=stream_state,
)
# read_records yields records lazily: iterate it to actually run the sync.
for record in records:
    ...  # process each record

Error Handling and Retry Logic

import requests
from source_notion.streams import NotionStream

# The streams automatically handle:
# - Rate limiting with retry-after headers
# - Page size throttling on 504 timeouts  
# - Invalid cursor detection and recovery
# - Notion API version headers

# Custom retry logging
class MyStream(NotionStream):
    """Example stream that logs the page size chosen after a 504 timeout."""

    def should_retry(self, response: requests.Response) -> bool:
        # Let the base class decide first: its should_retry is what throttles
        # page_size on a 504. Reading self.page_size before this call would
        # log the stale, pre-reduction value.
        retry = super().should_retry(response)
        if response.status_code == 504:
            # Page size has now been reduced by the base implementation.
            self.logger.info(f"Reduced page size to {self.page_size}")
        return retry

State Management

from source_notion.streams import StateValueWrapper, IncrementalNotionStream

# State wrapper automatically handles cursor timing
class MyIncrementalStream(IncrementalNotionStream):
    """Pass-through subclass: reading delegates entirely to IncrementalNotionStream."""

    def read_records(self, sync_mode, stream_state=None, **kwargs):
        # The base implementation updates the cursor state as each record
        # is yielded, so simply re-yielding its output keeps state current.
        yield from super().read_records(sync_mode, stream_state, **kwargs)
            
# Access current state.
# State is updated while records are read, so consume read_records first.
# Assumes `config`/`authenticator` from the basic example and SyncMode from the Airbyte CDK.
stream = MyIncrementalStream(config=config, authenticator=authenticator)
records = list(stream.read_records(sync_mode=SyncMode.incremental, stream_state={}))
current_state = stream.state  # a mapping; its cursor-field entry holds the StateValueWrapper
wrapper = current_state.get(stream.cursor_field)
# No entry means no cursor has been recorded; .value is max_cursor_time once the stream is finished.
cursor_value = wrapper.value if wrapper is not None else None

Constants

# Maximum nesting depth followed when recursively traversing Notion block children;
# bounds the recursion on deeply nested block trees.
MAX_BLOCK_DEPTH: int = 30

Maximum recursion depth for traversing the block hierarchy; it bounds the recursion so deeply nested blocks cannot cause unbounded traversal.

Install with Tessl CLI

npx tessl i tessl/pypi-airbyte-source-notion

docs

connector-setup.md

data-streams.md

index.md

stream-management.md

transformations.md

tile.json