tessl/pypi-airbyte-source-notion

Airbyte source connector for extracting data from Notion workspaces with OAuth2.0 and token authentication support.

Transformations

Custom components that normalize Notion API responses into consistent data structures and filter records so that incremental syncs only process what has changed.

Capabilities

User Record Transformation

Normalizes the nested owner information in Notion User records of type "bot" by moving the owner-type-specific object to a fixed "info" key.

class NotionUserTransformation(RecordTransformation):
    """
    Custom transformation for Notion User records of type "bot".
    Moves nested owner type data to a standardized "info" field
    for clarity and consistency in bot user records.
    """
    
    def transform(self, record: MutableMapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
        """
        Transforms bot user records by normalizing owner information.
        Moves data from owner.{owner_type} to owner.info field.
        
        Args:
            record: User record from Notion API
            **kwargs: Additional transformation parameters
            
        Returns:
            Transformed record with normalized owner structure
        """
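The behavior described in the docstrings above can be sketched as a standalone function. This is a minimal illustration, not the connector's actual source: `move_owner_to_info` is a hypothetical name, and passing through records that have no owner is an assumption.

```python
from typing import Any, MutableMapping


def move_owner_to_info(record: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
    """Hypothetical stand-in for NotionUserTransformation.transform."""
    owner = record.get("bot", {}).get("owner")
    if not owner:
        # Assumption: non-bot users and bots without an owner pass through unchanged
        return record
    owner_type = owner.get("type")
    if owner_type and owner_type in owner:
        # Move owner[owner_type] (e.g. owner["user"]) to owner["info"]
        owner["info"] = owner.pop(owner_type)
    return record


bot = {"type": "bot", "bot": {"owner": {"type": "user", "user": {"id": "u-1"}}}}
person = {"type": "person", "person": {"email": "a@example.com"}}

bot_result = move_owner_to_info(bot)
person_result = move_owner_to_info(person)
```

After the call, every bot record exposes its owner details under the same "info" key, whatever the owner type was.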

Properties Normalization

Converts the nested "properties" object of Notion page and database records into an array of name/value pairs, so every record exposes its properties in the same shape regardless of the property names.

class NotionPropertiesTransformation(RecordTransformation):
    """
    Transforms the nested 'properties' object within Notion Page/Database records.
    Converts properties dictionary to normalized array format where each element
    contains 'name' and 'value' keys for consistent processing.
    """
    
    def transform(self, record: MutableMapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
        """
        Normalizes properties object from dictionary to array format.
        Each property becomes {name: property_name, value: property_content}.
        
        Args:
            record: Page or Database record from Notion API
            **kwargs: Additional transformation parameters
            
        Returns:
            Record with properties transformed to array format
        """
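A minimal sketch of the dictionary-to-array conversion described above. `properties_to_list` is a hypothetical name, and returning an empty list for records without a "properties" key is an assumption rather than documented behavior.

```python
from typing import Any, Dict, List, MutableMapping


def properties_to_list(record: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
    """Hypothetical stand-in for NotionPropertiesTransformation.transform."""
    properties: Dict[str, Any] = record.get("properties", {})
    # Each dictionary entry becomes one {"name": ..., "value": ...} element
    as_list: List[Dict[str, Any]] = [
        {"name": name, "value": value} for name, value in properties.items()
    ]
    record["properties"] = as_list
    return record


page = {"id": "page-1", "properties": {"Status": {"type": "select", "select": {"name": "Done"}}}}
normalized = properties_to_list(page)

# Assumption: a record with no properties ends up with an empty list
empty = properties_to_list({"id": "page-2"})
```

Because Python dictionaries preserve insertion order, the resulting array keeps the order in which the properties appeared in the record.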

Incremental Sync Data Filter

Custom record filter for incremental syncs over Notion's cursor-paginated endpoints. It compares each record's cursor value against the saved stream state and drops records older than that state, so they are not processed again.

class NotionDataFeedFilter(RecordFilter):
    """
    Custom filter for Data Feed endpoints with incremental sync optimization.
    Addresses issues with Notion's cursor-based pagination where state thresholds
    aren't properly respected, causing unnecessary record processing.
    """
    
    def filter_records(self, records: List[Mapping[str, Any]], stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, **kwargs) -> List[Mapping[str, Any]]:
        """
        Filters records based on cursor value compared to current state.
        Ensures only records newer than state cursor are processed,
        improving incremental sync efficiency.
        
        Args:
            records: List of records from API response
            stream_state: Current stream state with cursor information
            stream_slice: Optional stream slice context
            **kwargs: Additional filter parameters
            
        Returns:
            Filtered list containing only records newer than state cursor
        """
    
    def _get_filter_date(self, start_date: str, state_value: list) -> str:
        """
        Calculates effective filter date by comparing start_date with state value.
        Returns the most recent date to use for record filtering.
        
        Args:
            start_date: Configured start date for sync
            state_value: Current state cursor value
            
        Returns:
            Effective filter date string for record comparison
        """
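The filtering described above might look roughly like the following standalone function. It is a sketch under stated assumptions: `filter_by_cursor` is a hypothetical name, the cursor field is assumed to be "last_edited_time", and the comparison is assumed to be inclusive (`>=`).

```python
from typing import Any, List, Mapping


def filter_by_cursor(
    records: List[Mapping[str, Any]],
    stream_state: Mapping[str, Any],
    config: Mapping[str, Any],
    cursor_field: str = "last_edited_time",  # assumed cursor field
) -> List[Mapping[str, Any]]:
    # Collect whichever lower bounds are present (start date and/or saved state)
    bounds = [v for v in (config.get("start_date"), stream_state.get(cursor_field)) if v]
    if not bounds:
        return list(records)
    # Timestamps in one fixed UTC format sort chronologically as plain strings
    threshold = max(bounds)
    return [r for r in records if r[cursor_field] >= threshold]


records = [
    {"id": "1", "last_edited_time": "2023-01-01T00:00:00.000Z"},
    {"id": "2", "last_edited_time": "2023-01-02T00:00:00.000Z"},
    {"id": "3", "last_edited_time": "2023-01-03T00:00:00.000Z"},
]
state = {"last_edited_time": "2023-01-02T12:00:00.000Z"}
config = {"start_date": "2023-01-01T00:00:00.000Z"}

kept_ids = [r["id"] for r in filter_by_cursor(records, state, config)]
first_sync_ids = [r["id"] for r in filter_by_cursor(records, {}, config)]
```

With saved state the threshold is the state value; on a first sync with empty state it falls back to the configured start date.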

Usage Examples

User Transformation Example

from source_notion.components import NotionUserTransformation

# Original bot user record from Notion API
original_user = {
    "object": "user",
    "id": "bot-123",
    "type": "bot",
    "bot": {
        "owner": {
            "type": "workspace",
            "workspace": {
                "id": "workspace-456",
                "name": "My Workspace"
            }
        }
    }
}

# Apply transformation
transformer = NotionUserTransformation()
transformed_user = transformer.transform(original_user)

# Result: workspace data moved to info field
print(transformed_user["bot"]["owner"])
# {
#     "type": "workspace",
#     "info": {
#         "id": "workspace-456", 
#         "name": "My Workspace"
#     }
# }

Properties Transformation Example

from source_notion.components import NotionPropertiesTransformation

# Original page/database record with nested properties
original_record = {
    "id": "page-123",
    "properties": {
        "Title": {
            "type": "title",
            "title": [{"text": {"content": "My Page"}}]
        },
        "Status": {
            "type": "select", 
            "select": {"name": "In Progress"}
        },
        "Created": {
            "type": "created_time",
            "created_time": "2023-01-01T00:00:00.000Z"
        }
    }
}

# Apply transformation
transformer = NotionPropertiesTransformation()
transformed_record = transformer.transform(original_record)

# Result: properties converted to array format
print(transformed_record["properties"])
# [
#     {
#         "name": "Title",
#         "value": {
#             "type": "title",
#             "title": [{"text": {"content": "My Page"}}]
#         }
#     },
#     {
#         "name": "Status", 
#         "value": {
#             "type": "select",
#             "select": {"name": "In Progress"}
#         }
#     },
#     {
#         "name": "Created",
#         "value": {
#             "type": "created_time", 
#             "created_time": "2023-01-01T00:00:00.000Z"
#         }
#     }
# ]

Data Feed Filter Example

from source_notion.components import NotionDataFeedFilter

# Sample records from API response
records = [
    {"id": "1", "last_edited_time": "2023-01-01T00:00:00.000Z"},
    {"id": "2", "last_edited_time": "2023-01-02T00:00:00.000Z"},
    {"id": "3", "last_edited_time": "2023-01-03T00:00:00.000Z"},
    {"id": "4", "last_edited_time": "2023-01-04T00:00:00.000Z"},
]

# Current stream state
stream_state = {
    "last_edited_time": "2023-01-02T12:00:00.000Z"
}

# Configuration with start date
config = {"start_date": "2023-01-01T00:00:00.000Z"}

# Apply filter
# Assumption: the CDK's RecordFilter dataclass also expects a parameters mapping
filter_component = NotionDataFeedFilter(config=config, parameters={})
filtered_records = filter_component.filter_records(records, stream_state)

# Result: only records newer than state cursor
print([r["id"] for r in filtered_records])
# ["3", "4"] - records 1 and 2 filtered out as they're older than state

Integration with Declarative Streams

# These transformations are automatically applied in manifest.yaml:

# For users stream:
transformations:
  - type: CustomTransformation
    class_name: source_notion.components.NotionUserTransformation

# For databases stream:  
transformations:
  - type: CustomTransformation
    class_name: source_notion.components.NotionPropertiesTransformation

# For databases stream with incremental sync:
record_selector:
  type: RecordSelector
  record_filter:
    type: CustomRecordFilter
    class_name: source_notion.components.NotionDataFeedFilter

Custom Transformation Implementation

from source_notion.components import NotionPropertiesTransformation

# Extend for custom property handling
class CustomPropertiesTransformation(NotionPropertiesTransformation):
    def transform(self, record, **kwargs):
        # Apply base transformation
        record = super().transform(record, **kwargs)
        
        # Add custom logic
        for prop in record.get("properties", []):
            if prop["name"] == "Tags" and prop["value"]["type"] == "multi_select":
                # Flatten multi-select values
                tags = [option["name"] for option in prop["value"]["multi_select"]]
                prop["value"]["tag_names"] = tags
        
        return record
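To see what the flattening step produces without installing the connector, the same logic can be run against an already-normalized properties array. `add_tag_names` is a hypothetical helper; unlike the subclass above, it flattens every multi_select property rather than only the one named "Tags".

```python
from typing import Any, Dict, List


def add_tag_names(properties: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Add a flat 'tag_names' list to every multi_select property value."""
    for prop in properties:
        value = prop.get("value", {})
        if value.get("type") == "multi_select":
            value["tag_names"] = [option["name"] for option in value.get("multi_select", [])]
    return properties


normalized_properties = [
    {"name": "Tags", "value": {"type": "multi_select", "multi_select": [{"name": "urgent"}, {"name": "backend"}]}},
    {"name": "Status", "value": {"type": "select", "select": {"name": "Done"}}},
]
result = add_tag_names(normalized_properties)
```

Using `.get` with defaults avoids a KeyError on property values that lack a "type" or "multi_select" key.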

Filter Date Calculation Logic

from source_notion.components import NotionDataFeedFilter

# Understanding filter date calculation
# Assumption: the CDK's RecordFilter dataclass also expects a parameters mapping
filter_component = NotionDataFeedFilter(config={"start_date": "2023-01-01T00:00:00.000Z"}, parameters={})

# Case 1: No state value - uses start_date
filter_date = filter_component._get_filter_date("2023-01-01T00:00:00.000Z", None)
print(filter_date)  # "2023-01-01T00:00:00.000Z"

# Case 2: State value newer than start_date - uses state value
filter_date = filter_component._get_filter_date(
    "2023-01-01T00:00:00.000Z", 
    "2023-01-15T00:00:00.000Z"
)  
print(filter_date)  # "2023-01-15T00:00:00.000Z"

# Case 3: State value older than start_date - uses start_date
filter_date = filter_component._get_filter_date(
    "2023-01-15T00:00:00.000Z",
    "2023-01-01T00:00:00.000Z"
)
print(filter_date)  # "2023-01-15T00:00:00.000Z"
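The three cases above amount to "take the later of the two timestamps, ignoring a missing one". The sketch below reproduces them on parsed datetimes instead of raw strings. `latest_timestamp` is a hypothetical helper, not part of the connector, and the timestamp format is assumed from the examples.

```python
from datetime import datetime
from typing import Optional

# Assumed format, taken from the example timestamps in this document
NOTION_TS_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"


def latest_timestamp(start_date: Optional[str], state_value: Optional[str]) -> Optional[str]:
    """Return whichever timestamp is later, skipping missing values."""
    candidates = [v for v in (start_date, state_value) if v]
    if not candidates:
        return None
    # Compare as datetimes so the result does not depend on string ordering
    return max(candidates, key=lambda v: datetime.strptime(v, NOTION_TS_FORMAT))


case_1 = latest_timestamp("2023-01-01T00:00:00.000Z", None)
case_2 = latest_timestamp("2023-01-01T00:00:00.000Z", "2023-01-15T00:00:00.000Z")
case_3 = latest_timestamp("2023-01-15T00:00:00.000Z", "2023-01-01T00:00:00.000Z")
```

Parsing is the safer choice if timestamps could arrive in mixed formats; when every value shares one fixed UTC format, plain string comparison gives the same answer.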

Performance Benefits

# Without NotionDataFeedFilter:
# - API returns 100 records per page
# - Only 5 records are actually newer than state
# - 95 records unnecessarily processed

# With NotionDataFeedFilter:
# - Same 100 records retrieved from API
# - Filter eliminates 95 outdated records at component level
# - Only 5 records passed to downstream processing
# - Significantly reduces processing overhead

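# Example measurement (illustrative; generate_test_records is a hypothetical helper)
import time
from datetime import datetime, timedelta, timezone

from source_notion.components import NotionDataFeedFilter


def generate_test_records(n):
    # One record per hour, starting 2023-01-01 UTC
    base = datetime(2023, 1, 1, tzinfo=timezone.utc)
    return [
        {"id": str(i), "last_edited_time": (base + timedelta(hours=i)).strftime("%Y-%m-%dT%H:%M:%S.000Z")}
        for i in range(n)
    ]


records = generate_test_records(10000)  # 10k records
stream_state = {"last_edited_time": "2023-06-01T00:00:00.000Z"}

# Assumption: the CDK's RecordFilter dataclass also expects a parameters mapping
filter_component = NotionDataFeedFilter(config={}, parameters={})

start = time.perf_counter()
filtered = filter_component.filter_records(records, stream_state)
elapsed = time.perf_counter() - start

# The saving comes from the records that never reach downstream processing,
# not from the filter being faster than an equivalent list comprehension
print(f"Kept {len(filtered)} of {len(records)} records in {elapsed:.4f}s")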

Transformation Pipeline

The transformations are applied in this order within Airbyte streams:

  1. Raw API Response → Records from Notion API
  2. Custom Filters → NotionDataFeedFilter (for incremental streams)
  3. Record Transformations → NotionUserTransformation, NotionPropertiesTransformation
  4. Schema Validation → Airbyte schema enforcement
  5. Output Records → Final normalized records for destination

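Because filtering runs before transformation, records older than the state cursor are discarded before any transformation work is spent on them, and every record that reaches the destination has passed through the same normalization steps.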
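The ordering of steps 2 and 3 can be sketched with plain functions. This is an illustration only: `run_pipeline`, `keep_recent`, and `mark_transformed` are hypothetical names, schema validation (step 4) is omitted, and the real ordering is handled by the Airbyte CDK rather than by code like this.

```python
from typing import Any, Callable, Dict, List

Record = Dict[str, Any]


def run_pipeline(
    raw_records: List[Record],
    record_filter: Callable[[List[Record]], List[Record]],
    transformations: List[Callable[[Record], Record]],
) -> List[Record]:
    kept = record_filter(raw_records)  # step 2: filter first
    for record in kept:
        for transform in transformations:  # step 3: transform only what was kept
            transform(record)
    return kept


transformed_ids: List[str] = []


def keep_recent(records: List[Record]) -> List[Record]:
    return [r for r in records if r["last_edited_time"] >= "2023-01-02T00:00:00.000Z"]


def mark_transformed(record: Record) -> Record:
    transformed_ids.append(record["id"])  # track which records were transformed
    record["transformed"] = True
    return record


raw = [
    {"id": "1", "last_edited_time": "2023-01-01T00:00:00.000Z"},
    {"id": "2", "last_edited_time": "2023-01-03T00:00:00.000Z"},
]
output = run_pipeline(raw, keep_recent, [mark_transformed])
```

Only record "2" is ever handed to the transformation; record "1" is dropped by the filter and left untouched.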

Install with Tessl CLI

npx tessl i tessl/pypi-airbyte-source-notion
