Airbyte source connector for extracting data from Notion workspaces with OAuth2.0 and token authentication support.
—
Quality: Pending — not yet reviewed against best practices.
Impact: Pending — no eval scenarios have been run.
Specific stream implementations for extracting different types of data from Notion workspaces, including pages and nested block content with hierarchical traversal.
Stream for extracting Notion pages from workspaces and databases with incremental synchronization support.
class Pages(IncrementalNotionStream):
    """
    Stream for Notion pages with incremental sync capability.

    Serves as the parent stream for the Blocks substream and implements
    checkpointing for efficient large-scale page extraction.
    """

    # Persist stream state every 100 records to bound re-read work on failure.
    state_checkpoint_interval: int = 100

    def __init__(self, **kwargs):
        """
        Initialize the Pages stream.

        Args:
            **kwargs: Stream configuration parameters, including the
                authenticator and connector config, forwarded to the parent
                stream.
        """
        # NOTE(review): the original body is not visible in this chunk; the
        # parent stream is presumably configured with a "page" object-type
        # filter here — confirm against IncrementalNotionStream's signature.
        super().__init__(**kwargs)


# Advanced substream for extracting block content from pages with recursive
# hierarchy traversal and depth limiting.
class Blocks(HttpSubStream, IncrementalNotionStream):
    """
    Substream for extracting block content from Notion pages.

    Implements recursive traversal of block hierarchies with depth limiting
    and supports incremental sync based on parent page updates.
    """

    http_method: str = "GET"

    # NOTE(review): mutable class-level attribute, shared across instances.
    # The traversal code uses this as a stack of block IDs; confirm it is
    # reset per sync in the full implementation.
    block_id_stack: List[str] = []

    def path(self, **kwargs) -> str:
        """
        Return the API path for the block-children endpoint.

        Uses the current block ID from the traversal stack so nested blocks
        are fetched level by level.

        Returns:
            API path string of the form "blocks/{block_id}/children".
        """

    def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]:
        """
        Build request parameters for the block-children API.

        Args:
            next_page_token: Pagination token for continuation.
            **kwargs: Additional request parameters.

        Returns:
            Parameters dictionary with page_size and an optional start_cursor.
        """

    def stream_slices(
        self,
        sync_mode: SyncMode,
        cursor_field: List[str] = None,
        stream_state: Mapping[str, Any] = None,
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        """
        Generate stream slices based on the parent Pages stream.

        Each slice represents a page whose blocks should be extracted.

        Args:
            sync_mode: Sync mode (FULL_REFRESH or INCREMENTAL).
            cursor_field: List of cursor field names.
            stream_state: Current stream state for incremental sync.

        Yields:
            Stream slice dictionaries with a page_id for block extraction.
        """

    def transform(self, record: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Normalize the mention object structure of a block record.

        Moves mention type data into a standardized 'info' field.

        Args:
            record: Raw block record from the API.

        Returns:
            Transformed record with a normalized mention structure.
        """

    def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
        """
        Parse a block-children response, filtering unsupported block types.

        Excludes child_page, child_database, and ai_block types.

        Args:
            response: HTTP response from the blocks API.
            stream_state: Current stream state.
            **kwargs: Additional parsing parameters.

        Yields:
            Filtered and transformed block records.
        """

    def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:
        """
        Read block records with recursive hierarchy traversal.

        Implements depth-first traversal bounded by MAX_BLOCK_DEPTH and
        automatically descends into blocks whose has_children flag is set.

        Args:
            **kwargs: Read parameters, including sync configuration.

        Yields:
            Block records, including nested children up to the depth limit.
        """

    def should_retry(self, response: requests.Response) -> bool:
        """
        Custom retry logic for block-specific errors.

        Handles 404 errors for inaccessible blocks and 400 errors for
        unsupported ai_block types.

        Args:
            response: HTTP response object.

        Returns:
            True if the request should be retried, False to skip it.
        """


from source_notion.streams import Pages
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
from airbyte_cdk.models import SyncMode

# --- Example: incremental read of Notion pages ---

# Set up token authentication.
authenticator = TokenAuthenticator("your_notion_token")

# Initialize the pages stream.
config = {
    "start_date": "2023-01-01T00:00:00.000Z",
}
pages_stream = Pages(
    authenticator=authenticator,
    config=config,
)

# Read pages with incremental sync, resuming from a previous cursor value.
stream_state = {"last_edited_time": "2023-06-01T00:00:00.000Z"}
for page in pages_stream.read_records(
    sync_mode=SyncMode.incremental,
    stream_state=stream_state,
):
    print(f"Page: {page['id']} - {page.get('properties', {})}")

from source_notion.streams import Pages, Blocks
# --- Example: reading blocks for all pages ---

# Set up the parent stream (Pages).
pages_stream = Pages(authenticator=authenticator, config=config)

# Initialize the Blocks substream with Pages as its parent.
blocks_stream = Blocks(
    parent=pages_stream,
    authenticator=authenticator,
    config=config,
)

# Read blocks for all pages.
for block in blocks_stream.read_records(sync_mode=SyncMode.full_refresh):
    print(f"Block: {block['id']} - Type: {block['type']}")
    # Check for nested structure.
    if block.get('has_children'):
        print(f"  Has children: {block['has_children']}")

# The Blocks stream automatically handles recursive traversal.
# Example of what happens internally during recursive traversal:
blocks_stream = Blocks(parent=pages_stream, authenticator=authenticator, config=config)

# Stream slices come from the parent pages.
for slice_data in blocks_stream.stream_slices(SyncMode.full_refresh):
    page_id = slice_data["page_id"]
    print(f"Processing blocks for page: {page_id}")

# Blocks are read recursively up to MAX_BLOCK_DEPTH (30 levels).
for block in blocks_stream.read_records():
    print(f"  Block {block['id']} at depth {len(blocks_stream.block_id_stack)}")

# Example of the mention transformation that happens automatically.
# A raw block record as returned by the Notion API, containing a user mention.
original_block = {
    "type": "paragraph",
    "paragraph": {
        "rich_text": [{
            "mention": {
                "type": "user",
                "user": {
                    "id": "user-123",
                    "name": "John Doe",
                },
            },
        }],
    },
}

# After Blocks.transform(): the type-specific payload ("user") has been moved
# to the standardized "info" field.
transformed_block = {
    "type": "paragraph",
    "paragraph": {
        "rich_text": [{
            "mention": {
                "type": "user",
                "info": {  # Moved from "user" to "info"
                    "id": "user-123",
                    "name": "John Doe",
                },
            },
        }],
    },
}

# The Blocks stream handles various error scenarios automatically:
class CustomBlocksStream(Blocks):
    """Example subclass showing how Blocks classifies block-level errors."""

    def should_retry(self, response):
        """
        Skip (do not retry) inaccessible blocks and unsupported AI blocks.

        Args:
            response: HTTP response object.

        Returns:
            False for 404s and for 400s whose message indicates an
            unsupported ai_block (the record is logged and skipped);
            otherwise defers to the parent retry logic.
        """
        if response.status_code == 404:
            # Block not accessible - logged and skipped
            self.logger.error(f"Block not accessible: {response.json()}")
            return False
        if response.status_code == 400:
            error = response.json()
            if "ai_block is not supported" in error.get("message", ""):
                # AI blocks are unsupported - logged and skipped
                self.logger.error("AI block type not supported, skipping")
                return False
        return super().should_retry(response)


# Example: Pages stream with checkpointing.
pages_stream = Pages(authenticator=authenticator, config=config)

# State is checkpointed every 100 records (state_checkpoint_interval).
records_processed = 0
for page in pages_stream.read_records(
    sync_mode=SyncMode.incremental,
    stream_state={"last_edited_time": "2023-01-01T00:00:00.000Z"},
):
    records_processed += 1
    if records_processed % 100 == 0:
        # State has been automatically checkpointed at this point.
        current_state = pages_stream.state
        print(f"Checkpointed at: {current_state}")

from source_notion.streams import MAX_BLOCK_DEPTH
# Depth limiting is automatic but can be monitored.
class MonitoredBlocksStream(Blocks):
    """Example subclass that logs when the traversal depth limit is reached."""

    def read_records(self, **kwargs):
        """Stop traversal (with a warning) past MAX_BLOCK_DEPTH; otherwise delegate."""
        if len(self.block_id_stack) > MAX_BLOCK_DEPTH:
            self.logger.warning(f"Reached maximum depth {MAX_BLOCK_DEPTH}, stopping traversal")
            return
        # Continue with normal traversal.
        yield from super().read_records(**kwargs)


# Example: how streams are integrated in the main connector.
from source_notion import SourceNotion

source = SourceNotion()
config = {
    "credentials": {
        "auth_type": "token",
        "token": "your_token",
    },
    "start_date": "2023-01-01T00:00:00.000Z",
}

# Get all streams (includes Pages and Blocks).
all_streams = source.streams(config)

# Find specific streams by name.
pages_stream = next(s for s in all_streams if s.name == "pages")
blocks_stream = next(s for s in all_streams if s.name == "blocks")

# The Blocks stream is automatically configured with Pages as its parent.
assert blocks_stream.parent == pages_stream

# Install with Tessl CLI
# Install with the Tessl CLI: npx tessl i tessl/pypi-airbyte-source-notion