or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

connector-setup.md, data-streams.md, index.md, stream-management.md, transformations.md

docs/stream-management.md

# Stream Management

Base classes and functionality for managing Notion data streams with pagination, error handling, and incremental sync capabilities.

## Capabilities

### Base Notion Stream

The foundational class for all Notion API streams, providing common functionality for rate limiting, pagination, and error handling.

```python { .api }

class NotionStream(HttpStream, ABC):
    """
    Common abstract base for the Notion API streams.

    Centralizes what every Notion stream shares: rate limiting, pagination,
    error handling, and Notion-specific API behavior.
    """

    url_base: str = "https://api.notion.com/v1/"
    primary_key: str = "id"
    page_size: int = 100
    raise_on_http_errors: bool = True

    def __init__(self, config: Mapping[str, Any], **kwargs):
        """
        Create the stream from its configuration and resolve ``start_date``.

        A configuration without ``start_date`` falls back to a date two years
        before now.

        Args:
            config: Mapping holding the stream configuration.
            **kwargs: Any further stream parameters.
        """

    @property
    def availability_strategy(self) -> HttpAvailabilityStrategy:
        """Strategy (a ``NotionAvailabilityStrategy``) providing Notion-aware error handling."""

    @property
    def retry_factor(self) -> int:
        """Exponential-backoff retry factor; documented value: 5."""

    @property
    def max_retries(self) -> int:
        """Upper bound on retry attempts; documented value: 7."""

    @property
    def max_time(self) -> int:
        """Upper bound, in seconds, on total retry time; documented value: 660."""

    @staticmethod
    def check_invalid_start_cursor(response: requests.Response) -> Optional[str]:
        """
        Detect an "invalid start cursor" error in ``response``.

        Args:
            response: The HTTP response to inspect.

        Returns:
            The error message when the cursor was rejected, otherwise ``None``.
        """

    @staticmethod
    def throttle_request_page_size(current_page_size: int) -> int:
        """
        Compute a smaller page size to retry with after a 504 Gateway Timeout.

        Args:
            current_page_size: The page size that timed out.

        Returns:
            The reduced page size, never below 10.
        """

    def backoff_time(self, response: requests.Response) -> Optional[float]:
        """
        Decide how long to wait before retrying, honoring Notion's rate limits.

        For 429 responses the ``retry-after`` header is used (Notion allows
        roughly 3 requests per second).

        Args:
            response: The HTTP response that triggered the retry.

        Returns:
            Seconds to wait before the next attempt.
        """

    def should_retry(self, response: requests.Response) -> bool:
        """
        Decide whether to retry, throttling the page size on 504 errors.

        On a timeout ``page_size`` is reduced; after a successful response it
        is restored.

        Args:
            response: The HTTP response to evaluate.

        Returns:
            ``True`` when the request should be retried.
        """

    def request_headers(self, **kwargs) -> Mapping[str, Any]:
        """
        Attach the ``Notion-Version`` header to outgoing requests.

        Returns:
            Header mapping including ``Notion-Version: 2022-06-28``.
        """

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """
        Pull the pagination token out of a Notion API response.

        Args:
            response: The HTTP response to read.

        Returns:
            Mapping describing the next page, or ``None`` on the last page.
        """

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """
        Turn a Notion API response into individual records.

        Args:
            response: The HTTP response to parse.
            **kwargs: Extra parsing parameters.

        Yields:
            One mapping per entry of the response's ``results`` array.
        """

```

### Incremental Sync Stream

Enhanced base class for streams that support incremental synchronization with cursor-based state management.

