Airbyte source connector for HubSpot that enables data synchronization from HubSpot's CRM and marketing platform to various destinations.
80
Foundation classes for extending and customizing HubSpot stream functionality. These abstract base classes provide common patterns and interfaces that can be extended to create custom streams or modify existing behavior.
Foundation classes that provide common HTTP and streaming functionality.
class BaseStream(HttpStream, ABC):
    """
    Abstract base class for all HubSpot streams.

    Provides common functionality including:
    - HTTP client configuration and authentication
    - Error handling and retry logic
    - Scope validation and permissions checking
    - Common request headers and parameters
    - Response parsing and transformation

    Abstract methods to implement:
    - path(): API endpoint path
    - parse_response(): Response parsing logic
    """

    # Signatures only: method bodies are not shown in this reference listing.

    @abstractmethod
    def path(self, **kwargs) -> str:
        """Return the API endpoint path for this stream."""

    @abstractmethod
    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping[str, Any]]:
        """Parse HTTP response into stream records."""

    def scope_is_granted(self, granted_scopes: Set[str]) -> bool:
        """Check if required OAuth scopes are granted."""

    def properties_scope_is_granted(self) -> bool:
        """Check if property schema scopes are granted."""


Classes that support incremental data synchronization with cursor-based state management.
class IncrementalStream(BaseStream, ABC):
    """
    Abstract base class for streams supporting incremental sync.

    Provides incremental sync functionality including:
    - Cursor-based state management
    - Automatic filtering by update timestamps
    - State persistence and recovery
    - Lookback window handling
    - Duplicate record detection

    Abstract methods to implement:
    - cursor_field: Field name for cursor tracking
    - get_updated_state(): State update logic
    """

    # Signatures only: method bodies are not shown in this reference listing.

    @property
    @abstractmethod
    def cursor_field(self) -> str:
        """Field name used for incremental sync cursor."""

    @abstractmethod
    def get_updated_state(
        self,
        current_stream_state: MutableMapping[str, Any],
        latest_record: Mapping[str, Any]
    ) -> MutableMapping[str, Any]:
        """Update stream state with latest cursor value."""
class ClientSideIncrementalStream(BaseStream, CheckpointMixin):
    """
    Base class for client-side incremental streams.

    Handles incremental sync logic on the client side when
    the API doesn't support server-side filtering.

    Features:
    - Client-side record filtering by timestamp
    - Checkpoint-based state management
    - Memory-efficient processing
    - Automatic deduplication
    """


Specialized classes for HubSpot CRM object streaming with built-in CRM patterns.
class CRMSearchStream(IncrementalStream, ABC):
    """
    Base class for CRM objects using HubSpot's search API.

    Provides CRM-specific functionality including:
    - Search API integration with filters
    - Association loading
    - Property schema discovery
    - Bulk property handling
    - Archived record handling

    Features:
    - Automatic property discovery from HubSpot schemas
    - Support for custom properties
    - Association data loading
    - Search-based pagination
    - Incremental sync with search filters
    """

    # Signature only: the property body is not shown in this reference listing.

    @property
    @abstractmethod
    def entity(self) -> str:
        """CRM entity name (e.g., 'contact', 'company', 'deal')."""
class CRMObjectStream(BaseStream):
    """
    Base class for simple CRM object streams.

    For CRM objects that use basic list endpoints
    rather than the search API.

    Features:
    - Simple pagination
    - Property filtering
    - Basic error handling
    - Standard CRM object patterns
    """
class CRMObjectIncrementalStream(CRMObjectStream, IncrementalStream):
    """
    Incremental version of CRMObjectStream.

    Combines simple CRM object access with incremental
    sync capabilities for objects that support timestamp filtering.

    Features:
    - Timestamp-based incremental sync
    - Simple CRM object endpoints
    - Efficient state management
    - Property-based filtering
    """


Classes for specific HubSpot API patterns and use cases.
class AssociationsStream(BaseStream):
    """
    Base class for loading object associations.

    Handles HubSpot's association API patterns including:
    - Association type management
    - Bidirectional relationship handling
    - Association metadata
    - Bulk association loading

    Features:
    - Multiple association type support
    - Pagination for large association sets
    - Association metadata inclusion
    - Performance optimizations
    """

    # Signature only: the constructor body is not shown in this reference listing.

    def __init__(
        self,
        parent_stream: Stream,
        identifiers: Iterable[Union[int, str]],
        *args,
        **kwargs
    ):
        """
        Initialize associations stream.

        Parameters:
        - parent_stream: Source stream for object IDs
        - identifiers: Object IDs to load associations for
        """


