
tessl/pypi-airbyte-source-hubspot

Airbyte source connector for HubSpot that enables data synchronization from HubSpot's CRM and marketing platform to various destinations.


Base Stream Classes

Foundation classes for extending and customizing HubSpot stream functionality. These abstract base classes provide common patterns and interfaces that can be extended to create custom streams or modify existing behavior.

Capabilities

Core Base Classes

Foundation classes that provide common HTTP and streaming functionality.

class BaseStream(HttpStream, ABC):
    """
    Abstract base class for all HubSpot streams.
    
    Provides common functionality including:
    - HTTP client configuration and authentication
    - Error handling and retry logic
    - Scope validation and permissions checking
    - Common request headers and parameters
    - Response parsing and transformation
    
    Abstract methods to implement:
    - path(): API endpoint path
    - parse_response(): Response parsing logic
    """
    
    @abstractmethod
    def path(self, **kwargs) -> str:
        """Return the API endpoint path for this stream."""
    
    @abstractmethod
    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping[str, Any]]:
        """Parse HTTP response into stream records."""
    
    def scope_is_granted(self, granted_scopes: Set[str]) -> bool:
        """Check if required OAuth scopes are granted."""
    
    def properties_scope_is_granted(self) -> bool:
        """Check if property schema scopes are granted."""

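The scope check can be pictured as a set comparison. The sketch below is a standalone illustration, not the connector's code: the `required_scopes` argument and the `require_all` switch are hypothetical, since this page does not state whether a stream needs all of its scopes or only one of them.

```python
from typing import Set


def scope_is_granted(
    required_scopes: Set[str],
    granted_scopes: Set[str],
    require_all: bool = False,
) -> bool:
    """Compare a stream's required scopes with the scopes granted to the token."""
    if not required_scopes:
        # A stream that declares no scopes needs no special permission
        return True
    if require_all:
        return required_scopes.issubset(granted_scopes)
    # Otherwise a single overlapping scope is enough
    return bool(required_scopes & granted_scopes)


granted = {"crm.objects.contacts.read", "crm.schemas.custom.read"}
custom_object_scopes = {"crm.objects.custom.read", "crm.schemas.custom.read"}

any_match = scope_is_granted(custom_object_scopes, granted)
all_match = scope_is_granted(custom_object_scopes, granted, require_all=True)
```

With these inputs the "any" rule passes and the "all" rule fails, because `crm.objects.custom.read` is missing from the granted set.
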
Incremental Sync Classes

Classes that support incremental data synchronization with cursor-based state management.

class IncrementalStream(BaseStream, ABC):
    """
    Abstract base class for streams supporting incremental sync.
    
    Provides incremental sync functionality including:
    - Cursor-based state management
    - Automatic filtering by update timestamps
    - State persistence and recovery
    - Lookback window handling
    - Duplicate record detection
    
    Abstract members to implement:
    - cursor_field: Field name for cursor tracking
    - get_updated_state(): State update logic
    """
    
    @property
    @abstractmethod 
    def cursor_field(self) -> str:
        """Field name used for incremental sync cursor."""
    
    @abstractmethod
    def get_updated_state(
        self, 
        current_stream_state: MutableMapping[str, Any], 
        latest_record: Mapping[str, Any]
    ) -> MutableMapping[str, Any]:
        """Update stream state with latest cursor value."""

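To make the cursor and lookback ideas concrete, here is a minimal standalone sketch. It assumes ISO 8601 timestamps and a hypothetical `updatedAt` cursor field; `parse_ts` and `sync_start` are illustrative helpers, not part of the connector.

```python
from datetime import datetime, timedelta
from typing import Any, Dict, Mapping, Optional

CURSOR_FIELD = "updatedAt"  # hypothetical cursor field name


def parse_ts(value: str) -> datetime:
    """Parse an ISO 8601 timestamp, treating a trailing 'Z' as UTC."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def get_updated_state(
    current_state: Mapping[str, Any], latest_record: Mapping[str, Any]
) -> Dict[str, Any]:
    """Keep whichever cursor value is later; never move the cursor backwards."""
    current = current_state.get(CURSOR_FIELD)
    latest = latest_record.get(CURSOR_FIELD)
    if latest is None:
        return dict(current_state)
    if current is None or parse_ts(latest) > parse_ts(current):
        return {CURSOR_FIELD: latest}
    return dict(current_state)


def sync_start(state: Mapping[str, Any], lookback: timedelta) -> Optional[datetime]:
    """Return the saved cursor minus the lookback window, or None on a first sync."""
    cursor = state.get(CURSOR_FIELD)
    return parse_ts(cursor) - lookback if cursor else None


state: Dict[str, Any] = {}
for record in (
    {"id": "1", "updatedAt": "2023-01-02T00:00:00Z"},
    {"id": "2", "updatedAt": "2023-01-01T00:00:00Z"},  # older record, cursor stays put
):
    state = get_updated_state(state, record)

next_start = sync_start(state, lookback=timedelta(hours=1))
```

Starting the next sync slightly before the saved cursor re-reads a few records, which is why a lookback window is usually paired with duplicate detection.
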
class ClientSideIncrementalStream(BaseStream, CheckpointMixin):
    """
    Base class for client-side incremental streams.
    
    Handles incremental sync logic on the client side when
    the API doesn't support server-side filtering.
    
    Features:
    - Client-side record filtering by timestamp
    - Checkpoint-based state management
    - Memory-efficient processing
    - Automatic deduplication
    """

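Client-side filtering amounts to fetching every record and discarding those that are not newer than the saved cursor. The sketch below is an illustration only: `filter_new_records` is a hypothetical helper, and comparing cursor values as strings assumes they all share one sortable format (ISO 8601 in UTC here).

```python
from typing import Any, Dict, Iterable, Iterator, List, Mapping, Optional


def filter_new_records(
    records: Iterable[Mapping[str, Any]],
    cursor_field: str,
    last_cursor: Optional[str],
) -> Iterator[Mapping[str, Any]]:
    """Yield only records whose cursor value is later than the saved cursor."""
    for record in records:
        value = record.get(cursor_field)
        # With no saved cursor (first sync) every record is emitted
        if last_cursor is None or (value is not None and value > last_cursor):
            yield record


page: List[Dict[str, Any]] = [
    {"id": 1, "modified_date": "2023-03-01T10:00:00Z"},
    {"id": 2, "modified_date": "2023-03-05T08:30:00Z"},
    {"id": 3, "modified_date": "2023-03-09T12:00:00Z"},
]
saved_cursor = "2023-03-05T08:30:00Z"

fresh = list(filter_new_records(page, "modified_date", saved_cursor))
# Checkpoint: advance the cursor to the newest value that was emitted
new_cursor = max((r["modified_date"] for r in fresh), default=saved_cursor)
```
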
CRM-Specific Classes

Specialized classes for HubSpot CRM object streaming with built-in CRM patterns.

class CRMSearchStream(IncrementalStream, ABC):
    """
    Base class for CRM objects using HubSpot's search API.
    
    Provides CRM-specific functionality including:
    - Search API integration with filters
    - Association loading
    - Property schema discovery
    - Bulk property handling
    - Archived record handling
    
    Features:
    - Automatic property discovery from HubSpot schemas
    - Support for custom properties
    - Association data loading
    - Search-based pagination
    - Incremental sync with search filters
    """
    
    @property
    @abstractmethod
    def entity(self) -> str:
        """CRM entity name (e.g., 'contact', 'company', 'deal')."""

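Incremental sync through a search endpoint generally means sending a filter on the last-modified property together with a paging token. The sketch below builds such a request body. `build_search_body` is a hypothetical helper, and the field names (`filterGroups`, `sorts`, `after`) follow HubSpot's public CRM search documentation rather than this connector's source, so verify them before relying on the shape.

```python
from typing import Any, Dict, Iterable, Optional


def build_search_body(
    cursor_property: str,
    since_ms: int,
    properties: Iterable[str],
    after: Optional[str] = None,
    limit: int = 100,
) -> Dict[str, Any]:
    """Build a search request body that filters and sorts by the cursor property."""
    cursor_filter = {
        "propertyName": cursor_property,
        "operator": "GTE",
        "value": str(since_ms),  # epoch milliseconds, sent as a string
    }
    body: Dict[str, Any] = {
        "filterGroups": [{"filters": [cursor_filter]}],
        "sorts": [{"propertyName": cursor_property, "direction": "ASCENDING"}],
        "properties": list(properties),
        "limit": limit,
    }
    if after is not None:
        body["after"] = after  # paging token returned by the previous page
    return body


first_page = build_search_body("hs_lastmodifieddate", 1672531200000, ["name", "domain"])
second_page = build_search_body("hs_lastmodifieddate", 1672531200000, ["name"], after="100")
```

Sorting ascending by the cursor property keeps the saved state monotonic, so an interrupted sync can resume from the last record that was emitted.
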
class CRMObjectStream(BaseStream):
    """
    Base class for simple CRM object streams.
    
    For CRM objects that use basic list endpoints
    rather than the search API.
    
    Features:
    - Simple pagination
    - Property filtering
    - Basic error handling
    - Standard CRM object patterns
    """

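The "simple pagination" mentioned above can be sketched as reading a paging token from each response. This example assumes the response shape used by HubSpot's v3 list endpoints (`paging.next.after`); the `next_page_token` helper here is a standalone illustration, not the connector's method.

```python
from typing import Any, Dict, Mapping, Optional


def next_page_token(payload: Mapping[str, Any]) -> Optional[Dict[str, str]]:
    """Return the token for the next page, or None when this is the last page."""
    paging = payload.get("paging") or {}
    next_link = paging.get("next") or {}
    after = next_link.get("after")
    return {"after": after} if after else None


middle_page = {"results": [{"id": "1"}], "paging": {"next": {"after": "250"}}}
last_page = {"results": [{"id": "2"}]}

token = next_page_token(middle_page)
done = next_page_token(last_page)
```
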
class CRMObjectIncrementalStream(CRMObjectStream, IncrementalStream):
    """
    Incremental version of CRMObjectStream.
    
    Combines simple CRM object access with incremental
    sync capabilities for objects that support timestamp filtering.
    
    Features:
    - Timestamp-based incremental sync
    - Simple CRM object endpoints
    - Efficient state management
    - Property-based filtering
    """

Specialized Pattern Classes

Classes for specific HubSpot API patterns and use cases.

class AssociationsStream(BaseStream):
    """
    Base class for loading object associations.
    
    Handles HubSpot's association API patterns including:
    - Association type management
    - Bidirectional relationship handling
    - Association metadata
    - Bulk association loading
    
    Features:
    - Multiple association type support
    - Pagination for large association sets
    - Association metadata inclusion
    - Performance optimizations
    """
    
    def __init__(
        self, 
        parent_stream: Stream, 
        identifiers: Iterable[Union[int, str]], 
        *args, 
        **kwargs
    ):
        """
        Initialize associations stream.
        
        Parameters:
        - parent_stream: Source stream for object IDs
        - identifiers: Object IDs to load associations for
        """

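Bulk association loading implies that the `identifiers` iterable is processed in batches rather than one ID per request. A minimal sketch of that batching follows; the `chunked` helper and the batch size are hypothetical, as this page does not say how the connector groups IDs.

```python
from itertools import islice
from typing import Iterable, Iterator, List, Union

Identifier = Union[int, str]


def chunked(identifiers: Iterable[Identifier], size: int) -> Iterator[List[Identifier]]:
    """Split an iterable of object IDs into lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(identifiers)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


batches = list(chunked([101, 102, 103, 104, 105], size=2))
```

Because `chunked` consumes its input lazily, it also works when the parent stream yields IDs from a generator.
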
Usage Examples

Custom CRM Stream

from source_hubspot.streams import CRMSearchStream
from typing import Any, Iterable, Mapping, MutableMapping

class CustomObjectStream(CRMSearchStream):
    """Custom stream for a specific HubSpot custom object."""
    
    entity = "my_custom_object"  # HubSpot custom object name
    scopes = {"crm.objects.custom.read", "crm.schemas.custom.read"}
    
    def path(self, **kwargs) -> str:
        return f"/crm/v3/objects/{self.entity}"
    
    def get_json_schema(self) -> Mapping[str, Any]:
        # Define custom schema or use dynamic discovery
        return {
            "type": "object",
            "properties": {
                "id": {"type": "string"},
                "properties": {"type": "object"},
                "createdAt": {"type": "string", "format": "date-time"},
                "updatedAt": {"type": "string", "format": "date-time"}
            }
        }

# Usage (assumes `api` and `credentials` have already been created)
custom_stream = CustomObjectStream(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials
)

Custom Incremental Stream

from typing import Any, Iterable, Mapping, MutableMapping, Optional

import requests
from source_hubspot.streams import IncrementalStream  # assumed to live in the same module as CRMSearchStream

class CustomIncrementalStream(IncrementalStream):
    """Custom stream with incremental sync."""
    
    primary_key = "id"
    cursor_field = "updatedAt"
    
    def path(self, **kwargs) -> str:
        return "/custom/api/endpoint"
    
    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None
    ) -> MutableMapping[str, Any]:
        params: MutableMapping[str, Any] = {"limit": 100}
        
        # Add incremental filter: request only records changed since the saved cursor
        if stream_state and self.cursor_field in stream_state:
            params["since"] = stream_state[self.cursor_field]
            
        # Add pagination
        if next_page_token:
            params["offset"] = next_page_token["offset"]
            
        return params
    
    def parse_response(
        self,
        response: requests.Response,
        **kwargs
    ) -> Iterable[Mapping[str, Any]]:
        data = response.json()
        yield from data.get("results", [])
    
    def get_updated_state(
        self,
        current_stream_state: MutableMapping[str, Any],
        latest_record: Mapping[str, Any]
    ) -> MutableMapping[str, Any]:
        current_cursor = current_stream_state.get(self.cursor_field)
        latest_cursor = latest_record.get(self.cursor_field)
        
        # A record without a cursor value must not overwrite the saved state
        if latest_cursor is None:
            return current_stream_state
        # Direct comparison assumes all cursor values share one sortable format
        if not current_cursor or latest_cursor > current_cursor:
            return {self.cursor_field: latest_cursor}
        return current_stream_state

Client-Side Incremental Stream

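from typing import Any, Iterable, Mapping

import requests
from source_hubspot.streams import ClientSideIncrementalStream  # assumed to live in the same module as CRMSearchStream

class CustomClientSideStream(ClientSideIncrementalStream):
    """Stream that handles incremental sync client-side."""
    
    primary_key = "id"
    cursor_field = "modified_date"
    
    def path(self, **kwargs) -> str:
        return "/api/all-records"  # API doesn't support filtering
    
    def parse_response(
        self,
        response: requests.Response,
        **kwargs
    ) -> Iterable[Mapping[str, Any]]:
        data = response.json()
        records = data.get("items", [])
        
        # Client-side filtering based on state (stream_state may be missing or None)
        stream_state = kwargs.get("stream_state") or {}
        last_modified = stream_state.get(self.cursor_field)
        
        for record in records:
            record_cursor = record.get(self.cursor_field)
            # Emit everything on a first sync; afterwards emit only newer records
            if not last_modified or (record_cursor is not None and record_cursor > last_modified):
                yield record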

Extension Patterns

Adding Custom Properties

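from typing import Any, Iterable, Mapping

import requests
from source_hubspot.streams import Contacts  # assumed to live in the same module as the base classes

class EnhancedContactsStream(Contacts):
    """Contacts stream with additional custom processing."""
    
    def parse_response(
        self,
        response: requests.Response,
        **kwargs
    ) -> Iterable[Mapping[str, Any]]:
        # Get base records
        for record in super().parse_response(response, **kwargs):
            # Copy the record so the computed field is added without mutating the original
            enriched = dict(record)
            properties = enriched.get("properties")
            if isinstance(properties, Mapping):
                enriched["computed_score"] = self._calculate_score(properties)
            yield enriched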
    
    def _calculate_score(self, properties: Mapping[str, Any]) -> int:
        """Custom scoring logic."""
        score = 0
        if properties.get("email"):
            score += 10
        if properties.get("company"):
            score += 20
        return score

Custom Error Handling

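from typing import Any, Iterable, Mapping

import requests
from source_hubspot.streams import BaseStream  # assumed to live in the same module as CRMSearchStream

class RobustStream(BaseStream):
    """Stream with enhanced error handling."""
    
    # Note: path() must also be implemented before this class can be instantiated.
    
    def parse_response(
        self,
        response: requests.Response,
        **kwargs
    ) -> Iterable[Mapping[str, Any]]:
        try:
            data = response.json()
        except ValueError:
            # JSON decode errors raised by response.json() are ValueError subclasses
            self.logger.warning(f"Invalid JSON response: {response.text[:500]}")
            return
            
        if "errors" in data:
            for error in data["errors"]:
                self.logger.error(f"API Error: {error}")
            return
            
        yield from data.get("results", [])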

Abstract Method Requirements

When extending base classes, you must implement these abstract methods and properties:

BaseStream Requirements

  • path(): Return API endpoint path
  • parse_response(): Parse HTTP response to records

IncrementalStream Additional Requirements

  • cursor_field: Property name for cursor tracking
  • get_updated_state(): Update state with latest cursor

CRMSearchStream Additional Requirements

  • entity: CRM object type name

A subclass that leaves a required abstract method or property unimplemented can still be defined, but Python raises a TypeError as soon as the class is instantiated.
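
The behavior is standard for Python abstract base classes and can be reproduced without the connector installed. In this sketch `DemoBase`, `Incomplete`, and `Complete` are hypothetical stand-ins for `BaseStream` and its subclasses.

```python
from abc import ABC, abstractmethod


class DemoBase(ABC):
    """Stand-in for a base stream with one abstract method."""

    @abstractmethod
    def path(self) -> str:
        """Return the API endpoint path."""


class Incomplete(DemoBase):
    """Defines nothing, so path() is still abstract."""


class Complete(DemoBase):
    def path(self) -> str:
        return "/demo/endpoint"


# Defining Incomplete succeeded; only instantiation fails
try:
    Incomplete()
    error_message = ""
except TypeError as exc:
    error_message = str(exc)

working_path = Complete().path()
```

The error message names the missing method, which makes it straightforward to see which requirement from the lists above was not met.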

Install with Tessl CLI

npx tessl i tessl/pypi-airbyte-source-hubspot
