Airbyte source connector for extracting data from Notion workspaces, with support for OAuth 2.0 and token authentication.
Custom components for transforming Notion API responses and filtering data for efficient incremental synchronization and normalized data structures.
Transforms Notion User records of type "bot" to normalize nested owner information structure.
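To make the owner normalization described above concrete, here is a minimal standalone sketch that runs without the Airbyte CDK. The function name `normalize_bot_owner` is hypothetical, and the logic is an assumption inferred from the description, not the connector's actual source.

```python
from typing import Any, MutableMapping


def normalize_bot_owner(record: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
    """Move owner.<owner_type> to owner.info for bot users (illustrative only)."""
    owner = (record.get("bot") or {}).get("owner")
    if not owner:
        # Non-bot users, or bots without owner data, pass through unchanged.
        return record
    owner_type = owner.get("type")
    if owner_type and owner_type in owner:
        # Rename the type-specific key (e.g. "workspace") to the fixed "info" key.
        owner["info"] = owner.pop(owner_type)
    return record


person = {"object": "user", "id": "user-1", "type": "person"}
bot = {"type": "bot", "bot": {"owner": {"type": "user", "user": {"id": "user-2"}}}}

print(normalize_bot_owner(person))
print(normalize_bot_owner(bot)["bot"]["owner"])
```

Using a fixed `info` key means downstream schemas do not need one column per possible owner type.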
class NotionUserTransformation(RecordTransformation):
    """
    Custom transformation for Notion User records of type "bot".
    Moves nested owner type data to a standardized "info" field
    for clarity and consistency in bot user records.
    """

    def transform(self, record: MutableMapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
        """
        Transforms bot user records by normalizing owner information.
        Moves data from owner.{owner_type} to owner.info field.

        Args:
            record: User record from Notion API
            **kwargs: Additional transformation parameters

        Returns:
            Transformed record with normalized owner structure
        """

Transforms nested properties objects in Notion pages and databases into normalized array format for easier processing.
class NotionPropertiesTransformation(RecordTransformation):
    """
    Transforms the nested 'properties' object within Notion Page/Database records.
    Converts properties dictionary to normalized array format where each element
    contains 'name' and 'value' keys for consistent processing.
    """

    def transform(self, record: MutableMapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
        """
        Normalizes properties object from dictionary to array format.
        Each property becomes {name: property_name, value: property_content}.

        Args:
            record: Page or Database record from Notion API
            **kwargs: Additional transformation parameters

        Returns:
            Record with properties transformed to array format
        """

Custom filter for optimizing incremental sync performance with cursor-based pagination by respecting state values more granularly.
class NotionDataFeedFilter(RecordFilter):
    """
    Custom filter for Data Feed endpoints with incremental sync optimization.
    Addresses issues with Notion's cursor-based pagination where state thresholds
    aren't properly respected, causing unnecessary record processing.
    """

    def filter_records(self, records: List[Mapping[str, Any]], stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, **kwargs) -> List[Mapping[str, Any]]:
        """
        Filters records based on cursor value compared to current state.
        Ensures only records newer than state cursor are processed,
        improving incremental sync efficiency.

        Args:
            records: List of records from API response
            stream_state: Current stream state with cursor information
            stream_slice: Optional stream slice context
            **kwargs: Additional filter parameters

        Returns:
            Filtered list containing only records newer than state cursor
        """

    def _get_filter_date(self, start_date: str, state_value: list) -> str:
        """
        Calculates effective filter date by comparing start_date with state value.
        Returns the most recent date to use for record filtering.

        Args:
            start_date: Configured start date for sync
            state_value: Current state cursor value

        Returns:
            Effective filter date string for record comparison
        """

from source_notion.components import NotionUserTransformation
# Original bot user record from Notion API
original_user = {
    "object": "user",
    "id": "bot-123",
    "type": "bot",
    "bot": {
        "owner": {
            "type": "workspace",
            "workspace": {
                "id": "workspace-456",
                "name": "My Workspace"
            }
        }
    }
}

# Apply transformation
transformer = NotionUserTransformation()
transformed_user = transformer.transform(original_user)

# Result: workspace data moved to info field
print(transformed_user["bot"]["owner"])
# {
#     "type": "workspace",
#     "info": {
#         "id": "workspace-456",
#         "name": "My Workspace"
#     }
# }

from source_notion.components import NotionPropertiesTransformation
# Original page/database record with nested properties
original_record = {
    "id": "page-123",
    "properties": {
        "Title": {
            "type": "title",
            "title": [{"text": {"content": "My Page"}}]
        },
        "Status": {
            "type": "select",
            "select": {"name": "In Progress"}
        },
        "Created": {
            "type": "created_time",
            "created_time": "2023-01-01T00:00:00.000Z"
        }
    }
}

# Apply transformation
transformer = NotionPropertiesTransformation()
transformed_record = transformer.transform(original_record)

# Result: properties converted to array format
print(transformed_record["properties"])
# [
#     {
#         "name": "Title",
#         "value": {
#             "type": "title",
#             "title": [{"text": {"content": "My Page"}}]
#         }
#     },
#     {
#         "name": "Status",
#         "value": {
#             "type": "select",
#             "select": {"name": "In Progress"}
#         }
#     },
#     {
#         "name": "Created",
#         "value": {
#             "type": "created_time",
#             "created_time": "2023-01-01T00:00:00.000Z"
#         }
#     }
# ]

from source_notion.components import NotionDataFeedFilter
# Sample records from API response
records = [
    {"id": "1", "last_edited_time": "2023-01-01T00:00:00.000Z"},
    {"id": "2", "last_edited_time": "2023-01-02T00:00:00.000Z"},
    {"id": "3", "last_edited_time": "2023-01-03T00:00:00.000Z"},
    {"id": "4", "last_edited_time": "2023-01-04T00:00:00.000Z"},
]

# Current stream state
stream_state = {
    "last_edited_time": "2023-01-02T12:00:00.000Z"
}

# Configuration with start date
config = {"start_date": "2023-01-01T00:00:00.000Z"}

# Apply filter
filter_component = NotionDataFeedFilter(config=config)
filtered_records = filter_component.filter_records(records, stream_state)

# Result: only records newer than state cursor
print([r["id"] for r in filtered_records])
# ["3", "4"] - records 1 and 2 filtered out as they're older than state

# These transformations are automatically applied in manifest.yaml:
# For users stream:
transformations:
  - type: CustomTransformation
    class_name: source_notion.components.NotionUserTransformation

# For databases stream:
transformations:
  - type: CustomTransformation
    class_name: source_notion.components.NotionPropertiesTransformation

# For databases stream with incremental sync:
record_selector:
  type: RecordSelector
  record_filter:
    type: CustomRecordFilter
    class_name: source_notion.components.NotionDataFeedFilter

from source_notion.components import NotionPropertiesTransformation
# Extend for custom property handling
class CustomPropertiesTransformation(NotionPropertiesTransformation):
    def transform(self, record, **kwargs):
        # Apply base transformation
        record = super().transform(record, **kwargs)
        # Add custom logic
        for prop in record.get("properties", []):
            if prop["name"] == "Tags" and prop["value"]["type"] == "multi_select":
                # Flatten multi-select values
                tags = [option["name"] for option in prop["value"]["multi_select"]]
                prop["value"]["tag_names"] = tags
        return record

from source_notion.components import NotionDataFeedFilter
# Understanding filter date calculation
filter_component = NotionDataFeedFilter(config={"start_date": "2023-01-01T00:00:00.000Z"})

# Case 1: No state value - uses start_date
filter_date = filter_component._get_filter_date("2023-01-01T00:00:00.000Z", None)
print(filter_date)  # "2023-01-01T00:00:00.000Z"

# Case 2: State value newer than start_date - uses state value
filter_date = filter_component._get_filter_date(
    "2023-01-01T00:00:00.000Z",
    "2023-01-15T00:00:00.000Z"
)
print(filter_date)  # "2023-01-15T00:00:00.000Z"

# Case 3: State value older than start_date - uses start_date
filter_date = filter_component._get_filter_date(
    "2023-01-15T00:00:00.000Z",
    "2023-01-01T00:00:00.000Z"
)
print(filter_date)  # "2023-01-15T00:00:00.000Z"

# Without NotionDataFeedFilter:
# - API returns 100 records per page
# - Only 5 records are actually newer than state
# - 95 records unnecessarily processed
# With NotionDataFeedFilter:
# - Same 100 records retrieved from API
# - Filter eliminates 95 outdated records at component level
# - Only 5 records passed to downstream processing
# - Significantly reduces processing overhead
# Example measurement
import time

from source_notion.components import NotionDataFeedFilter

# generate_test_records is a hypothetical helper (not part of the connector)
# that returns a list of records, each with a "last_edited_time" field.
records = generate_test_records(10000)  # 10k records
stream_state = {"last_edited_time": "2023-06-01T00:00:00.000Z"}

# Baseline: inline list comprehension using the same cutoff
start = time.perf_counter()
processed = [r for r in records if r["last_edited_time"] >= "2023-06-01T00:00:00.000Z"]
no_filter_time = time.perf_counter() - start

# With filter component
start = time.perf_counter()
filter_component = NotionDataFeedFilter(config={})
filtered = filter_component.filter_records(records, stream_state)
with_filter_time = time.perf_counter() - start

print(f"Performance improvement: {(no_filter_time - with_filter_time) / no_filter_time * 100:.1f}%")

The transformations are applied in this order within Airbyte streams:
This pipeline ensures data consistency and optimal performance for both full refresh and incremental synchronization modes.