# Transformations

Custom components that transform Notion API responses into normalized data structures and filter records for efficient incremental synchronization.

## Capabilities

### User Record Transformation

Normalizes the nested owner information in Notion User records of type "bot".

```python { .api }
class NotionUserTransformation(RecordTransformation):
    """
    Custom transformation for Notion User records of type "bot".
    Moves nested owner type data to a standardized "info" field
    for clarity and consistency in bot user records.
    """

    def transform(self, record: MutableMapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
        """
        Transforms bot user records by normalizing owner information.
        Moves data from bot.owner.{owner_type} to the bot.owner.info field.

        Args:
            record: User record from Notion API
            **kwargs: Additional transformation parameters

        Returns:
            Transformed record with normalized owner structure
        """
```
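The restructuring described in the docstring can be sketched without the Airbyte CDK. The function below is a minimal illustration, not the component's actual source: `normalize_bot_owner` is a hypothetical name, and returning records that have no `bot.owner` object unchanged is an assumption.

```python
from typing import Any, MutableMapping


def normalize_bot_owner(record: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
    """Move record["bot"]["owner"][<owner type>] to record["bot"]["owner"]["info"]."""
    owner = record.get("bot", {}).get("owner")
    if not owner:
        return record  # assumption: nothing to normalize for non-bot users
    owner_type = owner.get("type")
    if owner_type in owner:
        # pop() removes the type-specific key and returns its value
        owner["info"] = owner.pop(owner_type)
    return record


bot = {"type": "bot", "bot": {"owner": {"type": "workspace", "workspace": True}}}
person = {"type": "person", "person": {"email": "a@example.com"}}
normalized_bot = normalize_bot_owner(bot)
normalized_person = normalize_bot_owner(person)
```

After this runs, the bot's owner holds only the `type` and `info` keys, whatever the owner type was, so downstream schemas need a single field instead of one per owner type.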

### Properties Normalization

Converts the nested `properties` object of Notion pages and databases into a normalized array, which is easier to process downstream.

```python { .api }
class NotionPropertiesTransformation(RecordTransformation):
    """
    Transforms the nested 'properties' object within Notion Page/Database records.
    Converts properties dictionary to normalized array format where each element
    contains 'name' and 'value' keys for consistent processing.
    """

    def transform(self, record: MutableMapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
        """
        Normalizes properties object from dictionary to array format.
        Each property becomes {name: property_name, value: property_content}.

        Args:
            record: Page or Database record from Notion API
            **kwargs: Additional transformation parameters

        Returns:
            Record with properties transformed to array format
        """
```
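The dictionary-to-array conversion can be illustrated in a few lines of plain Python. This is a sketch under assumptions rather than the component's code: `properties_to_list` is a hypothetical name, and treating a missing `properties` key as an empty mapping is an assumption.

```python
from typing import Any, Dict, List, MutableMapping


def properties_to_list(record: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
    """Replace the 'properties' mapping with a list of {'name', 'value'} entries."""
    properties: Dict[str, Any] = record.get("properties", {})
    entries: List[Dict[str, Any]] = [
        {"name": name, "value": value} for name, value in properties.items()
    ]
    record["properties"] = entries
    return record


page = {"id": "page-1", "properties": {"Status": {"type": "select", "select": {"name": "Done"}}}}
normalized_page = properties_to_list(page)
```

Property names in Notion are user-defined, so a dictionary keyed by name has a different shape for every database. A list of `name`/`value` entries keeps the record shape fixed regardless of which properties exist.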

### Incremental Sync Data Filter

Custom filter that improves incremental sync performance under cursor-based pagination by comparing each record's cursor value against the stream state.

```python { .api }
class NotionDataFeedFilter(RecordFilter):
    """
    Custom filter for Data Feed endpoints with incremental sync optimization.
    Addresses issues with Notion's cursor-based pagination where state thresholds
    aren't properly respected, causing unnecessary record processing.
    """

    def filter_records(self, records: List[Mapping[str, Any]], stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, **kwargs) -> List[Mapping[str, Any]]:
        """
        Filters records based on cursor value compared to current state.
        Ensures only records newer than state cursor are processed,
        improving incremental sync efficiency.

        Args:
            records: List of records from API response
            stream_state: Current stream state with cursor information
            stream_slice: Optional stream slice context
            **kwargs: Additional filter parameters

        Returns:
            Filtered list containing only records newer than state cursor
        """

    def _get_filter_date(self, start_date: str, state_value: list) -> str:
        """
        Calculates effective filter date by comparing start_date with state value.
        Returns the most recent date to use for record filtering.

        Args:
            start_date: Configured start date for sync
            state_value: Current state cursor value

        Returns:
            Effective filter date string for record comparison
        """
```
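The two steps documented above (pick the later of `start_date` and the state cursor, then keep records at or past that cutoff) can be sketched as standalone functions. This is an illustration, not the component's source: `effective_filter_date` and `keep_recent` are hypothetical names, and both the inclusive `>=` comparison and the plain string comparison of timestamps are assumptions.

```python
from typing import Any, List, Mapping, Optional


def effective_filter_date(start_date: Optional[str], state_value: Optional[str]) -> Optional[str]:
    """Return the later of the two timestamps, ignoring missing values."""
    candidates = [value for value in (start_date, state_value) if value]
    # ISO 8601 UTC strings in one fixed format sort chronologically as plain strings.
    return max(candidates, default=None)


def keep_recent(records: List[Mapping[str, Any]], cutoff: Optional[str]) -> List[Mapping[str, Any]]:
    """Keep records edited at or after the cutoff; keep everything if there is no cutoff."""
    if not cutoff:
        return list(records)
    return [record for record in records if record["last_edited_time"] >= cutoff]


records = [
    {"id": "1", "last_edited_time": "2023-01-01T00:00:00.000Z"},
    {"id": "2", "last_edited_time": "2023-01-03T00:00:00.000Z"},
]
cutoff = effective_filter_date("2023-01-01T00:00:00.000Z", "2023-01-02T12:00:00.000Z")
recent_ids = [record["id"] for record in keep_recent(records, cutoff)]
```

Comparing the strings directly only works while every timestamp uses the same format and time zone; parsing to `datetime` would be the safer choice if that cannot be guaranteed.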

## Usage Examples

### User Transformation Example

```python
from source_notion.components import NotionUserTransformation

# Original bot user record from Notion API
original_user = {
    "object": "user",
    "id": "bot-123",
    "type": "bot",
    "bot": {
        "owner": {
            "type": "workspace",
            "workspace": {
                "id": "workspace-456",
                "name": "My Workspace"
            }
        }
    }
}

# Apply transformation
transformer = NotionUserTransformation()
transformed_user = transformer.transform(original_user)

# Result: workspace data moved to info field
print(transformed_user["bot"]["owner"])
# {
#     "type": "workspace",
#     "info": {
#         "id": "workspace-456",
#         "name": "My Workspace"
#     }
# }
```

### Properties Transformation Example

```python
from source_notion.components import NotionPropertiesTransformation

# Original page/database record with nested properties
original_record = {
    "id": "page-123",
    "properties": {
        "Title": {
            "type": "title",
            "title": [{"text": {"content": "My Page"}}]
        },
        "Status": {
            "type": "select",
            "select": {"name": "In Progress"}
        },
        "Created": {
            "type": "created_time",
            "created_time": "2023-01-01T00:00:00.000Z"
        }
    }
}

# Apply transformation
transformer = NotionPropertiesTransformation()
transformed_record = transformer.transform(original_record)

# Result: properties converted to array format
print(transformed_record["properties"])
# [
#     {
#         "name": "Title",
#         "value": {
#             "type": "title",
#             "title": [{"text": {"content": "My Page"}}]
#         }
#     },
#     {
#         "name": "Status",
#         "value": {
#             "type": "select",
#             "select": {"name": "In Progress"}
#         }
#     },
#     {
#         "name": "Created",
#         "value": {
#             "type": "created_time",
#             "created_time": "2023-01-01T00:00:00.000Z"
#         }
#     }
# ]
```

### Data Feed Filter Example

```python
from source_notion.components import NotionDataFeedFilter

# Sample records from API response
records = [
    {"id": "1", "last_edited_time": "2023-01-01T00:00:00.000Z"},
    {"id": "2", "last_edited_time": "2023-01-02T00:00:00.000Z"},
    {"id": "3", "last_edited_time": "2023-01-03T00:00:00.000Z"},
    {"id": "4", "last_edited_time": "2023-01-04T00:00:00.000Z"},
]

# Current stream state
stream_state = {
    "last_edited_time": "2023-01-02T12:00:00.000Z"
}

# Configuration with start date
config = {"start_date": "2023-01-01T00:00:00.000Z"}

# Apply filter
# parameters={} is passed on the assumption that the CDK's RecordFilter
# dataclass requires a `parameters` argument
filter_component = NotionDataFeedFilter(config=config, parameters={})
filtered_records = filter_component.filter_records(records, stream_state)

# Result: only records newer than state cursor
print([r["id"] for r in filtered_records])
# ['3', '4'] - records 1 and 2 filtered out as they're older than state
```

### Integration with Declarative Streams

```yaml
# These transformations are automatically applied in manifest.yaml.
# Each fragment below belongs to a different stream definition.

# For users stream:
transformations:
  - type: CustomTransformation
    class_name: source_notion.components.NotionUserTransformation

# For databases stream:
transformations:
  - type: CustomTransformation
    class_name: source_notion.components.NotionPropertiesTransformation

# For databases stream with incremental sync:
record_selector:
  type: RecordSelector
  record_filter:
    type: CustomRecordFilter
    class_name: source_notion.components.NotionDataFeedFilter
```

### Custom Transformation Implementation

```python
from source_notion.components import NotionPropertiesTransformation

# Extend for custom property handling
class CustomPropertiesTransformation(NotionPropertiesTransformation):
    def transform(self, record, **kwargs):
        # Apply base transformation
        record = super().transform(record, **kwargs)

        # Add custom logic
        for prop in record.get("properties", []):
            if prop["name"] == "Tags" and prop["value"]["type"] == "multi_select":
                # Flatten multi-select values
                tags = [option["name"] for option in prop["value"]["multi_select"]]
                prop["value"]["tag_names"] = tags

        return record
```

### Filter Date Calculation Logic

```python
from source_notion.components import NotionDataFeedFilter

# Understanding filter date calculation
# parameters={} is passed on the assumption that the CDK's RecordFilter
# dataclass requires a `parameters` argument
filter_component = NotionDataFeedFilter(
    config={"start_date": "2023-01-01T00:00:00.000Z"}, parameters={}
)

# Case 1: No state value - uses start_date
filter_date = filter_component._get_filter_date("2023-01-01T00:00:00.000Z", None)
print(filter_date)  # "2023-01-01T00:00:00.000Z"

# Case 2: State value newer than start_date - uses state value
filter_date = filter_component._get_filter_date(
    "2023-01-01T00:00:00.000Z",
    "2023-01-15T00:00:00.000Z"
)
print(filter_date)  # "2023-01-15T00:00:00.000Z"

# Case 3: State value older than start_date - uses start_date
filter_date = filter_component._get_filter_date(
    "2023-01-15T00:00:00.000Z",
    "2023-01-01T00:00:00.000Z"
)
print(filter_date)  # "2023-01-15T00:00:00.000Z"
```
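
### Performance Benefits

```python
# Without NotionDataFeedFilter:
# - API returns 100 records per page
# - Only 5 records are actually newer than state
# - 95 records unnecessarily processed

# With NotionDataFeedFilter:
# - Same 100 records retrieved from API
# - Filter eliminates 95 outdated records at component level
# - Only 5 records passed to downstream processing
# - Reduces downstream processing overhead

# Example measurement: time the filter component against an equivalent inline
# comparison. This measures the cost of filtering itself; the saving comes
# from the records that downstream steps no longer have to process.
import time
from datetime import datetime, timedelta, timezone

from source_notion.components import NotionDataFeedFilter


def generate_test_records(count):
    # Hypothetical helper (not part of source_notion): one record per hour from 2023-01-01
    first = datetime(2023, 1, 1, tzinfo=timezone.utc)
    return [
        {"id": str(i), "last_edited_time": (first + timedelta(hours=i)).strftime("%Y-%m-%dT%H:%M:%S.000Z")}
        for i in range(count)
    ]


records = generate_test_records(10000)  # 10k records
stream_state = {"last_edited_time": "2023-06-01T00:00:00.000Z"}

# Baseline: inline comparison against the state cursor
start = time.perf_counter()
processed = [r for r in records if r["last_edited_time"] >= "2023-06-01T00:00:00.000Z"]
inline_time = time.perf_counter() - start

# With filter component (parameters={} assumes the CDK's RecordFilter requires it)
filter_component = NotionDataFeedFilter(config={}, parameters={})
start = time.perf_counter()
filtered = filter_component.filter_records(records, stream_state)
component_time = time.perf_counter() - start

print(f"Kept {len(filtered)} of {len(records)} records")
print(f"Inline: {inline_time:.4f}s, component: {component_time:.4f}s")
```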

## Transformation Pipeline

The transformations are applied in this order within Airbyte streams:

1. **Raw API Response** → Records from Notion API
2. **Custom Filters** → NotionDataFeedFilter (for incremental streams)
3. **Record Transformations** → NotionUserTransformation, NotionPropertiesTransformation
4. **Schema Validation** → Airbyte schema enforcement
5. **Output Records** → Final normalized records for destination

Because filtering runs before the transformations, records dropped by the filter are never transformed, and both full refresh and incremental sync modes emit the same normalized record shape.
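
The filter-then-transform ordering of steps 2 and 3 can be sketched with plain functions. This is an illustrative model only: `run_pipeline` and `tag_synced` are hypothetical names, schema validation is omitted, and the real ordering is handled by the Airbyte CDK rather than by code like this.

```python
from typing import Any, Callable, Dict, List

Record = Dict[str, Any]


def run_pipeline(
    records: List[Record],
    keep: Callable[[Record], bool],
    transformations: List[Callable[[Record], Record]],
) -> List[Record]:
    """Filter first, then apply each transformation in order to the survivors."""
    output = []
    for record in records:
        if not keep(record):
            continue  # filtered records never reach the transformations
        for transform in transformations:
            record = transform(record)
        output.append(record)
    return output


def tag_synced(record: Record) -> Record:
    # Stand-in transformation used only for this illustration
    return {**record, "synced": True}


raw = [
    {"id": "1", "last_edited_time": "2023-01-01T00:00:00.000Z"},
    {"id": "2", "last_edited_time": "2023-01-03T00:00:00.000Z"},
]
result = run_pipeline(
    raw,
    keep=lambda r: r["last_edited_time"] >= "2023-01-02T00:00:00.000Z",
    transformations=[tag_synced],
)
```

Only the record with id `"2"` passes the filter, so `tag_synced` runs once instead of twice.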