Airbyte source connector for extracting data from Notion workspaces with OAuth2.0 and token authentication support.
—
Quality: Pending — does it follow best practices?
Impact: Pending — no eval scenarios have been run.
Base classes and functionality for managing Notion data streams with pagination, error handling, and incremental sync capabilities.
The foundational class for all Notion API streams with common functionality for rate limiting, pagination, and error handling.
class NotionStream(HttpStream, ABC):
    """
    Abstract base class for Notion API streams.

    Provides common functionality including rate limiting, pagination,
    error handling, and Notion-specific API behaviors.
    """

    url_base: str = "https://api.notion.com/v1/"
    # Records are keyed by Notion object id.
    primary_key: str = "id"
    # Default page size; throttled down after 504s (see should_retry).
    page_size: int = 100
    raise_on_http_errors: bool = True

    def __init__(self, config: Mapping[str, Any], **kwargs):
        """
        Initializes stream with configuration and sets start_date.
        If start_date not provided, defaults to 2 years ago.

        Args:
            config: Stream configuration mapping
            **kwargs: Additional stream parameters
        """

    @property
    def availability_strategy(self) -> HttpAvailabilityStrategy:
        """Returns NotionAvailabilityStrategy for custom error handling."""

    @property
    def retry_factor(self) -> int:
        """Retry factor for exponential backoff (5)."""

    @property
    def max_retries(self) -> int:
        """Maximum number of retry attempts (7)."""

    @property
    def max_time(self) -> int:
        """Maximum time in seconds for retries (660)."""

    @staticmethod
    def check_invalid_start_cursor(response: requests.Response) -> Optional[str]:
        """
        Checks if response contains invalid start cursor error.

        Args:
            response: HTTP response object

        Returns:
            Error message if invalid cursor detected, None otherwise
        """

    @staticmethod
    def throttle_request_page_size(current_page_size: int) -> int:
        """
        Reduces page size for retry after 504 Gateway Timeout.

        Args:
            current_page_size: Current page size value

        Returns:
            Throttled page size (minimum 10)
        """

    def backoff_time(self, response: requests.Response) -> Optional[float]:
        """
        Custom backoff logic for Notion API rate limiting.
        Uses retry-after header for 429 responses (~3 req/sec limit).

        Args:
            response: HTTP response object

        Returns:
            Backoff time in seconds
        """

    def should_retry(self, response: requests.Response) -> bool:
        """
        Custom retry logic with page size throttling for 504 errors.
        Automatically reduces page_size on timeout and restores on success.

        Args:
            response: HTTP response object

        Returns:
            True if request should be retried
        """

    def request_headers(self, **kwargs) -> Mapping[str, Any]:
        """
        Adds Notion-Version header to requests.

        Returns:
            Headers mapping with Notion-Version: 2022-06-28
        """

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """
        Extracts pagination token from Notion API response.

        Args:
            response: HTTP response object

        Returns:
            Next page token mapping or None if no more pages
        """

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """
        Parses Notion API response and yields result records.

        Args:
            response: HTTP response object
            **kwargs: Additional parsing parameters

        Yields:
            Individual record mappings from results array
"""Enhanced base class for streams supporting incremental synchronization with cursor-based state management.
class IncrementalNotionStream(NotionStream, CheckpointMixin, ABC):
    """
    Base class for Notion streams with incremental sync capability.
    Implements cursor-based incremental sync with state checkpointing.
    """

    # Incremental cursor is the record's last edit timestamp.
    cursor_field: str = "last_edited_time"
    # Notion's search endpoint takes a POST body, not query parameters.
    http_method: str = "POST"
    is_finished: bool = True

    def __init__(self, obj_type: Optional[str] = None, **kwargs):
        """
        Initializes incremental stream with optional object type filter.

        Args:
            obj_type: Notion object type filter ("page" or "database")
            **kwargs: Additional stream parameters
        """

    @property
    def state(self) -> MutableMapping[str, Any]:
        """Gets current stream state."""

    @state.setter
    def state(self, value: MutableMapping[str, Any]):
        """Sets stream state value."""

    def path(self, **kwargs) -> str:
        """
        Returns API path for search endpoint.

        Returns:
            "search" - Notion's search API endpoint
        """

    def request_body_json(self, next_page_token: Optional[Mapping[str, Any]] = None, **kwargs) -> Optional[Mapping]:
        """
        Builds JSON request body for Notion search API.

        Args:
            next_page_token: Pagination token for next page
            **kwargs: Additional request parameters

        Returns:
            Request body with sort, filter, and pagination parameters
        """

    def read_records(self, sync_mode: SyncMode, stream_state: Optional[Mapping[str, Any]] = None, **kwargs) -> Iterable[Mapping[str, Any]]:
        """
        Reads records with state management and error handling.
        Handles invalid cursor errors and updates state incrementally.

        Args:
            sync_mode: FULL_REFRESH or INCREMENTAL
            stream_state: Current stream state for incremental sync
            **kwargs: Additional read parameters

        Yields:
            Record mappings with updated state
        """

    def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
        """
        Parses response with state filtering for incremental sync.
        Only yields records newer than state cursor and start_date.

        Args:
            response: HTTP response object
            stream_state: Current stream state
            **kwargs: Additional parsing parameters

        Yields:
            Filtered record mappings
        """

    def _get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Updates stream state with latest record cursor value.

        Args:
            current_stream_state: Current state mapping
            latest_record: Latest processed record

        Returns:
            Updated state mapping with new cursor value
