or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

connector-setup.md, data-streams.md, index.md, stream-management.md, transformations.md

docs/data-streams.md

# Data Streams

Specific stream implementations for extracting different types of data from Notion workspaces, including pages and nested block content with hierarchical traversal.

## Capabilities

### Pages Stream

Stream for extracting Notion pages from workspaces and databases with incremental synchronization support.

```python { .api }
class Pages(IncrementalNotionStream):
    """
    Stream for Notion pages with incremental sync capability.

    Serves as parent stream for Blocks substream and implements
    checkpointing for efficient large-scale page extraction.
    """

    state_checkpoint_interval: int = 100

    def __init__(self, **kwargs):
        """
        Initializes Pages stream with "page" object type filter.
        Configured for incremental sync with regular state checkpoints.

        Args:
            **kwargs: Stream configuration parameters including authenticator and config
        """
```

### Blocks Stream

Advanced substream for extracting block content from pages with recursive hierarchy traversal and depth limiting.

```python { .api }
class Blocks(HttpSubStream, IncrementalNotionStream):
    """
    Substream for extracting block content from Notion pages.

    Implements recursive traversal of block hierarchies with depth limiting
    and supports incremental sync based on parent page updates.
    """

    http_method: str = "GET"
    block_id_stack: List[str] = []

    def path(self, **kwargs) -> str:
        """
        Returns API path for block children endpoint.
        Uses current block ID from stack for nested traversal.

        Returns:
            API path string: "blocks/{block_id}/children"
        """

    def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]:
        """
        Builds request parameters for block children API.

        Args:
            next_page_token: Pagination token for continuation
            **kwargs: Additional request parameters

        Returns:
            Parameters dictionary with page_size and optional start_cursor
        """

    def stream_slices(self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None) -> Iterable[Optional[Mapping[str, Any]]]:
        """
        Generates stream slices based on parent Pages stream.
        Each slice represents a page whose blocks should be extracted.

        Args:
            sync_mode: Sync mode (FULL_REFRESH or INCREMENTAL)
            cursor_field: List of cursor field names
            stream_state: Current stream state for incremental sync

        Yields:
            Stream slice dictionaries with page_id for block extraction
        """

    def transform(self, record: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Transforms block records to normalize mention object structure.
        Moves mention type data to standardized 'info' field.

        Args:
            record: Raw block record from API

        Returns:
            Transformed record with normalized mention structure
        """

    def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
        """
        Parses block children response and filters unsupported block types.
        Excludes child_page, child_database, and ai_block types.

        Args:
            response: HTTP response from blocks API
            stream_state: Current stream state
            **kwargs: Additional parsing parameters

        Yields:
            Filtered and transformed block records
        """

    def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:
        """
        Reads block records with recursive hierarchy traversal.
        Implements depth-first traversal with MAX_BLOCK_DEPTH limit.
        Automatically handles nested blocks with has_children flag.

        Args:
            **kwargs: Read parameters including sync configuration

        Yields:
            Block records including nested children up to depth limit
        """

    def should_retry(self, response: requests.Response) -> bool:
        """
        Custom retry logic for block-specific errors.
        Handles 404 errors for inaccessible blocks and 400 errors for unsupported ai_block types.

        Args:
            response: HTTP response object

        Returns:
            True if request should be retried, False to skip
        """
```

## Usage Examples

### Basic Pages Stream Usage

```python
from source_notion.streams import Pages
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator

# Setup authentication
authenticator = TokenAuthenticator("your_notion_token")

# Initialize pages stream
config = {
    "start_date": "2023-01-01T00:00:00.000Z"
}

pages_stream = Pages(
    authenticator=authenticator,
    config=config
)

# Read pages with incremental sync
from airbyte_cdk.models import SyncMode

stream_state = {"last_edited_time": "2023-06-01T00:00:00.000Z"}
for page in pages_stream.read_records(
    sync_mode=SyncMode.incremental,
    stream_state=stream_state
):
    print(f"Page: {page['id']} - {page.get('properties', {})}")
```

### Blocks Stream with Parent Dependency