```python { .api }

class IncrementalNotionStream(NotionStream, CheckpointMixin, ABC):
    """
    Base class for Notion streams with incremental sync capability.

    Implements cursor-based incremental sync with state checkpointing on top
    of Notion's search endpoint.
    """

    # Record field tracked as the incremental-sync cursor.
    cursor_field: str = "last_edited_time"
    # The search endpoint returned by ``path`` is queried with POST.
    http_method: str = "POST"
    # NOTE(review): presumably flags whether the current sync has completed
    # (StateValueWrapper.value switches on stream completion) — confirm.
    is_finished: bool = True

    def __init__(self, obj_type: Optional[str] = None, **kwargs):
        """
        Initialize the incremental stream with an optional object-type filter.

        Args:
            obj_type: Notion object type to filter on ("page" or "database"),
                or ``None`` for no filter.
            **kwargs: Additional stream parameters, forwarded to the base class.
        """

    @property
    def state(self) -> MutableMapping[str, Any]:
        """Return the current stream state."""

    @state.setter
    def state(self, value: MutableMapping[str, Any]):
        """Replace the stream state with ``value``."""

    def path(self, **kwargs) -> str:
        """
        Return the API path for the search endpoint.

        Returns:
            ``"search"``: Notion's search API endpoint.
        """

    def request_body_json(self, next_page_token: Optional[Mapping[str, Any]] = None, **kwargs) -> Optional[Mapping]:
        """
        Build the JSON request body for the Notion search API.

        Args:
            next_page_token: Pagination token for the next page, or ``None``
                when requesting the first page.
            **kwargs: Additional request parameters.

        Returns:
            Request body with sort, filter, and pagination parameters.
        """

    def read_records(
        self, sync_mode: SyncMode, stream_state: Optional[Mapping[str, Any]] = None, **kwargs
    ) -> Iterable[Mapping[str, Any]]:
        """
        Read records with state management and error handling.

        Handles invalid-cursor errors and updates state incrementally as
        records are yielded.

        Args:
            sync_mode: ``SyncMode.full_refresh`` or ``SyncMode.incremental``.
            stream_state: Current stream state for incremental sync, or
                ``None`` when there is no saved state.
            **kwargs: Additional read parameters.

        Yields:
            Record mappings; the stream state is updated as they are produced.
        """

    def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
        """
        Parse a response, filtering records for incremental sync.

        Only records newer than both the state cursor and ``start_date`` are
        yielded.

        Args:
            response: HTTP response object.
            stream_state: Current stream state.
            **kwargs: Additional parsing parameters.

        Yields:
            Record mappings that pass the cursor filter.
        """

    def _get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Advance the stream state using the latest record's cursor value.

        Args:
            current_stream_state: Current state mapping.
            latest_record: Most recently processed record.

        Returns:
            Updated state mapping with the new cursor value.
        """

```

### State Management Helper

Utility class for managing incremental sync state with correct cursor handling.

```python { .api }

class StateValueWrapper(pydantic.BaseModel):
    """
    Holds a stream-state cursor and decides which value to expose.

    The exposed value differs depending on whether the sync is still running
    or has already completed.
    """

    stream: T
    state_value: str
    max_cursor_time: Any = ""

    @property
    def value(self) -> str:
        """
        Cursor value appropriate for the stream's current status.

        While the sync is running this is ``state_value``; once the stream has
        finished it is ``max_cursor_time``.

        Returns:
            The cursor value as a string.
        """

    def dict(self, **kwargs) -> dict:
        """
        Serialize to a dictionary that holds only the current value.

        Returns:
            A dictionary whose root key maps to the current value.
        """

```

### Availability Strategy

Custom availability strategy for handling Notion-specific error responses.

```python { .api }

class NotionAvailabilityStrategy(HttpAvailabilityStrategy):
    """
    Availability strategy that explains Notion-specific failures.

    Replaces generic HTTP error text with clearer guidance for common
    permission problems.
    """

    def reasons_for_unavailable_status_codes(self, stream: Stream, logger: Logger, source: Source, error: HTTPError) -> Dict[int, str]:
        """
        Map HTTP status codes to human-readable unavailability reasons.

        Args:
            stream: The stream being checked.
            logger: Logger instance.
            source: The source the stream belongs to.
            error: The HTTP error that was raised.

        Returns:
            A dictionary from status code to a user-friendly message.
        """

```