"""Utility class for managing incremental sync state with proper cursor handling.
class StateValueWrapper(pydantic.BaseModel):
    """
    Wrapper for stream state values that handles cursor timing.
    Provides different values during sync vs after completion.
    """

    # Owning stream; its finished status selects which cursor value is exposed.
    stream: T
    # Cursor value reported while the sync is still in progress.
    state_value: str
    # Highest cursor value observed; reported once the stream is finished.
    max_cursor_time: Any = ""

    @property
    def value(self) -> str:
        """
        Returns appropriate cursor value based on stream status.
        Uses max_cursor_time when stream is finished, state_value during sync.

        Returns:
            Current cursor value as string
        """

    def dict(self, **kwargs) -> dict:
        """
        Serializes to dictionary with just the current value.

        Returns:
            Dictionary with root key containing current value
"""Custom availability strategy for handling Notion-specific error responses.
class NotionAvailabilityStrategy(HttpAvailabilityStrategy):
    """
    Custom availability strategy with Notion-specific error messaging.
    Provides clearer guidance for common permission issues.
    """

    def reasons_for_unavailable_status_codes(self, stream: Stream, logger: Logger, source: Source, error: HTTPError) -> Dict[int, str]:
        """
        Returns custom error messages for HTTP status codes.

        Args:
            stream: Stream instance
            logger: Logger instance
            source: Source instance
            error: HTTP error object

        Returns:
            Dictionary mapping status codes to user-friendly messages
"""from source_notion.streams import NotionStream
class CustomNotionStream(NotionStream):
    """Example of a concrete stream built on NotionStream."""

    def path(self, **kwargs) -> str:
        """Point the stream at a custom API endpoint."""
        return "custom-endpoint"

    def parse_response(self, response, **kwargs):
        """Emit every entry found in the response's results array."""
        yield from response.json().get("results", [])
# Initialize stream
config = {"start_date": "2023-01-01T00:00:00.000Z"}
stream = CustomNotionStream(config=config, authenticator=authenticator)

from source_notion.streams import IncrementalNotionStream
class CustomIncrementalStream(IncrementalNotionStream):
    """Incremental stream example that restricts the search to Notion pages."""

    def __init__(self, **kwargs):
        # Pin the object type filter to "page" for this stream.
        super().__init__(obj_type="page", **kwargs)
# Use with state management
stream_state = {"last_edited_time": "2023-01-01T00:00:00.000Z"}
records = stream.read_records(
sync_mode=SyncMode.incremental,
stream_state=stream_state
)

import requests
from source_notion.streams import NotionStream
# The streams automatically handle:
# - Rate limiting with retry-after headers
# - Page size throttling on 504 timeouts
# - Invalid cursor detection and recovery
# - Notion API version headers
# Custom backoff behavior
class MyStream(NotionStream):
def should_retry(self, response: requests.Response) -> bool:
if response.status_code == 504:
# Page size automatically reduced
self.logger.info(f"Reduced page size to {self.page_size}")
return super().should_retry(response)

from source_notion.streams import StateValueWrapper, IncrementalNotionStream
# State wrapper automatically handles cursor timing
class MyIncrementalStream(IncrementalNotionStream):
    """Example incremental stream that delegates to the base implementation."""

    def read_records(self, sync_mode, stream_state=None, **kwargs):
        # The state wrapper keeps cursor values consistent, and the state is
        # refreshed with the latest cursor as each record arrives upstream.
        yield from super().read_records(sync_mode, stream_state, **kwargs)
# Access current state
stream = MyIncrementalStream(config=config)
current_state = stream.state # Gets StateValueWrapper
cursor_value = current_state["last_edited_time"].value

MAX_BLOCK_DEPTH: int = 30

Maximum recursive depth for block hierarchy traversal to prevent infinite loops.
Install with Tessl CLI
npx tessl i tessl/pypi-airbyte-source-notion