
tessl/pypi-airbyte-source-notion

Airbyte source connector for extracting data from Notion workspaces, with support for OAuth 2.0 and token authentication.

docs/data-streams.md

Data Streams

Specific stream implementations for extracting different types of data from Notion workspaces, including pages and nested block content with hierarchical traversal.

Capabilities

Pages Stream

Stream for extracting Notion pages from workspaces and databases with incremental synchronization support.

class Pages(IncrementalNotionStream):
    """
    Stream for Notion pages with incremental sync capability.
    Serves as parent stream for Blocks substream and implements
    checkpointing for efficient large-scale page extraction.
    """
    
    state_checkpoint_interval: int = 100
    
    def __init__(self, **kwargs):
        """
        Initializes Pages stream with "page" object type filter.
        Configured for incremental sync with regular state checkpoints.
        
        Args:
            **kwargs: Stream configuration parameters including authenticator and config
        """

Blocks Stream

Advanced substream for extracting block content from pages with recursive hierarchy traversal and depth limiting.

class Blocks(HttpSubStream, IncrementalNotionStream):
    """
    Substream for extracting block content from Notion pages.
    Implements recursive traversal of block hierarchies with depth limiting
    and supports incremental sync based on parent page updates.
    """
    
    http_method: str = "GET"
    block_id_stack: List[str] = []
    
    def path(self, **kwargs) -> str:
        """
        Returns API path for block children endpoint.
        Uses current block ID from stack for nested traversal.
        
        Returns:
            API path string: "blocks/{block_id}/children"
        """
    
    def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]:
        """
        Builds request parameters for block children API.
        
        Args:
            next_page_token: Pagination token for continuation
            **kwargs: Additional request parameters
            
        Returns:
            Parameters dictionary with page_size and optional start_cursor
        """
    
    def stream_slices(self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None) -> Iterable[Optional[Mapping[str, Any]]]:
        """
        Generates stream slices based on parent Pages stream.
        Each slice represents a page whose blocks should be extracted.
        
        Args:
            sync_mode: Sync mode (FULL_REFRESH or INCREMENTAL)
            cursor_field: List of cursor field names
            stream_state: Current stream state for incremental sync
            
        Yields:
            Stream slice dictionaries with page_id for block extraction
        """
    
    def transform(self, record: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Transforms block records to normalize mention object structure.
        Moves mention type data to standardized 'info' field.
        
        Args:
            record: Raw block record from API
            
        Returns:
            Transformed record with normalized mention structure
        """
    
    def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
        """
        Parses block children response and filters unsupported block types.
        Excludes child_page, child_database, and ai_block types.
        
        Args:
            response: HTTP response from blocks API
            stream_state: Current stream state
            **kwargs: Additional parsing parameters
            
        Yields:
            Filtered and transformed block records
        """
    
    def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:
        """
        Reads block records with recursive hierarchy traversal.
        Implements depth-first traversal with MAX_BLOCK_DEPTH limit.
        Automatically handles nested blocks with has_children flag.
        
        Args:
            **kwargs: Read parameters including sync configuration
            
        Yields:
            Block records including nested children up to depth limit
        """
    
    def should_retry(self, response: requests.Response) -> bool:
        """
        Custom retry logic for block-specific errors.
        Handles 404 errors for inaccessible blocks and 400 errors for unsupported ai_block types.
        
        Args:
            response: HTTP response object
            
        Returns:
            True if request should be retried, False to skip
        """

Usage Examples

Basic Pages Stream Usage

from source_notion.streams import Pages
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator

# Setup authentication
authenticator = TokenAuthenticator("your_notion_token")

# Initialize pages stream
config = {
    "start_date": "2023-01-01T00:00:00.000Z"
}

pages_stream = Pages(
    authenticator=authenticator,
    config=config
)

# Read pages with incremental sync
from airbyte_cdk.models import SyncMode

stream_state = {"last_edited_time": "2023-06-01T00:00:00.000Z"}
for page in pages_stream.read_records(
    sync_mode=SyncMode.incremental,
    stream_state=stream_state
):
    print(f"Page: {page['id']} - {page.get('properties', {})}")

Blocks Stream with Parent Dependency

from source_notion.streams import Pages, Blocks

# Setup parent stream (Pages)
pages_stream = Pages(authenticator=authenticator, config=config)

# Initialize blocks substream
blocks_stream = Blocks(
    parent=pages_stream,
    authenticator=authenticator,
    config=config
)

# Read blocks for all pages
for block in blocks_stream.read_records(sync_mode=SyncMode.full_refresh):
    print(f"Block: {block['id']} - Type: {block['type']}")
    
    # Check for nested structure
    if block.get('has_children'):
        print(f"  Has children: {block['has_children']}")

Recursive Block Traversal

# The Blocks stream automatically handles recursive traversal
# Example of what happens internally:

blocks_stream = Blocks(parent=pages_stream, authenticator=authenticator, config=config)

# Stream slices come from parent pages
for slice_data in blocks_stream.stream_slices(SyncMode.full_refresh):
    page_id = slice_data["page_id"]
    print(f"Processing blocks for page: {page_id}")
    
    # Blocks are read recursively up to MAX_BLOCK_DEPTH (30 levels)
    for block in blocks_stream.read_records(
        sync_mode=SyncMode.full_refresh, stream_slice=slice_data
    ):
        print(f"  Block {block['id']} at depth {len(blocks_stream.block_id_stack)}")

Handling Block Transformations

# Example of mention transformation that happens automatically
original_block = {
    "type": "paragraph",
    "paragraph": {
        "rich_text": [{
            "mention": {
                "type": "user",
                "user": {
                    "id": "user-123",
                    "name": "John Doe"
                }
            }
        }]
    }
}

# After transformation:
transformed_block = {
    "type": "paragraph", 
    "paragraph": {
        "rich_text": [{
            "mention": {
                "type": "user",
                "info": {  # Moved from "user" to "info"
                    "id": "user-123",
                    "name": "John Doe"
                }
            }
        }]
    }
}
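
A minimal sketch of the normalization shown above. The function name `normalize_mentions` is invented for this example, and it assumes the record shape in this snippet rather than the connector's full schema.

```python
def normalize_mentions(record):
    """Move each mention's type-specific payload (e.g. "user") into a
    standardized "info" field, as described above (hypothetical sketch)."""
    for text in record.get(record["type"], {}).get("rich_text", []):
        mention = text.get("mention")
        if mention:
            # Relocate the payload keyed by the mention type to "info"
            mention["info"] = mention.pop(mention["type"])
    return record
```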

Error Handling for Blocks

# Blocks stream handles various error scenarios automatically:

class CustomBlocksStream(Blocks):
    def should_retry(self, response):
        if response.status_code == 404:
            # Block not accessible - logged and skipped
            self.logger.error(f"Block not accessible: {response.json()}")
            return False
        elif response.status_code == 400:
            error = response.json()
            if "ai_block is not supported" in error.get("message", ""):
                # AI blocks are unsupported - logged and skipped
                self.logger.error("AI block type not supported, skipping")
                return False
        
        return super().should_retry(response)

State Management in Incremental Sync

# Pages stream with checkpointing
pages_stream = Pages(authenticator=authenticator, config=config)

# State is checkpointed every 100 records (state_checkpoint_interval)
records_processed = 0
for page in pages_stream.read_records(
    sync_mode=SyncMode.incremental,
    stream_state={"last_edited_time": "2023-01-01T00:00:00.000Z"}
):
    records_processed += 1
    if records_processed % 100 == 0:
        # State automatically checkpointed
        current_state = pages_stream.state
        print(f"Checkpointed at: {current_state}")

Block Hierarchy Depth Control

from source_notion.streams import MAX_BLOCK_DEPTH

# Depth limiting is automatic but can be monitored
class MonitoredBlocksStream(Blocks):
    def read_records(self, **kwargs):
        if len(self.block_id_stack) > MAX_BLOCK_DEPTH:
            self.logger.warning(f"Reached maximum depth {MAX_BLOCK_DEPTH}, stopping traversal")
            return
        
        # Continue with normal traversal
        yield from super().read_records(**kwargs)

Stream Integration with SourceNotion

# How streams are integrated in the main connector
from source_notion import SourceNotion

source = SourceNotion()
config = {
    "credentials": {
        "auth_type": "token",
        "token": "your_token"
    },
    "start_date": "2023-01-01T00:00:00.000Z"
}

# Get all streams (includes Pages and Blocks)
all_streams = source.streams(config)

# Find specific streams
pages_stream = next(s for s in all_streams if s.name == "pages")
blocks_stream = next(s for s in all_streams if s.name == "blocks")

# Blocks stream is automatically configured with Pages as parent
assert blocks_stream.parent == pages_stream

Stream Dependencies

  • Blocks stream depends on Pages stream as its parent
  • Pages must be read first to provide page IDs for block extraction
  • Block hierarchy traversal maintains parent-child relationships
  • State synchronization ensures consistent incremental updates across related streams
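
The parent dependency in the first two bullets can be reduced to a simple rule: each parent page record becomes one slice for the substream. The helper below is a hypothetical illustration of that mapping, not the CDK's `HttpSubStream` implementation.

```python
def page_slices(parent_pages):
    """Derive block-extraction slices from parent page records
    (hypothetical sketch of the parent/substream dependency)."""
    for page in parent_pages:
        yield {"page_id": page["id"]}
```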

Install with Tessl CLI

npx tessl i tessl/pypi-airbyte-source-notion
