# Stream Operations

Stream classes for handling Webflow data extraction, including collection discovery, schema generation, and data retrieval with automatic pagination and type conversion. All streams extend the base `WebflowStream` class.

## Capabilities

### Base Stream Class

Abstract base class providing common functionality for all Webflow streams, including the API base URL and authentication handling.

```python { .api }
class WebflowStream(HttpStream, ABC):
    """Base class for Webflow streams with common API functionality."""

    url_base = "https://api.webflow.com/"

    @property
    def authenticator(self) -> WebflowTokenAuthenticator: ...

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]: ...
```

### Collection Contents Stream

Retrieves items from a specific Webflow collection with automatic pagination and dynamic schema generation.

```python { .api }
class CollectionContents(WebflowStream):
    """Stream for extracting items from a Webflow collection."""

    primary_key = None

    def __init__(self, site_id: str = None, collection_id: str = None, collection_name: str = None, **kwargs):
        """
        Initialize collection contents stream.

        Parameters:
        - site_id: Webflow site identifier
        - collection_id: Webflow collection identifier for API calls
        - collection_name: Human-readable collection name for stream naming
        """

    @property
    def name(self) -> str:
        """Return the collection name as the stream name."""

    def path(self, **kwargs) -> str:
        """
        API path for collection items.

        Returns:
        String path in format: collections/{collection_id}/items
        """

    def get_json_schema(self) -> Mapping[str, Any]:
        """
        Generate JSON schema for collection based on Webflow field definitions.

        Returns:
        JSON schema dictionary with properties for each field in the collection
        """

    def next_page_token(self, response: requests.Response) -> Mapping[str, Any]:
        """
        Handle pagination using Webflow's offset-based system.

        Parameters:
        - response: HTTP response from Webflow API

        Returns:
        Dictionary with offset for next page, or empty dict if no more pages
        """

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> MutableMapping[str, Any]:
        """
        Build request parameters including pagination.

        Parameters:
        - stream_state: Current stream state (unused for full refresh)
        - stream_slice: Stream slice parameters (unused)
        - next_page_token: Pagination token from previous response

        Returns:
        Dictionary with limit and optional offset parameters
        """

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """
        Parse collection items from API response.

        Parameters:
        - response: HTTP response from Webflow items API

        Returns:
        Iterator yielding individual collection items
        """
```
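
The `request_params` contract described above (a limit, plus an offset once a pagination token exists) can be sketched as a standalone function. This is a minimal illustration, not the connector's actual code: the function name `build_request_params` and the constant `DEFAULT_PAGE_LIMIT` are hypothetical, and the token is assumed to be a dictionary such as `{"offset": 100}`.

```python
from typing import Any, Mapping, MutableMapping, Optional

# Hypothetical constant: the page size sent with every request.
DEFAULT_PAGE_LIMIT = 100


def build_request_params(
    next_page_token: Optional[Mapping[str, Any]] = None,
    limit: int = DEFAULT_PAGE_LIMIT,
) -> MutableMapping[str, Any]:
    """Return query parameters: always a limit, plus the offset when paginating."""
    params: MutableMapping[str, Any] = {"limit": limit}
    if next_page_token:
        # Assumed token shape: {"offset": <int>} from the previous response.
        params.update(next_page_token)
    return params


first_page = build_request_params()
second_page = build_request_params({"offset": 100})
```

On the first request there is no token, so only `limit` is sent; later requests carry the offset returned for the previous page.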

### Collections List Stream

Retrieves metadata about all collections available in a Webflow site.

```python { .api }
class CollectionsList(WebflowStream):
    """Stream for listing available collections in a Webflow site."""

    primary_key = None

    def __init__(self, site_id: str = None, **kwargs):
        """
        Initialize collections list stream.

        Parameters:
        - site_id: Webflow site identifier
        """

    def path(self, **kwargs) -> str:
        """
        API path for collections list.

        Returns:
        String path in format: sites/{site_id}/collections
        """

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """
        Parse collections list from API response.

        Parameters:
        - response: HTTP response from Webflow collections API

        Returns:
        Iterator yielding collection metadata objects
        """

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """
        Pagination token (collections list doesn't paginate).

        Returns:
        Empty dictionary as this API doesn't support pagination
        """
```

### Collection Schema Stream

Retrieves and converts Webflow collection schemas to Airbyte-compatible JSON schemas.

```python { .api }
class CollectionSchema(WebflowStream):
    """Stream for retrieving collection field schemas from Webflow."""

    primary_key = None

    def __init__(self, collection_id: str = None, **kwargs):
        """
        Initialize collection schema stream.

        Parameters:
        - collection_id: Webflow collection identifier
        """

    def path(self, **kwargs) -> str:
        """
        API path for collection schema.

        Returns:
        String path in format: collections/{collection_id}
        """

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """
        Parse and convert Webflow schema to Airbyte format.

        Parameters:
        - response: HTTP response from Webflow collection schema API

        Returns:
        Iterator yielding field schema mappings

        Raises:
        Exception: If field type is not supported in the mapping
        """

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """
        Pagination token (schema doesn't paginate).

        Returns:
        Empty dictionary as this API doesn't support pagination
        """
```
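
To illustrate the conversion step in `parse_response`, including the failure on unsupported types, here is a rough sketch. Everything in it is an assumption for illustration: the mapping table is a small hypothetical subset rather than the real `WebflowToAirbyteMapping`, the field keys `slug` and `type` are assumed, and `ValueError` stands in for whatever exception the connector raises.

```python
from typing import Any, Dict, Mapping

# Hypothetical subset of a Webflow-type to JSON-schema-type table.
FIELD_TYPE_TO_JSON_SCHEMA: Dict[str, Dict[str, Any]] = {
    "PlainText": {"type": ["null", "string"]},
    "Number": {"type": ["null", "number"]},
    "Bool": {"type": ["null", "boolean"]},
}


def convert_field(field: Mapping[str, Any]) -> Dict[str, Any]:
    """Map one Webflow field definition to a {field_name: json_schema} entry."""
    field_type = field["type"]  # assumed key holding the Webflow field type
    if field_type not in FIELD_TYPE_TO_JSON_SCHEMA:
        # Fail loudly rather than silently dropping an unknown field type.
        raise ValueError(f"Unsupported Webflow field type: {field_type!r}")
    return {field["slug"]: FIELD_TYPE_TO_JSON_SCHEMA[field_type]}


title_schema = convert_field({"slug": "title", "type": "PlainText"})
```

Raising on an unknown type surfaces a missing mapping entry at sync time instead of producing a schema that quietly omits the field.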

## Usage Examples

### Reading Collection Items

```python
from source_webflow.source import CollectionContents
from source_webflow.auth import WebflowTokenAuthenticator

# Create authenticator
auth = WebflowTokenAuthenticator(token="your_api_token", accept_version="1.0.0")

# Create stream for a specific collection
stream = CollectionContents(
    authenticator=auth,
    site_id="your_site_id",
    collection_id="collection_id_from_api",
    collection_name="Blog Posts"
)

# Read all records
records = stream.read_records(sync_mode="full_refresh")
for record in records:
    print(f"Item: {record}")

# Get the JSON schema
schema = stream.get_json_schema()
print(f"Schema: {schema}")
```

### Discovering Collections

```python
from source_webflow.source import CollectionsList
from source_webflow.auth import WebflowTokenAuthenticator

# Create authenticator
auth = WebflowTokenAuthenticator(token="your_api_token", accept_version="1.0.0")

# Create collections list stream
stream = CollectionsList(authenticator=auth, site_id="your_site_id")

# Get all collections
collections = stream.read_records(sync_mode="full_refresh")
for collection in collections:
    print(f"Collection: {collection['name']} (ID: {collection['_id']})")
```
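
Since `CollectionContents` needs both a `collection_id` and a `collection_name`, it can help to turn the discovered records into a lookup table first. The sketch below runs on sample dictionaries rather than live API data; the helper name `collection_ids_by_name` is hypothetical, and it relies only on the `name` and `_id` keys used in the example above.

```python
from typing import Any, Dict, Iterable, Mapping


def collection_ids_by_name(collections: Iterable[Mapping[str, Any]]) -> Dict[str, str]:
    """Build a {collection name: collection id} lookup from metadata records."""
    return {collection["name"]: collection["_id"] for collection in collections}


# Sample records standing in for the output of CollectionsList.read_records().
sample_collections = [
    {"name": "Blog Posts", "_id": "abc123"},
    {"name": "Authors", "_id": "def456"},
]
lookup = collection_ids_by_name(sample_collections)
```

Each entry in `lookup` supplies the `collection_name` and `collection_id` arguments for one `CollectionContents` stream.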

### Getting Collection Schema

```python
from source_webflow.source import CollectionSchema
from source_webflow.auth import WebflowTokenAuthenticator

# Create authenticator
auth = WebflowTokenAuthenticator(token="your_api_token", accept_version="1.0.0")

# Create schema stream
stream = CollectionSchema(authenticator=auth, collection_id="your_collection_id")

# Get schema fields
schema_fields = stream.read_records(sync_mode="full_refresh")
for field in schema_fields:
    print(f"Field schema: {field}")
```

## Stream Properties

All streams share the following properties:

- `primary_key = None`: No primary key is defined; the streams support full refresh only, with no incremental sync
- `url_base = "https://api.webflow.com/"`: Base URL for the Webflow API
- Automatic authentication header injection via `WebflowTokenAuthenticator`
- JSON response parsing with appropriate error handling

## Pagination Behavior

- **CollectionContents**: Offset-based pagination with a configurable limit (default: 100)
- **CollectionsList**: No pagination; returns all collections in a single response
- **CollectionSchema**: No pagination; returns all fields in a single response
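
The offset-based flow for `CollectionContents` can be sketched end to end against an in-memory stand-in for the API. This is an illustration under stated assumptions, not the connector's code: the response is assumed to carry `items`, `count`, and `offset` keys, and `fake_fetch` and `read_all` are hypothetical helpers.

```python
from typing import Any, Dict, Iterator, List, Mapping


def next_page_token(payload: Mapping[str, Any]) -> Dict[str, int]:
    """Return the next offset, or an empty dict once a page comes back empty."""
    if payload.get("count", 0) and payload.get("items"):
        return {"offset": payload["offset"] + payload["count"]}
    return {}


def fake_fetch(all_items: List[Any], offset: int, limit: int) -> Dict[str, Any]:
    """Stand-in for one API call: slice the data set like a paginated endpoint."""
    page = all_items[offset : offset + limit]
    return {"items": page, "count": len(page), "offset": offset, "limit": limit}


def read_all(all_items: List[Any], limit: int = 100) -> Iterator[Any]:
    """Request pages until next_page_token signals that none remain."""
    params: Dict[str, int] = {"limit": limit}
    while True:
        payload = fake_fetch(all_items, params.get("offset", 0), params["limit"])
        yield from payload["items"]
        token = next_page_token(payload)
        if not token:
            break
        params = {"limit": limit, **token}


records = list(read_all(list(range(250))))
```

With 250 items and a limit of 100, this sketch requests offsets 0, 100, 200, and 250; the last page is empty, which ends the loop.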

## Schema Generation

The `CollectionContents` stream dynamically generates JSON schemas by:

1. Fetching field definitions from Webflow via `CollectionSchema`
2. Converting Webflow field types to JSON schema types using `WebflowToAirbyteMapping`
3. Adding standard Webflow fields (`_id`, `_cid`, `_locale`) that aren't in the schema API
4. Returning a complete schema compatible with JSON Schema draft-07
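
The four steps above can be sketched as a single assembly function. This is a minimal illustration under assumptions: the function name `build_json_schema` is hypothetical, the per-field schemas are taken as already converted (steps 1 and 2), and the standard fields are assumed to be nullable strings.

```python
from typing import Any, Dict, Iterable, Mapping

# Assumed types for the standard Webflow fields missing from the schema API.
STANDARD_FIELDS: Dict[str, Dict[str, Any]] = {
    "_id": {"type": ["null", "string"]},
    "_cid": {"type": ["null", "string"]},
    "_locale": {"type": ["null", "string"]},
}


def build_json_schema(field_schemas: Iterable[Mapping[str, Any]]) -> Dict[str, Any]:
    """Merge converted field schemas and standard fields into one JSON schema."""
    properties: Dict[str, Any] = {}
    for field_schema in field_schemas:
        # Each entry is assumed to look like {"field_name": {...json schema...}}.
        properties.update(field_schema)
    properties.update(STANDARD_FIELDS)  # step 3: add the standard fields
    return {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "additionalProperties": True,
        "properties": properties,
    }


schema = build_json_schema([
    {"title": {"type": ["null", "string"]}},
    {"views": {"type": ["null", "number"]}},
])
```

The resulting `properties` holds the two converted fields plus `_id`, `_cid`, and `_locale`.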