CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-airbyte-source-hubspot

Airbyte source connector for HubSpot that enables data synchronization from HubSpot's CRM and marketing platform to various destinations.

80

1.40x
Overview
Eval results
Files

source-connector.mddocs/

Source Connector

The primary source connector functionality for creating and managing the HubSpot data integration. Provides connection checking, stream discovery, authentication scope validation, and custom object stream generation.

Capabilities

Connector Initialization

Creates a new SourceHubspot instance with configuration, catalog, and state management.

class SourceHubspot(YamlDeclarativeSource):
    def __init__(
        catalog: Optional[ConfiguredAirbyteCatalog], 
        config: Optional[Mapping[str, Any]], 
        state: TState, 
        **kwargs
    ):
        """
        Initialize the HubSpot source connector.
        
        Parameters:
        - catalog: Configured Airbyte catalog with stream selections
        - config: Source configuration including credentials and settings
        - state: Current synchronization state for incremental streams
        - **kwargs: Additional keyword arguments
        """

Connection Validation

Validates connectivity and authentication to the HubSpot API.

def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
    """
    Check connection to HubSpot API.
    
    Parameters:
    - logger: Logger instance for connection checking
    - config: Configuration with credentials and settings
    
    Returns:
    - Tuple of (is_healthy: bool, error_message: Optional[Any])
    """

Stream Discovery

Discovers and returns available streams based on configuration and OAuth scopes.

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
    """
    Get list of available streams based on configuration and granted scopes.
    
    Parameters:
    - config: Source configuration including credentials and settings
    
    Returns:
    - List of available Stream instances
    """

Scope Management

Retrieves granted OAuth scopes to determine stream availability.

def get_granted_scopes(self, authenticator) -> List[str]:
    """
    Get OAuth scopes granted to the application.
    
    Parameters:
    - authenticator: OAuth authenticator instance
    
    Returns:
    - List of granted scope strings
    """

API Factory

Creates API client instances with proper configuration.

@staticmethod
def get_api(config: Mapping[str, Any]) -> API:
    """
    Factory method to create API client instance.
    
    Parameters:
    - config: Configuration with credentials
    
    Returns:
    - Configured API client instance
    """

Common Parameters

Extracts and standardizes common parameters for stream initialization.

def get_common_params(self, config) -> Mapping[str, Any]:
    """
    Extract common parameters for stream initialization.
    
    Parameters:
    - config: Source configuration
    
    Returns:
    - Dictionary of common parameters (api, start_date, credentials, etc.)
    """

Custom Object Stream Generation

Generates stream instances for HubSpot custom objects.

def get_custom_object_streams(self, api: API, common_params: Mapping[str, Any]) -> Generator[CustomObject, None, None]:
    """
    Generate custom object stream instances.
    
    Parameters:
    - api: API client instance
    - common_params: Common parameters for stream initialization
    
    Yields:
    - CustomObject stream instances for each custom object type
    """

Web Analytics Custom Object Streams

Generates web analytics stream instances for custom objects (experimental feature).

def get_web_analytics_custom_objects_stream(
    self, 
    custom_object_stream_instances: List[CustomObject], 
    common_params: Any
) -> Generator[WebAnalyticsStream, None, None]:
    """
    Generate web analytics streams for custom objects.
    
    Parameters:
    - custom_object_stream_instances: List of custom object stream instances
    - common_params: Common parameters for stream initialization
    
    Yields:
    - WebAnalyticsStream instances for each custom object
    """

Usage Examples

Basic Connector Setup

from source_hubspot import SourceHubspot
import logging

# Configure OAuth credentials
config = {
    "credentials": {
        "credentials_title": "OAuth Credentials",
        "client_id": "your_client_id",
        "client_secret": "your_client_secret", 
        "refresh_token": "your_refresh_token"
    },
    "start_date": "2023-01-01T00:00:00Z"
}

# Create connector instance
source = SourceHubspot(catalog=None, config=config, state=None)

# Test connection
logger = logging.getLogger("test")
is_healthy, error = source.check_connection(logger, config)

if is_healthy:
    print("Connection successful!")
    # Get available streams
    streams = source.streams(config)
    print(f"Available streams: {[stream.name for stream in streams]}")
else:
    print(f"Connection failed: {error}")

Private App Authentication

# Configure Private App credentials
config = {
    "credentials": {
        "credentials_title": "Private App Credentials",
        "access_token": "your_private_app_token"
    },
    "start_date": "2023-01-01T00:00:00Z"
}

source = SourceHubspot(catalog=None, config=config, state=None)
streams = source.streams(config)

With Experimental Streams

config = {
    "credentials": {
        "credentials_title": "OAuth Credentials",
        "client_id": "your_client_id",
        "client_secret": "your_client_secret",
        "refresh_token": "your_refresh_token"
    },
    "start_date": "2023-01-01T00:00:00Z",
    "enable_experimental_streams": True
}

source = SourceHubspot(catalog=None, config=config, state=None)
streams = source.streams(config)

# Will include web analytics streams if experimental streams are enabled
web_analytics_streams = [s for s in streams if "WebAnalytics" in s.__class__.__name__]

Constants

DEFAULT_START_DATE = "2006-06-01T00:00:00Z"  # Default start date if none provided

# OAuth scope requirements for each stream type
scopes: Dict[str, Set[str]]  # Stream name to required scopes mapping
properties_scopes: Dict[str, Set[str]]  # Property-specific scope requirements

Install with Tessl CLI

npx tessl i tessl/pypi-airbyte-source-hubspot

docs

api-client.md

base-stream-classes.md

crm-streams.md

custom-objects.md

engagement-streams.md

error-handling.md

index.md

marketing-sales-streams.md

property-history-streams.md

source-connector.md

web-analytics.md

tile.json