# Stream Operations

Stream classes for handling Webflow data extraction, including collection discovery, schema generation, and data retrieval with automatic pagination and type conversion. All streams extend the base WebflowStream class.

## Capabilities

### Base Stream Class

Abstract base class providing common functionality for all Webflow streams, including API base URL and authentication handling.

```python { .api }
class WebflowStream(HttpStream, ABC):
    """Base class for Webflow streams with common API functionality."""

    url_base = "https://api.webflow.com/"

    @property
    def authenticator(self) -> WebflowTokenAuthenticator: ...

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]: ...
```
26
27
### Collection Contents Stream
28
29
Retrieves items from a specific Webflow collection with automatic pagination and dynamic schema generation.
30
31
```python { .api }
class CollectionContents(WebflowStream):
    """Stream for extracting items from a Webflow collection."""

    primary_key = None

    def __init__(self, site_id: str = None, collection_id: str = None, collection_name: str = None, **kwargs):
        """
        Initialize collection contents stream.

        Parameters:
        - site_id: Webflow site identifier
        - collection_id: Webflow collection identifier for API calls
        - collection_name: Human-readable collection name for stream naming
        """

    @property
    def name(self) -> str:
        """Return the collection name as the stream name."""

    def path(self, **kwargs) -> str:
        """
        API path for collection items.

        Returns:
        String path in format: collections/{collection_id}/items
        """

    def get_json_schema(self) -> Mapping[str, Any]:
        """
        Generate JSON schema for collection based on Webflow field definitions.

        Returns:
        JSON schema dictionary with properties for each field in the collection
        """

    def next_page_token(self, response: requests.Response) -> Mapping[str, Any]:
        """
        Handle pagination using Webflow's offset-based system.

        Parameters:
        - response: HTTP response from Webflow API

        Returns:
        Dictionary with offset for next page, or empty dict if no more pages
        """

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> MutableMapping[str, Any]:
        """
        Build request parameters including pagination.

        Parameters:
        - stream_state: Current stream state (unused for full refresh)
        - stream_slice: Stream slice parameters (unused)
        - next_page_token: Pagination token from previous response

        Returns:
        Dictionary with limit and optional offset parameters
        """

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """
        Parse collection items from API response.

        Parameters:
        - response: HTTP response from Webflow items API

        Returns:
        Iterator yielding individual collection items
        """
```

### Collections List Stream

Retrieves metadata about all collections available in a Webflow site.

```python { .api }
class CollectionsList(WebflowStream):
    """Stream for listing available collections in a Webflow site."""

    primary_key = None

    def __init__(self, site_id: str = None, **kwargs):
        """
        Initialize collections list stream.

        Parameters:
        - site_id: Webflow site identifier
        """

    def path(self, **kwargs) -> str:
        """
        API path for collections list.

        Returns:
        String path in format: sites/{site_id}/collections
        """

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """
        Parse collections list from API response.

        Parameters:
        - response: HTTP response from Webflow collections API

        Returns:
        Iterator yielding collection metadata objects
        """

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """
        Pagination token (collections list doesn't paginate).

        Returns:
        Empty dictionary as this API doesn't support pagination
        """
```

### Collection Schema Stream

Retrieves and converts Webflow collection schemas to Airbyte-compatible JSON schemas.

```python { .api }
class CollectionSchema(WebflowStream):
    """Stream for retrieving collection field schemas from Webflow."""

    primary_key = None

    def __init__(self, collection_id: str = None, **kwargs):
        """
        Initialize collection schema stream.

        Parameters:
        - collection_id: Webflow collection identifier
        """

    def path(self, **kwargs) -> str:
        """
        API path for collection schema.

        Returns:
        String path in format: collections/{collection_id}
        """

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """
        Parse and convert Webflow schema to Airbyte format.

        Parameters:
        - response: HTTP response from Webflow collection schema API

        Returns:
        Iterator yielding field schema mappings

        Raises:
        Exception: If field type is not supported in the mapping
        """

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """
        Pagination token (schema doesn't paginate).

        Returns:
        Empty dictionary as this API doesn't support pagination
        """
```

## Usage Examples

### Reading Collection Items

```python
from source_webflow.source import CollectionContents
from source_webflow.auth import WebflowTokenAuthenticator

# Create authenticator
auth = WebflowTokenAuthenticator(token="your_api_token", accept_version="1.0.0")

# Create stream for a specific collection
stream = CollectionContents(
    authenticator=auth,
    site_id="your_site_id",
    collection_id="collection_id_from_api",
    collection_name="Blog Posts"
)

# Read all records
records = stream.read_records(sync_mode="full_refresh")
for record in records:
    print(f"Item: {record}")

# Get the JSON schema
schema = stream.get_json_schema()
print(f"Schema: {schema}")
```

### Discovering Collections

```python
from source_webflow.source import CollectionsList
from source_webflow.auth import WebflowTokenAuthenticator

# Create authenticator
auth = WebflowTokenAuthenticator(token="your_api_token", accept_version="1.0.0")

# Create collections list stream
stream = CollectionsList(authenticator=auth, site_id="your_site_id")

# Get all collections
collections = stream.read_records(sync_mode="full_refresh")
for collection in collections:
    print(f"Collection: {collection['name']} (ID: {collection['_id']})")
```

### Getting Collection Schema

```python
from source_webflow.source import CollectionSchema
from source_webflow.auth import WebflowTokenAuthenticator

# Create authenticator
auth = WebflowTokenAuthenticator(token="your_api_token", accept_version="1.0.0")

# Create schema stream
stream = CollectionSchema(authenticator=auth, collection_id="your_collection_id")

# Get schema fields
schema_fields = stream.read_records(sync_mode="full_refresh")
for field in schema_fields:
    print(f"Field schema: {field}")
```

## Stream Properties

All streams have the following common properties:

- `primary_key = None`: No primary key is declared; the streams support full refresh only, with no incremental sync
- `url_base = "https://api.webflow.com/"`: Base URL for Webflow API
- Automatic authentication header injection via WebflowTokenAuthenticator
- JSON response parsing with appropriate error handling
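
To make the header injection concrete, here is a minimal sketch of the headers a token authenticator might attach to each request. The function name `build_auth_headers` is hypothetical, and the header names (`Authorization` with a Bearer token, plus `accept-version`) are assumptions inferred from the `token` and `accept_version` arguments in the usage examples, not a confirmed description of WebflowTokenAuthenticator.

```python
from typing import Dict


def build_auth_headers(token: str, accept_version: str = "1.0.0") -> Dict[str, str]:
    """Return the headers assumed to accompany every Webflow API request.

    Hypothetical helper: the real authenticator class may build these
    headers differently.
    """
    if not token:
        raise ValueError("token must be a non-empty string")
    return {
        # Assumed: the API token is sent as a Bearer token.
        "Authorization": f"Bearer {token}",
        # Assumed: the API version is pinned through a dedicated header.
        "accept-version": accept_version,
    }


headers = build_auth_headers("your_api_token")
print(headers)
```
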

## Pagination Behavior

- **CollectionContents**: Uses offset-based pagination with configurable limit (default: 100)
- **CollectionsList**: No pagination, returns all collections in single response
- **CollectionSchema**: No pagination, returns all fields in single response
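
The offset-based scheme used by CollectionContents can be sketched without the Airbyte CDK. The example below is an illustration only: it assumes each page body carries `offset`, `count`, and `total` keys, and `fake_api` is a hypothetical stand-in for the HTTP call. The two helper functions mirror the roles of `next_page_token` and `request_params` but take plain dictionaries rather than `requests.Response` objects.

```python
from typing import Any, Dict, Iterator, Mapping, Optional

PAGE_LIMIT = 100  # default page size noted in the list above


def next_page_token(page: Mapping[str, Any]) -> Dict[str, int]:
    """Return {"offset": n} for the next page, or {} when nothing is left."""
    next_offset = page["offset"] + page["count"]
    if page["count"] > 0 and next_offset < page["total"]:
        return {"offset": next_offset}
    return {}


def request_params(token: Optional[Mapping[str, Any]] = None) -> Dict[str, int]:
    """Always send the limit; add the offset once a token is available."""
    params = {"limit": PAGE_LIMIT}
    if token:
        params.update(token)
    return params


def fake_api(params: Mapping[str, int], total: int = 250) -> Dict[str, Any]:
    """Hypothetical stand-in for the items endpoint, serving integers as items."""
    offset = params.get("offset", 0)
    items = list(range(offset, min(offset + params["limit"], total)))
    return {"items": items, "offset": offset, "count": len(items), "total": total}


def read_all() -> Iterator[int]:
    """Follow page tokens until an empty token signals the last page."""
    token: Optional[Mapping[str, Any]] = None
    while True:
        page = fake_api(request_params(token))
        yield from page["items"]
        token = next_page_token(page)
        if not token:
            break


all_items = list(read_all())
print(len(all_items))
```

Returning an empty dictionary for "no more pages" keeps the stop condition a simple truthiness check, which matches the behavior described for the non-paginating streams.
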

## Schema Generation

The CollectionContents stream dynamically generates JSON schemas by:

1. Fetching field definitions from Webflow via CollectionSchema
2. Converting Webflow field types to JSON schema types using WebflowToAirbyteMapping
3. Adding standard Webflow fields (_id, _cid, _locale) that aren't in the schema API
4. Returning a complete JSON Schema draft-07 compatible schema
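
The steps above can be sketched as a small standalone function. This is an illustration under stated assumptions, not the connector's code: `FIELD_TYPE_MAPPING` is a hypothetical stand-in for WebflowToAirbyteMapping with only a few example types, field definitions are assumed to carry `slug` and `type` keys, and the standard fields are assumed to be nullable strings.

```python
import copy
from typing import Any, Dict, List, Mapping

# Hypothetical subset of a Webflow-to-JSON-schema type mapping; the real
# WebflowToAirbyteMapping may cover different types and formats.
FIELD_TYPE_MAPPING: Dict[str, Dict[str, Any]] = {
    "PlainText": {"type": ["null", "string"]},
    "Bool": {"type": ["null", "boolean"]},
    "Number": {"type": ["null", "number"]},
    "Date": {"type": ["null", "string"], "format": "date-time"},
}

# Fields present on every item but absent from the schema API (step 3).
STANDARD_FIELDS = ("_id", "_cid", "_locale")


def build_json_schema(fields: List[Mapping[str, str]]) -> Dict[str, Any]:
    """Turn Webflow field definitions into a draft-07 style JSON schema."""
    properties: Dict[str, Any] = {}
    for field in fields:
        field_type = field["type"]
        if field_type not in FIELD_TYPE_MAPPING:
            # Fail loudly on unmapped types rather than guessing a type.
            raise ValueError(f"Unsupported Webflow field type: {field_type}")
        # Deep copy so callers cannot mutate the shared mapping entries.
        properties[field["slug"]] = copy.deepcopy(FIELD_TYPE_MAPPING[field_type])
    for name in STANDARD_FIELDS:
        properties[name] = {"type": ["null", "string"]}  # assumed type
    return {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "additionalProperties": True,
        "properties": properties,
    }


schema = build_json_schema(
    [{"slug": "title", "type": "PlainText"}, {"slug": "published", "type": "Bool"}]
)
print(sorted(schema["properties"]))
```
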