```python
from source_notion.streams import Pages, Blocks

# Setup parent stream (Pages)
pages_stream = Pages(authenticator=authenticator, config=config)

# Initialize blocks substream
blocks_stream = Blocks(
    parent=pages_stream,
    authenticator=authenticator,
    config=config
)

# Read blocks for all pages
for block in blocks_stream.read_records(sync_mode=SyncMode.full_refresh):
    print(f"Block: {block['id']} - Type: {block['type']}")

    # Check for nested structure
    if block.get('has_children'):
        print(f" Has children: {block['has_children']}")
```

### Recursive Block Traversal

```python
# The Blocks stream automatically handles recursive traversal
# Example of what happens internally:

blocks_stream = Blocks(parent=pages_stream, authenticator=authenticator, config=config)

# Stream slices come from parent pages
for slice_data in blocks_stream.stream_slices(SyncMode.full_refresh):
    page_id = slice_data["page_id"]
    print(f"Processing blocks for page: {page_id}")

# Blocks are read recursively up to MAX_BLOCK_DEPTH (30 levels)
for block in blocks_stream.read_records():
    print(f" Block {block['id']} at depth {len(blocks_stream.block_id_stack)}")
```

### Handling Block Transformations

```python
# Example of mention transformation that happens automatically
original_block = {
    "type": "paragraph",
    "paragraph": {
        "rich_text": [{
            "mention": {
                "type": "user",
                "user": {
                    "id": "user-123",
                    "name": "John Doe"
                }
            }
        }]
    }
}

# After transformation:
transformed_block = {
    "type": "paragraph",
    "paragraph": {
        "rich_text": [{
            "mention": {
                "type": "user",
                "info": {  # Moved from "user" to "info"
                    "id": "user-123",
                    "name": "John Doe"
                }
            }
        }]
    }
}
```

### Error Handling for Blocks

```python
# Blocks stream handles various error scenarios automatically:

class CustomBlocksStream(Blocks):
    def should_retry(self, response):
        if response.status_code == 404:
            # Block not accessible - logged and skipped
            self.logger.error(f"Block not accessible: {response.json()}")
            return False
        elif response.status_code == 400:
            error = response.json()
            if "ai_block is not supported" in error.get("message", ""):
                # AI blocks are unsupported - logged and skipped
                self.logger.error("AI block type not supported, skipping")
                return False

        return super().should_retry(response)
```

### State Management in Incremental Sync

```python
# Pages stream with checkpointing
pages_stream = Pages(authenticator=authenticator, config=config)

# State is checkpointed every 100 records (state_checkpoint_interval)
records_processed = 0
for page in pages_stream.read_records(
    sync_mode=SyncMode.incremental,
    stream_state={"last_edited_time": "2023-01-01T00:00:00.000Z"}
):
    records_processed += 1
    if records_processed % 100 == 0:
        # State automatically checkpointed
        current_state = pages_stream.state
        print(f"Checkpointed at: {current_state}")
```

### Block Hierarchy Depth Control

```python
from source_notion.streams import MAX_BLOCK_DEPTH

# Depth limiting is automatic but can be monitored
class MonitoredBlocksStream(Blocks):
    def read_records(self, **kwargs):
        if len(self.block_id_stack) > MAX_BLOCK_DEPTH:
            self.logger.warning(f"Reached maximum depth {MAX_BLOCK_DEPTH}, stopping traversal")
            return

        # Continue with normal traversal
        yield from super().read_records(**kwargs)
```

### Stream Integration with SourceNotion

```python
# How streams are integrated in the main connector
from source_notion import SourceNotion

source = SourceNotion()
config = {
    "credentials": {
        "auth_type": "token",
        "token": "your_token"
    },
    "start_date": "2023-01-01T00:00:00.000Z"
}

# Get all streams (includes Pages and Blocks)
all_streams = source.streams(config)

# Find specific streams
pages_stream = next(s for s in all_streams if s.name == "pages")
blocks_stream = next(s for s in all_streams if s.name == "blocks")

# Blocks stream is automatically configured with Pages as parent
assert blocks_stream.parent == pages_stream
```

## Stream Dependencies

- **Blocks** stream depends on **Pages** stream as its parent
- Pages must be read first to provide page IDs for block extraction
- Block hierarchy traversal maintains parent-child relationships
- State synchronization ensures consistent incremental updates across related streams