## Usage Examples

### Basic Stream Implementation

```python

from source_notion.streams import NotionStream


class CustomNotionStream(NotionStream):
    """Minimal concrete stream that reads a single Notion endpoint."""

    def path(self, **kwargs) -> str:
        # Resolved against NotionStream.url_base ("https://api.notion.com/v1/").
        return "custom-endpoint"

    def parse_response(self, response, **kwargs):
        # Emit every item of the "results" array; a missing key yields nothing.
        yield from response.json().get("results", [])


# Initialize the stream.
# `authenticator` is a placeholder: supply your configured HTTP authenticator
# before running this snippet.
config = {"start_date": "2023-01-01T00:00:00.000Z"}
stream = CustomNotionStream(config=config, authenticator=authenticator)

```

### Incremental Stream Implementation

```python

from airbyte_cdk.models import SyncMode
from source_notion.streams import IncrementalNotionStream


class CustomIncrementalStream(IncrementalNotionStream):
    """Incremental stream that searches only Notion ``page`` objects."""

    def __init__(self, **kwargs):
        # Fix the object-type filter; every other argument is forwarded as-is.
        super().__init__(obj_type="page", **kwargs)


# Construct the stream before reading from it.
# `authenticator` is a placeholder for your configured HTTP authenticator.
config = {"start_date": "2023-01-01T00:00:00.000Z"}
stream = CustomIncrementalStream(config=config, authenticator=authenticator)

# Use with state management: resume from a previously saved cursor, so only
# newer records are yielded.
stream_state = {"last_edited_time": "2023-01-01T00:00:00.000Z"}
# read_records is a generator; requests are issued as `records` is iterated.
records = stream.read_records(
    sync_mode=SyncMode.incremental,
    stream_state=stream_state,
)

```

### Error Handling and Retry Logic

```python

import requests
from source_notion.streams import NotionStream

# NotionStream already handles, with no subclass code:
# - rate limiting (429) using the retry-after header
# - page-size throttling on 504 Gateway Timeout responses
# - invalid start-cursor detection and recovery
# - the Notion-Version request header


# Custom retry behavior: report the page size that the retry will use.
class MyStream(NotionStream):
    def should_retry(self, response: requests.Response) -> bool:
        # The base implementation is what reduces page_size on a 504, so it
        # must run before logging; otherwise the old size is reported.
        retry = super().should_retry(response)
        if response.status_code == 504:
            self.logger.info("Reduced page size to %s", self.page_size)
        return retry

```

### State Management

```python

from source_notion.streams import StateValueWrapper, IncrementalNotionStream


# State handling is inherited: the base read_records updates the stream state
# as it yields records, so an override only needs to pass records through.
class MyIncrementalStream(IncrementalNotionStream):
    def read_records(self, sync_mode, stream_state=None, **kwargs):
        # Pass stream_state by keyword so it stays robust to positional
        # signature differences in the base classes.
        yield from super().read_records(sync_mode, stream_state=stream_state, **kwargs)


# Example configuration; supply your own connector config here.
config = {"start_date": "2023-01-01T00:00:00.000Z"}
stream = MyIncrementalStream(config=config)

# Access current state. `stream.state` is a mapping keyed by cursor field whose
# values are StateValueWrapper instances; it is not itself a StateValueWrapper.
# NOTE: the cursor entry is expected to be populated during a sync, so read
# records with `stream.read_records(...)` before inspecting it.
current_state = stream.state
wrapper: StateValueWrapper = current_state["last_edited_time"]
cursor_value = wrapper.value

```

## Constants

```python { .api }

# Maximum recursion depth when traversing the block hierarchy; prevents infinite loops.
MAX_BLOCK_DEPTH: int = 30

```

Maximum recursion depth for block hierarchy traversal, used to prevent infinite loops.