0
# Stream Management
1
2
Base classes and functionality for managing Notion data streams with pagination, error handling, and incremental sync capabilities.
3
4
## Capabilities
5
6
### Base Notion Stream
7
8
The foundational class for all Notion API streams with common functionality for rate limiting, pagination, and error handling.
9
10
```python { .api }
11
class NotionStream(HttpStream, ABC):
12
"""
13
Abstract base class for Notion API streams.
14
Provides common functionality including rate limiting, pagination,
15
error handling, and Notion-specific API behaviors.
16
"""
17
18
url_base: str = "https://api.notion.com/v1/"
19
primary_key: str = "id"
20
page_size: int = 100
21
raise_on_http_errors: bool = True
22
23
def __init__(self, config: Mapping[str, Any], **kwargs):
24
"""
25
Initializes stream with configuration and sets start_date.
26
If start_date is not provided, it defaults to 2 years ago.
27
28
Args:
29
config: Stream configuration mapping
30
**kwargs: Additional stream parameters
31
"""
32
33
@property
34
def availability_strategy(self) -> HttpAvailabilityStrategy:
35
"""Returns NotionAvailabilityStrategy for custom error handling."""
36
37
@property
38
def retry_factor(self) -> int:
39
"""Retry factor for exponential backoff (5)."""
40
41
@property
42
def max_retries(self) -> int:
43
"""Maximum number of retry attempts (7)."""
44
45
@property
46
def max_time(self) -> int:
47
"""Maximum time in seconds for retries (660)."""
48
49
@staticmethod
50
def check_invalid_start_cursor(response: requests.Response) -> Optional[str]:
51
"""
52
Checks if response contains invalid start cursor error.
53
54
Args:
55
response: HTTP response object
56
57
Returns:
58
Error message if invalid cursor detected, None otherwise
59
"""
60
61
@staticmethod
62
def throttle_request_page_size(current_page_size: int) -> int:
63
"""
64
Reduces page size for retry after 504 Gateway Timeout.
65
66
Args:
67
current_page_size: Current page size value
68
69
Returns:
70
Throttled page size (minimum 10)
71
"""
72
73
def backoff_time(self, response: requests.Response) -> Optional[float]:
74
"""
75
Custom backoff logic for Notion API rate limiting.
76
Uses the retry-after header for 429 responses (Notion's rate limit is ~3 requests/sec).
77
78
Args:
79
response: HTTP response object
80
81
Returns:
82
Backoff time in seconds
83
"""
84
85
def should_retry(self, response: requests.Response) -> bool:
86
"""
87
Custom retry logic with page size throttling for 504 errors.
88
Automatically reduces page_size on timeout and restores on success.
89
90
Args:
91
response: HTTP response object
92
93
Returns:
94
True if request should be retried
95
"""
96
97
def request_headers(self, **kwargs) -> Mapping[str, Any]:
98
"""
99
Adds Notion-Version header to requests.
100
101
Returns:
102
Headers mapping with Notion-Version: 2022-06-28
103
"""
104
105
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
106
"""
107
Extracts pagination token from Notion API response.
108
109
Args:
110
response: HTTP response object
111
112
Returns:
113
Next page token mapping or None if no more pages
114
"""
115
116
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
117
"""
118
Parses Notion API response and yields result records.
119
120
Args:
121
response: HTTP response object
122
**kwargs: Additional parsing parameters
123
124
Yields:
125
Individual record mappings from results array
126
"""
127
```
128
129
### Incremental Sync Stream
130
131
Enhanced base class for streams supporting incremental synchronization with cursor-based state management.
132
133
```python { .api }
134
class IncrementalNotionStream(NotionStream, CheckpointMixin, ABC):
135
"""
136
Base class for Notion streams with incremental sync capability.
137
Implements cursor-based incremental sync with state checkpointing.
138
"""
139
140
cursor_field: str = "last_edited_time"
141
http_method: str = "POST"
142
is_finished: bool = True
143
144
def __init__(self, obj_type: Optional[str] = None, **kwargs):
145
"""
146
Initializes incremental stream with optional object type filter.
147
148
Args:
149
obj_type: Notion object type filter ("page" or "database")
150
**kwargs: Additional stream parameters
151
"""
152
153
@property
154
def state(self) -> MutableMapping[str, Any]:
155
"""Gets current stream state."""
156
157
@state.setter
158
def state(self, value: MutableMapping[str, Any]):
159
"""Sets stream state value."""
160
161
def path(self, **kwargs) -> str:
162
"""
163
Returns API path for search endpoint.
164
165
Returns:
166
"search" - Notion's search API endpoint
167
"""
168
169
def request_body_json(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> Optional[Mapping]:
170
"""
171
Builds JSON request body for Notion search API.
172
173
Args:
174
next_page_token: Pagination token for next page
175
**kwargs: Additional request parameters
176
177
Returns:
178
Request body with sort, filter, and pagination parameters
179
"""
180
181
def read_records(self, sync_mode: SyncMode, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Mapping[str, Any]]:
182
"""
183
Reads records with state management and error handling.
184
Handles invalid cursor errors and updates state incrementally.
185
186
Args:
187
sync_mode: FULL_REFRESH or INCREMENTAL
188
stream_state: Current stream state for incremental sync
189
**kwargs: Additional read parameters
190
191
Yields:
192
Record mappings with updated state
193
"""
194
195
def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
196
"""
197
Parses response with state filtering for incremental sync.
198
Only yields records newer than state cursor and start_date.
199
200
Args:
201
response: HTTP response object
202
stream_state: Current stream state
203
**kwargs: Additional parsing parameters
204
205
Yields:
206
Filtered record mappings
207
"""
208
209
def _get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
210
"""
211
Updates stream state with latest record cursor value.
212
213
Args:
214
current_stream_state: Current state mapping
215
latest_record: Latest processed record
216
217
Returns:
218
Updated state mapping with new cursor value
219
"""
220
```
221
222
### State Management Helper
223
224
Utility class for managing incremental sync state with proper cursor handling.
225
226
```python { .api }
227
class StateValueWrapper(pydantic.BaseModel):
228
"""
229
Wrapper for stream state values that handles cursor timing.
230
Provides different values during sync vs after completion.
231
"""
232
233
stream: T
234
state_value: str
235
max_cursor_time: Any = ""
236
237
@property
238
def value(self) -> str:
239
"""
240
Returns appropriate cursor value based on stream status.
241
Uses max_cursor_time when stream is finished, state_value during sync.
242
243
Returns:
244
Current cursor value as string
245
"""
246
247
def dict(self, **kwargs) -> dict:
248
"""
249
Serializes to dictionary with just the current value.
250
251
Returns:
252
Dictionary with root key containing current value
253
"""
254
```
255
256
### Availability Strategy
257
258
Custom availability strategy for handling Notion-specific error responses.
259
260
```python { .api }
261
class NotionAvailabilityStrategy(HttpAvailabilityStrategy):
262
"""
263
Custom availability strategy with Notion-specific error messaging.
264
Provides clearer guidance for common permission issues.
265
"""
266
267
def reasons_for_unavailable_status_codes(self, stream: Stream, logger: Logger, source: Source, error: HTTPError) -> Dict[int, str]:
268
"""
269
Returns custom error messages for HTTP status codes.
270
271
Args:
272
stream: Stream instance
273
logger: Logger instance
274
source: Source instance
275
error: HTTP error object
276
277
Returns:
278
Dictionary mapping status codes to user-friendly messages
279
"""
280
```
281
282
## Usage Examples
283
284
### Basic Stream Implementation
285
286
```python
287
from source_notion.streams import NotionStream
288
289
class CustomNotionStream(NotionStream):
290
def path(self, **kwargs) -> str:
291
return "custom-endpoint"
292
293
def parse_response(self, response, **kwargs):
294
for record in response.json().get("results", []):
295
yield record
296
297
# Initialize stream
298
config = {"start_date": "2023-01-01T00:00:00.000Z"}
299
stream = CustomNotionStream(config=config, authenticator=authenticator)
300
```
301
302
### Incremental Stream Implementation
303
304
```python
305
from source_notion.streams import IncrementalNotionStream
306
307
class CustomIncrementalStream(IncrementalNotionStream):
308
def __init__(self, **kwargs):
309
super().__init__(obj_type="page", **kwargs)
310
311
# Use with state management (assumes `stream` is a CustomIncrementalStream instance and `SyncMode` is imported)
312
stream_state = {"last_edited_time": "2023-01-01T00:00:00.000Z"}
313
records = stream.read_records(
314
sync_mode=SyncMode.incremental,
315
stream_state=stream_state
316
)
317
```
318
319
### Error Handling and Retry Logic
320
321
```python
322
import requests
323
from source_notion.streams import NotionStream
324
325
# The streams automatically handle:
326
# - Rate limiting with retry-after headers
327
# - Page size throttling on 504 timeouts
328
# - Invalid cursor detection and recovery
329
# - Notion API version headers
330
331
# Custom retry behavior (overrides should_retry)
332
class MyStream(NotionStream):
333
def should_retry(self, response: requests.Response) -> bool:
334
if response.status_code == 504:
335
# Page size automatically reduced
336
self.logger.info(f"Reduced page size to {self.page_size}")
337
return super().should_retry(response)
338
```
339
340
### State Management
341
342
```python
343
from source_notion.streams import StateValueWrapper, IncrementalNotionStream
344
345
# State wrapper automatically handles cursor timing
346
class MyIncrementalStream(IncrementalNotionStream):
347
def read_records(self, sync_mode, stream_state=None, **kwargs):
348
# State wrapper ensures proper cursor values
349
for record in super().read_records(sync_mode, stream_state, **kwargs):
350
# State automatically updated with latest cursor
351
yield record
352
353
# Access current state
354
stream = MyIncrementalStream(config=config)
355
current_state = stream.state  # Mapping keyed by cursor field; the entry is a StateValueWrapper
356
cursor_value = current_state["last_edited_time"].value
357
```
358
359
## Constants
360
361
```python { .api }
362
MAX_BLOCK_DEPTH: int = 30
363
```
364
365
Maximum recursive depth for block hierarchy traversal to prevent infinite loops.