or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

connector-setup.mddata-streams.mdindex.mdstream-management.mdtransformations.md

transformations.mddocs/

0

# Transformations

1

2

Custom components for transforming Notion API responses and filtering data for efficient incremental synchronization and normalized data structures.

3

4

## Capabilities

5

6

### User Record Transformation

7

8

Transforms Notion User records of type "bot" to normalize nested owner information structure.

9

10

```python { .api }

11

class NotionUserTransformation(RecordTransformation):

12

"""

13

Custom transformation for Notion User records of type "bot".

14

Moves nested owner type data to a standardized "info" field

15

for clarity and consistency in bot user records.

16

"""

17

18

def transform(self, record: MutableMapping[str, Any], **kwargs) -> MutableMapping[str, Any]:

19

"""

20

Transforms bot user records by normalizing owner information.

21

Moves data from owner.{owner_type} to owner.info field.

22

23

Args:

24

record: User record from Notion API

25

**kwargs: Additional transformation parameters

26

27

Returns:

28

Transformed record with normalized owner structure

29

"""

30

```

31

32

### Properties Normalization

33

34

Transforms nested properties objects in Notion pages and databases into normalized array format for easier processing.

35

36

```python { .api }

37

class NotionPropertiesTransformation(RecordTransformation):

38

"""

39

Transforms the nested 'properties' object within Notion Page/Database records.

40

Converts properties dictionary to normalized array format where each element

41

contains 'name' and 'value' keys for consistent processing.

42

"""

43

44

def transform(self, record: MutableMapping[str, Any], **kwargs) -> MutableMapping[str, Any]:

45

"""

46

Normalizes properties object from dictionary to array format.

47

Each property becomes {name: property_name, value: property_content}.

48

49

Args:

50

record: Page or Database record from Notion API

51

**kwargs: Additional transformation parameters

52

53

Returns:

54

Record with properties transformed to array format

55

"""

56

```

57

58

### Incremental Sync Data Filter

59

60

Custom filter for optimizing incremental sync performance with cursor-based pagination by respecting state values more granularly.

61

62

```python { .api }

63

class NotionDataFeedFilter(RecordFilter):

64

"""

65

Custom filter for Data Feed endpoints with incremental sync optimization.

66

Addresses issues with Notion's cursor-based pagination where state thresholds

67

aren't properly respected, causing unnecessary record processing.

68

"""

69

70

def filter_records(self, records: List[Mapping[str, Any]], stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, **kwargs) -> List[Mapping[str, Any]]:

71

"""

72

Filters records based on cursor value compared to current state.

73

Ensures only records newer than state cursor are processed,

74

improving incremental sync efficiency.

75

76

Args:

77

records: List of records from API response

78

stream_state: Current stream state with cursor information

79

stream_slice: Optional stream slice context

80

**kwargs: Additional filter parameters

81

82

Returns:

83

Filtered list containing only records newer than state cursor

84

"""

85

86

def _get_filter_date(self, start_date: str, state_value: list) -> str:

87

"""

88

Calculates effective filter date by comparing start_date with state value.

89

Returns the most recent date to use for record filtering.

90

91

Args:

92

start_date: Configured start date for sync

93

state_value: Current state cursor value

94

95

Returns:

96

Effective filter date string for record comparison

97

"""

98

```

99

100

## Usage Examples

101

102

### User Transformation Example

103

104

```python

105

from source_notion.components import NotionUserTransformation

106

107

# Original bot user record from Notion API

108

original_user = {

109

"object": "user",

110

"id": "bot-123",

111

"type": "bot",

112

"bot": {

113

"owner": {

114

"type": "workspace",

115

"workspace": {

116

"id": "workspace-456",

117

"name": "My Workspace"

118

}

119

}

120

}

121

}

122

123

# Apply transformation

124

transformer = NotionUserTransformation()

125

transformed_user = transformer.transform(original_user)

126

127

# Result: workspace data moved to info field

128

print(transformed_user["bot"]["owner"])

129

# {

130

# "type": "workspace",

131

# "info": {

132

# "id": "workspace-456",

133

# "name": "My Workspace"

134

# }

135

# }

136

```

137

138

### Properties Transformation Example

139

140

```python

141

from source_notion.components import NotionPropertiesTransformation

142

143

# Original page/database record with nested properties

144

original_record = {

145

"id": "page-123",

146

"properties": {

147

"Title": {

148

"type": "title",

149

"title": [{"text": {"content": "My Page"}}]

150

},

151

"Status": {

152

"type": "select",

153

"select": {"name": "In Progress"}

154

},

155

"Created": {

156

"type": "created_time",

157

"created_time": "2023-01-01T00:00:00.000Z"

158

}

159

}

160

}

161

162

# Apply transformation

163

transformer = NotionPropertiesTransformation()

164

transformed_record = transformer.transform(original_record)

165

166

# Result: properties converted to array format

167

print(transformed_record["properties"])

168

# [

169

# {

170

# "name": "Title",

171

# "value": {

172

# "type": "title",

173

# "title": [{"text": {"content": "My Page"}}]

174

# }

175

# },

176

# {

177

# "name": "Status",

178

# "value": {

179

# "type": "select",

180

# "select": {"name": "In Progress"}

181

# }

182

# },

183

# {

184

# "name": "Created",

185

# "value": {

186

# "type": "created_time",

187

# "created_time": "2023-01-01T00:00:00.000Z"

188

# }

189

# }

190

# ]

191

```