from source_hubspot.streams import CRMSearchStream
from typing import Any, Iterable, Mapping, MutableMapping
class CustomObjectStream(CRMSearchStream):
    """Example stream bound to one specific HubSpot custom object."""

    # Name of the HubSpot custom object this stream reads.
    entity = "my_custom_object"
    scopes = {"crm.objects.custom.read", "crm.schemas.custom.read"}

    def path(self, **kwargs) -> str:
        """Return the CRM v3 objects endpoint for ``entity``."""
        return f"/crm/v3/objects/{self.entity}"

    def get_json_schema(self) -> Mapping[str, Any]:
        """Return a hand-written JSON schema for the custom object.

        The schema is defined statically here; dynamic discovery could be
        used instead.
        """
        fields = {
            "id": {"type": "string"},
            "properties": {"type": "object"},
        }
        # Both timestamps share the same shape; build a fresh dict for each
        # so the two entries never alias one another.
        for timestamp_name in ("createdAt", "updatedAt"):
            fields[timestamp_name] = {"type": "string", "format": "date-time"}
        return {"type": "object", "properties": fields}
# Usage
custom_stream = CustomObjectStream(
api=api,
start_date="2023-01-01T00:00:00Z",
credentials=credentials
)class CustomIncrementalStream(IncrementalStream):
"""Custom stream with incremental sync."""
primary_key = "id"
cursor_field = "updatedAt"
def path(self, **kwargs) -> str:
return "/custom/api/endpoint"
def request_params(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
params = {"limit": 100}
# Add incremental filter
if stream_state and self.cursor_field in stream_state:
params["since"] = stream_state[self.cursor_field]
# Add pagination
if next_page_token:
params["offset"] = next_page_token["offset"]
return params
def parse_response(
self,
response: requests.Response,
**kwargs
) -> Iterable[Mapping[str, Any]]:
data = response.json()
yield from data.get("results", [])
def get_updated_state(
self,
current_stream_state: MutableMapping[str, Any],
latest_record: Mapping[str, Any]
) -> MutableMapping[str, Any]:
current_cursor = current_stream_state.get(self.cursor_field)
latest_cursor = latest_record.get(self.cursor_field)
if not current_cursor or latest_cursor > current_cursor:
return {self.cursor_field: latest_cursor}
return current_stream_stateclass CustomClientSideStream(ClientSideIncrementalStream):
"""Stream that handles incremental sync client-side."""
primary_key = "id"
cursor_field = "modified_date"
def path(self, **kwargs) -> str:
return "/api/all-records" # API doesn't support filtering
def parse_response(
self,
response: requests.Response,
**kwargs
) -> Iterable[Mapping[str, Any]]:
data = response.json()
records = data.get("items", [])
# Client-side filtering based on state
stream_state = kwargs.get("stream_state", {})
last_modified = stream_state.get(self.cursor_field)
for record in records:
if not last_modified or record[self.cursor_field] > last_modified:
yield recordclass EnhancedContactsStream(Contacts):
"""Contacts stream with additional custom processing."""
def parse_response(
self,
response: requests.Response,
**kwargs
) -> Iterable[Mapping[str, Any]]:
# Get base records
for record in super().parse_response(response, **kwargs):
# Add custom processing
if "properties" in record:
record["computed_score"] = self._calculate_score(record["properties"])
yield record
def _calculate_score(self, properties: Mapping[str, Any]) -> int:
"""Custom scoring logic."""
score = 0
if properties.get("email"):
score += 10
if properties.get("company"):
score += 20
return scoreclass RobustStream(BaseStream):
"""Stream with enhanced error handling."""
def parse_response(
self,
response: requests.Response,
**kwargs
) -> Iterable[Mapping[str, Any]]:
try:
data = response.json()
except json.JSONDecodeError:
self.logger.warning(f"Invalid JSON response: {response.text}")
return
if "errors" in data:
for error in data["errors"]:
self.logger.error(f"API Error: {error}")
return
yield from data.get("results", [])When extending base classes, you must implement these abstract methods:
- path(): Return API endpoint path
- parse_response(): Parse HTTP response to records
- cursor_field: Property name for cursor tracking
- get_updated_state(): Update state with latest cursor
- entity: CRM object type name

Failure to implement a required abstract method raises a TypeError when the subclass is instantiated.
Install with Tessl CLI
npx tessl i tessl/pypi-airbyte-source-hubspot