# Data Streams

Specific stream implementations for extracting different types of data from Notion workspaces, including pages and nested block content with hierarchical traversal.

## Capabilities

### Pages Stream

Stream for extracting Notion pages from workspaces and databases, with incremental synchronization support.
```python { .api }
class Pages(IncrementalNotionStream):
    """
    Stream for Notion pages with incremental sync capability.
    Serves as parent stream for Blocks substream and implements
    checkpointing for efficient large-scale page extraction.
    """

    state_checkpoint_interval: int = 100

    def __init__(self, **kwargs):
        """
        Initializes Pages stream with "page" object type filter.
        Configured for incremental sync with regular state checkpoints.

        Args:
            **kwargs: Stream configuration parameters including authenticator and config
        """
```
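The `"page"` object-type filter mentioned in `__init__` corresponds to a filter on Notion's search endpoint. The sketch below is illustrative only: `page_search_body` is a hypothetical helper, and the exact request shape used by the connector may differ.

```python
# Hypothetical helper showing what a "page"-filtered Notion search body
# looks like; the sort field mirrors the stream's cursor (last_edited_time).
def page_search_body(start_cursor=None, page_size=100):
    body = {
        "filter": {"property": "object", "value": "page"},
        "sort": {"direction": "ascending", "timestamp": "last_edited_time"},
        "page_size": page_size,
    }
    if start_cursor:
        body["start_cursor"] = start_cursor  # continue from the previous page
    return body

print(page_search_body()["filter"])  # {'property': 'object', 'value': 'page'}
```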

### Blocks Stream

Substream for extracting block content from pages, with recursive hierarchy traversal and depth limiting.
```python { .api }
class Blocks(HttpSubStream, IncrementalNotionStream):
    """
    Substream for extracting block content from Notion pages.
    Implements recursive traversal of block hierarchies with depth limiting
    and supports incremental sync based on parent page updates.
    """

    http_method: str = "GET"
    block_id_stack: List[str] = []

    def path(self, **kwargs) -> str:
        """
        Returns API path for block children endpoint.
        Uses current block ID from stack for nested traversal.

        Returns:
            API path string: "blocks/{block_id}/children"
        """

    def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]:
        """
        Builds request parameters for block children API.

        Args:
            next_page_token: Pagination token for continuation
            **kwargs: Additional request parameters

        Returns:
            Parameters dictionary with page_size and optional start_cursor
        """

    def stream_slices(self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None) -> Iterable[Optional[Mapping[str, Any]]]:
        """
        Generates stream slices based on parent Pages stream.
        Each slice represents a page whose blocks should be extracted.

        Args:
            sync_mode: Sync mode (FULL_REFRESH or INCREMENTAL)
            cursor_field: List of cursor field names
            stream_state: Current stream state for incremental sync

        Yields:
            Stream slice dictionaries with page_id for block extraction
        """

    def transform(self, record: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Transforms block records to normalize mention object structure.
        Moves mention type data to standardized 'info' field.

        Args:
            record: Raw block record from API

        Returns:
            Transformed record with normalized mention structure
        """

    def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
        """
        Parses block children response and filters unsupported block types.
        Excludes child_page, child_database, and ai_block types.

        Args:
            response: HTTP response from blocks API
            stream_state: Current stream state
            **kwargs: Additional parsing parameters

        Yields:
            Filtered and transformed block records
        """

    def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:
        """
        Reads block records with recursive hierarchy traversal.
        Implements depth-first traversal with MAX_BLOCK_DEPTH limit.
        Automatically handles nested blocks with has_children flag.

        Args:
            **kwargs: Read parameters including sync configuration

        Yields:
            Block records including nested children up to depth limit
        """

    def should_retry(self, response: requests.Response) -> bool:
        """
        Custom retry logic for block-specific errors.
        Handles 404 errors for inaccessible blocks and 400 errors for unsupported ai_block types.

        Args:
            response: HTTP response object

        Returns:
            True if request should be retried, False to skip
        """
```
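The traversal described in `read_records` can be pictured with a small stand-in: a depth-first walk over an in-memory tree, pushing onto `block_id_stack` before descending into a block with children and popping afterwards. `TREE` and `read_blocks` are illustrative names, not connector code.

```python
MAX_BLOCK_DEPTH = 30  # matches the documented depth limit

# Toy block tree standing in for API responses:
# block_id -> list of (child_id, has_children)
TREE = {
    "page-1": [("b1", True), ("b2", False)],
    "b1": [("b1a", False)],
}

def read_blocks(block_id_stack):
    """Yield block ids depth-first, mirroring the Blocks.read_records pattern."""
    if len(block_id_stack) > MAX_BLOCK_DEPTH:
        return  # depth limit reached, stop descending
    current = block_id_stack[-1]
    for child_id, has_children in TREE.get(current, []):
        yield child_id
        if has_children:
            block_id_stack.append(child_id)   # descend into nested block
            yield from read_blocks(block_id_stack)
            block_id_stack.pop()              # back out after its children

print(list(read_blocks(["page-1"])))  # ['b1', 'b1a', 'b2']
```

A real sync replaces the `TREE` lookup with a paginated call to `blocks/{block_id}/children`.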

## Usage Examples

### Basic Pages Stream Usage
```python
from source_notion.streams import Pages
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
from airbyte_cdk.models import SyncMode

# Set up authentication
authenticator = TokenAuthenticator("your_notion_token")

# Initialize the pages stream
config = {
    "start_date": "2023-01-01T00:00:00.000Z"
}

pages_stream = Pages(
    authenticator=authenticator,
    config=config
)

# Read pages with incremental sync
stream_state = {"last_edited_time": "2023-06-01T00:00:00.000Z"}
for page in pages_stream.read_records(
    sync_mode=SyncMode.incremental,
    stream_state=stream_state
):
    print(f"Page: {page['id']} - {page.get('properties', {})}")
```

### Blocks Stream with Parent Dependency

```python
from source_notion.streams import Pages, Blocks

# Set up the parent stream (Pages)
pages_stream = Pages(authenticator=authenticator, config=config)

# Initialize the blocks substream
blocks_stream = Blocks(
    parent=pages_stream,
    authenticator=authenticator,
    config=config
)

# Read blocks for all pages
for block in blocks_stream.read_records(sync_mode=SyncMode.full_refresh):
    print(f"Block: {block['id']} - Type: {block['type']}")

    # Check for nested structure
    if block.get('has_children'):
        print(f"  Has children: {block['has_children']}")
```

### Recursive Block Traversal

```python
# The Blocks stream handles recursive traversal automatically.
# Example of what happens internally:

blocks_stream = Blocks(parent=pages_stream, authenticator=authenticator, config=config)

# Stream slices come from parent pages
for slice_data in blocks_stream.stream_slices(SyncMode.full_refresh):
    page_id = slice_data["page_id"]
    print(f"Processing blocks for page: {page_id}")

# Blocks are read recursively up to MAX_BLOCK_DEPTH (30 levels)
for block in blocks_stream.read_records(sync_mode=SyncMode.full_refresh):
    print(f"  Block {block['id']} at depth {len(blocks_stream.block_id_stack)}")
```

### Handling Block Transformations

```python
# Example of the mention transformation that happens automatically
original_block = {
    "type": "paragraph",
    "paragraph": {
        "rich_text": [{
            "mention": {
                "type": "user",
                "user": {
                    "id": "user-123",
                    "name": "John Doe"
                }
            }
        }]
    }
}

# After transformation:
transformed_block = {
    "type": "paragraph",
    "paragraph": {
        "rich_text": [{
            "mention": {
                "type": "user",
                "info": {  # Moved from "user" to "info"
                    "id": "user-123",
                    "name": "John Doe"
                }
            }
        }]
    }
}
```
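A minimal sketch of a transform that produces the result above, assuming the mention payload always lives under the key named by its `type`. `normalize_mentions` is an illustrative stand-in for `Blocks.transform`, not the connector's actual implementation.

```python
import copy

def normalize_mentions(record):
    """Move the type-specific mention payload under a standardized 'info' key."""
    record = copy.deepcopy(record)
    block_type = record.get("type")
    for rich_text in record.get(block_type, {}).get("rich_text", []):
        mention = rich_text.get("mention")
        if mention and mention.get("type") in mention:
            # e.g. {"type": "user", "user": {...}} -> {"type": "user", "info": {...}}
            mention["info"] = mention.pop(mention["type"])
    return record

block = {
    "type": "paragraph",
    "paragraph": {
        "rich_text": [{"mention": {"type": "user", "user": {"id": "user-123"}}}]
    },
}
print(normalize_mentions(block)["paragraph"]["rich_text"][0]["mention"])
# {'type': 'user', 'info': {'id': 'user-123'}}
```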

### Error Handling for Blocks

```python
# The Blocks stream handles several error scenarios automatically:

class CustomBlocksStream(Blocks):
    def should_retry(self, response):
        if response.status_code == 404:
            # Block not accessible - logged and skipped
            self.logger.error(f"Block not accessible: {response.json()}")
            return False
        elif response.status_code == 400:
            error = response.json()
            if "ai_block is not supported" in error.get("message", ""):
                # AI blocks are unsupported - logged and skipped
                self.logger.error("AI block type not supported, skipping")
                return False

        return super().should_retry(response)
```

### State Management in Incremental Sync

```python
# Pages stream with checkpointing
pages_stream = Pages(authenticator=authenticator, config=config)

# State is checkpointed every 100 records (state_checkpoint_interval)
records_processed = 0
for page in pages_stream.read_records(
    sync_mode=SyncMode.incremental,
    stream_state={"last_edited_time": "2023-01-01T00:00:00.000Z"}
):
    records_processed += 1
    if records_processed % 100 == 0:
        # State has just been checkpointed
        current_state = pages_stream.state
        print(f"Checkpointed at: {current_state}")
```

### Block Hierarchy Depth Control

```python
from source_notion.streams import MAX_BLOCK_DEPTH

# Depth limiting is automatic but can be monitored
class MonitoredBlocksStream(Blocks):
    def read_records(self, **kwargs):
        if len(self.block_id_stack) > MAX_BLOCK_DEPTH:
            self.logger.warning(f"Reached maximum depth {MAX_BLOCK_DEPTH}, stopping traversal")
            return

        # Continue with normal traversal
        yield from super().read_records(**kwargs)
```

### Stream Integration with SourceNotion

```python
# How streams are integrated in the main connector
from source_notion import SourceNotion

source = SourceNotion()
config = {
    "credentials": {
        "auth_type": "token",
        "token": "your_token"
    },
    "start_date": "2023-01-01T00:00:00.000Z"
}

# Get all streams (includes Pages and Blocks)
all_streams = source.streams(config)

# Find specific streams
pages_stream = next(s for s in all_streams if s.name == "pages")
blocks_stream = next(s for s in all_streams if s.name == "blocks")

# The Blocks stream is automatically configured with Pages as its parent
assert blocks_stream.parent == pages_stream
```

## Stream Dependencies

- The **Blocks** stream depends on the **Pages** stream as its parent
- Pages must be read first to provide page IDs for block extraction
- Block hierarchy traversal maintains parent-child relationships
- State synchronization ensures consistent incremental updates across related streams
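The dependency chain above can be sketched end to end: each parent page record becomes a slice, and the slice's `page_id` seeds the top-level block-children path. `pages_to_slices` and `block_children_path` are illustrative stand-ins for the stream methods, not connector code.

```python
def pages_to_slices(page_records):
    """Mimic Blocks.stream_slices: one slice per parent page record."""
    for page in page_records:
        yield {"page_id": page["id"]}

def block_children_path(block_id):
    """Mimic Blocks.path at the top of the hierarchy."""
    return f"blocks/{block_id}/children"

pages = [{"id": "page-1"}, {"id": "page-2"}]
paths = [block_children_path(s["page_id"]) for s in pages_to_slices(pages)]
print(paths)  # ['blocks/page-1/children', 'blocks/page-2/children']
```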