192

193

### Data Feed Filter Example

194

195

```python

196

from source_notion.components import NotionDataFeedFilter

197

198

# Sample records from API response

199

records = [

200

{"id": "1", "last_edited_time": "2023-01-01T00:00:00.000Z"},

201

{"id": "2", "last_edited_time": "2023-01-02T00:00:00.000Z"},

202

{"id": "3", "last_edited_time": "2023-01-03T00:00:00.000Z"},

203

{"id": "4", "last_edited_time": "2023-01-04T00:00:00.000Z"},

204

]

205

206

# Current stream state

207

stream_state = {

208

"last_edited_time": "2023-01-02T12:00:00.000Z"

209

}

210

211

# Configuration with start date

212

config = {"start_date": "2023-01-01T00:00:00.000Z"}

213

214

# Apply filter

215

filter_component = NotionDataFeedFilter(config=config)

216

filtered_records = filter_component.filter_records(records, stream_state)

217

218

# Result: only records newer than state cursor

219

print([r["id"] for r in filtered_records])

220

# ["3", "4"] - records 1 and 2 filtered out as they're older than state

221

```

222

223

### Integration with Declarative Streams

224

225

```python

226

# These transformations are automatically applied in manifest.yaml:

227

228

# For users stream:

229

transformations:

230

- type: CustomTransformation

231

class_name: source_notion.components.NotionUserTransformation

232

233

# For databases stream:

234

transformations:

235

- type: CustomTransformation

236

class_name: source_notion.components.NotionPropertiesTransformation

237

238

# For databases stream with incremental sync:

239

record_selector:

240

type: RecordSelector

241

record_filter:

242

type: CustomRecordFilter

243

class_name: source_notion.components.NotionDataFeedFilter

244

```

245

246

### Custom Transformation Implementation

247

248

```python

249

from source_notion.components import NotionPropertiesTransformation

250

251

# Extend for custom property handling

252

class CustomPropertiesTransformation(NotionPropertiesTransformation):

253

def transform(self, record, **kwargs):

254

# Apply base transformation

255

record = super().transform(record, **kwargs)

256

257

# Add custom logic

258

for prop in record.get("properties", []):

259

if prop["name"] == "Tags" and prop["value"]["type"] == "multi_select":

260

# Flatten multi-select values

261

tags = [option["name"] for option in prop["value"]["multi_select"]]

262

prop["value"]["tag_names"] = tags

263

264

return record

265

```

266

267

### Filter Date Calculation Logic

268

269

```python

270

from source_notion.components import NotionDataFeedFilter

271

272

# Understanding filter date calculation

273

filter_component = NotionDataFeedFilter(config={"start_date": "2023-01-01T00:00:00.000Z"})

274

275

# Case 1: No state value - uses start_date

276

filter_date = filter_component._get_filter_date("2023-01-01T00:00:00.000Z", None)

277

print(filter_date) # "2023-01-01T00:00:00.000Z"

278

279

# Case 2: State value newer than start_date - uses state value

280

filter_date = filter_component._get_filter_date(

281

"2023-01-01T00:00:00.000Z",

282

"2023-01-15T00:00:00.000Z"

283

)

284

print(filter_date) # "2023-01-15T00:00:00.000Z"

285

286

# Case 3: State value older than start_date - uses start_date

287

filter_date = filter_component._get_filter_date(

288

"2023-01-15T00:00:00.000Z",

289

"2023-01-01T00:00:00.000Z"

290

)

291

print(filter_date) # "2023-01-15T00:00:00.000Z"

292

```

293

294

### Performance Benefits

295

296

```python

297

# Without NotionDataFeedFilter:

298

# - API returns 100 records per page

299

# - Only 5 records are actually newer than state

300

# - 95 records unnecessarily processed

301

302

# With NotionDataFeedFilter:

303

# - Same 100 records retrieved from API

304

# - Filter eliminates 95 outdated records at component level

305

# - Only 5 records passed to downstream processing

306

# - Significantly reduces processing overhead

307

308

# Example measurement

309

import time

310

311

records = generate_test_records(10000) # 10k records

312

stream_state = {"last_edited_time": "2023-06-01T00:00:00.000Z"}

313

314

# Without filter

315

start = time.time()

316

processed = [r for r in records if r["last_edited_time"] >= "2023-06-01T00:00:00.000Z"]

317

no_filter_time = time.time() - start

318

319

# With filter component

320

start = time.time()

321

filter_component = NotionDataFeedFilter(config={})

322

filtered = filter_component.filter_records(records, stream_state)

323

with_filter_time = time.time() - start

324

325

print(f"Performance improvement: {(no_filter_time - with_filter_time) / no_filter_time * 100:.1f}%")

326

```

327

328

## Transformation Pipeline

329

330

The transformations are applied in this order within Airbyte streams:

331

332

1. **Raw API Response** → Records from Notion API

333

2. **Custom Filters** → NotionDataFeedFilter (for incremental streams)

334

3. **Record Transformations** → NotionUserTransformation, NotionPropertiesTransformation

335

4. **Schema Validation** → Airbyte schema enforcement

336

5. **Output Records** → Final normalized records for destination

337

338

This pipeline ensures data consistency and optimal performance for both full refresh and incremental synchronization modes.