An Airbyte source connector for extracting data from Webflow CMS collections
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Stream classes for handling Webflow data extraction, including collection discovery, schema generation, and data retrieval with automatic pagination and type conversion. All streams extend the base WebflowStream class.
Abstract base class providing common functionality for all Webflow streams, including API base URL and authentication handling.
class WebflowStream(HttpStream, ABC):
"""Base class for Webflow streams with common API functionality."""
url_base = "https://api.webflow.com/"
@property
def authenticator(self) -> WebflowTokenAuthenticator: ...
def request_params(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]: ...

Retrieves items from a specific Webflow collection with automatic pagination and dynamic schema generation.
class CollectionContents(WebflowStream):
"""Stream for extracting items from a Webflow collection."""
primary_key = None
def __init__(self, site_id: str = None, collection_id: str = None, collection_name: str = None, **kwargs):
"""
Initialize collection contents stream.
Parameters:
- site_id: Webflow site identifier
- collection_id: Webflow collection identifier for API calls
- collection_name: Human-readable collection name for stream naming
"""
@property
def name(self) -> str:
"""Return the collection name as the stream name."""
def path(self, **kwargs) -> str:
"""
API path for collection items.
Returns:
String path in format: collections/{collection_id}/items
"""
def get_json_schema(self) -> Mapping[str, Any]:
"""
Generate JSON schema for collection based on Webflow field definitions.
Returns:
JSON schema dictionary with properties for each field in the collection
"""
def next_page_token(self, response: requests.Response) -> Mapping[str, Any]:
"""
Handle pagination using Webflow's offset-based system.
Parameters:
- response: HTTP response from Webflow API
Returns:
Dictionary with offset for next page, or empty dict if no more pages
"""
def request_params(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
"""
Build request parameters including pagination.
Parameters:
- stream_state: Current stream state (unused for full refresh)
- stream_slice: Stream slice parameters (unused)
- next_page_token: Pagination token from previous response
Returns:
Dictionary with limit and optional offset parameters
"""
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
"""
Parse collection items from API response.
Parameters:
- response: HTTP response from Webflow items API
Returns:
Iterator yielding individual collection items
"""

Retrieves metadata about all collections available in a Webflow site.
class CollectionsList(WebflowStream):
"""Stream for listing available collections in a Webflow site."""
primary_key = None
def __init__(self, site_id: str = None, **kwargs):
"""
Initialize collections list stream.
Parameters:
- site_id: Webflow site identifier
"""
def path(self, **kwargs) -> str:
"""
API path for collections list.
Returns:
String path in format: sites/{site_id}/collections
"""
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
"""
Parse collections list from API response.
Parameters:
- response: HTTP response from Webflow collections API
Returns:
Iterator yielding collection metadata objects
"""
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
"""
Pagination token (collections list doesn't paginate).
Returns:
Empty dictionary as this API doesn't support pagination
"""

Retrieves and converts Webflow collection schemas to Airbyte-compatible JSON schemas.
class CollectionSchema(WebflowStream):
"""Stream for retrieving collection field schemas from Webflow."""
primary_key = None
def __init__(self, collection_id: str = None, **kwargs):
"""
Initialize collection schema stream.
Parameters:
- collection_id: Webflow collection identifier
"""
def path(self, **kwargs) -> str:
"""
API path for collection schema.
Returns:
String path in format: collections/{collection_id}
"""
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
"""
Parse and convert Webflow schema to Airbyte format.
Parameters:
- response: HTTP response from Webflow collection schema API
Returns:
Iterator yielding field schema mappings
Raises:
Exception: If field type is not supported in the mapping
"""
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
"""
Pagination token (schema doesn't paginate).
Returns:
Empty dictionary as this API doesn't support pagination
"""

from source_webflow.source import CollectionContents
from source_webflow.auth import WebflowTokenAuthenticator
# Create authenticator
auth = WebflowTokenAuthenticator(token="your_api_token", accept_version="1.0.0")
# Create stream for a specific collection
stream = CollectionContents(
authenticator=auth,
site_id="your_site_id",
collection_id="collection_id_from_api",
collection_name="Blog Posts"
)
# Read all records
records = stream.read_records(sync_mode="full_refresh")
for record in records:
    print(f"Item: {record}")
# Get the JSON schema
schema = stream.get_json_schema()
print(f"Schema: {schema}")

from source_webflow.source import CollectionsList
from source_webflow.auth import WebflowTokenAuthenticator
# Create authenticator
auth = WebflowTokenAuthenticator(token="your_api_token", accept_version="1.0.0")
# Create collections list stream
stream = CollectionsList(authenticator=auth, site_id="your_site_id")
# Get all collections
collections = stream.read_records(sync_mode="full_refresh")
for collection in collections:
    print(f"Collection: {collection['name']} (ID: {collection['_id']})")

from source_webflow.source import CollectionSchema
from source_webflow.auth import WebflowTokenAuthenticator
# Create authenticator
auth = WebflowTokenAuthenticator(token="your_api_token", accept_version="1.0.0")
# Create schema stream
stream = CollectionSchema(authenticator=auth, collection_id="your_collection_id")
# Get schema fields
schema_fields = stream.read_records(sync_mode="full_refresh")
for field in schema_fields:
    print(f"Field schema: {field}")

All streams have the following common properties:
- primary_key = None: No incremental sync support, full refresh only
- url_base = "https://api.webflow.com/": Base URL for Webflow API

The CollectionContents stream dynamically generates JSON schemas by:
Install with Tessl CLI
npx tessl i tessl/pypi-airbyte-source-webflow