Airbyte source connector for HubSpot that enables data synchronization from HubSpot's CRM and marketing platform to various destinations.
80
The primary source connector functionality for creating and managing the HubSpot data integration. Provides connection checking, stream discovery, authentication scope validation, and custom object stream generation.
Creates a new SourceHubspot instance with configuration, catalog, and state management.
class SourceHubspot(YamlDeclarativeSource):
def __init__(
catalog: Optional[ConfiguredAirbyteCatalog],
config: Optional[Mapping[str, Any]],
state: TState,
**kwargs
):
"""
Initialize the HubSpot source connector.
Parameters:
- catalog: Configured Airbyte catalog with stream selections
- config: Source configuration including credentials and settings
- state: Current synchronization state for incremental streams
- **kwargs: Additional keyword arguments
"""Validates connectivity and authentication to the HubSpot API.
def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
"""
Check connection to HubSpot API.
Parameters:
- logger: Logger instance for connection checking
- config: Configuration with credentials and settings
Returns:
- Tuple of (is_healthy: bool, error_message: Optional[Any])
"""Discovers and returns available streams based on configuration and OAuth scopes.
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
"""
Get list of available streams based on configuration and granted scopes.
Parameters:
- config: Source configuration including credentials and settings
Returns:
- List of available Stream instances
"""Retrieves granted OAuth scopes to determine stream availability.
def get_granted_scopes(self, authenticator) -> List[str]:
"""
Get OAuth scopes granted to the application.
Parameters:
- authenticator: OAuth authenticator instance
Returns:
- List of granted scope strings
"""Creates API client instances with proper configuration.
@staticmethod
def get_api(config: Mapping[str, Any]) -> API:
"""
Factory method to create API client instance.
Parameters:
- config: Configuration with credentials
Returns:
- Configured API client instance
"""Extracts and standardizes common parameters for stream initialization.
def get_common_params(self, config) -> Mapping[str, Any]:
"""
Extract common parameters for stream initialization.
Parameters:
- config: Source configuration
Returns:
- Dictionary of common parameters (api, start_date, credentials, etc.)
"""Generates stream instances for HubSpot custom objects.
def get_custom_object_streams(self, api: API, common_params: Mapping[str, Any]) -> Generator[CustomObject, None, None]:
"""
Generate custom object stream instances.
Parameters:
- api: API client instance
- common_params: Common parameters for stream initialization
Yields:
- CustomObject stream instances for each custom object type
"""Generates web analytics stream instances for custom objects (experimental feature).
def get_web_analytics_custom_objects_stream(
self,
custom_object_stream_instances: List[CustomObject],
common_params: Any
) -> Generator[WebAnalyticsStream, None, None]:
"""
Generate web analytics streams for custom objects.
Parameters:
- custom_object_stream_instances: List of custom object stream instances
- common_params: Common parameters for stream initialization
Yields:
- WebAnalyticsStream instances for each custom object
"""from source_hubspot import SourceHubspot
import logging
# Configure OAuth credentials
config = {
"credentials": {
"credentials_title": "OAuth Credentials",
"client_id": "your_client_id",
"client_secret": "your_client_secret",
"refresh_token": "your_refresh_token"
},
"start_date": "2023-01-01T00:00:00Z"
}
# Create connector instance
source = SourceHubspot(catalog=None, config=config, state=None)
# Test connection
logger = logging.getLogger("test")
is_healthy, error = source.check_connection(logger, config)
if is_healthy:
print("Connection successful!")
# Get available streams
streams = source.streams(config)
print(f"Available streams: {[stream.name for stream in streams]}")
else:
print(f"Connection failed: {error}")# Configure Private App credentials
config = {
"credentials": {
"credentials_title": "Private App Credentials",
"access_token": "your_private_app_token"
},
"start_date": "2023-01-01T00:00:00Z"
}
source = SourceHubspot(catalog=None, config=config, state=None)
streams = source.streams(config)config = {
"credentials": {
"credentials_title": "OAuth Credentials",
"client_id": "your_client_id",
"client_secret": "your_client_secret",
"refresh_token": "your_refresh_token"
},
"start_date": "2023-01-01T00:00:00Z",
"enable_experimental_streams": True
}
source = SourceHubspot(catalog=None, config=config, state=None)
streams = source.streams(config)
# Will include web analytics streams if experimental streams are enabled
web_analytics_streams = [s for s in streams if "WebAnalytics" in s.__class__.__name__]DEFAULT_START_DATE = "2006-06-01T00:00:00Z" # Default start date if none provided
# OAuth scope requirements for each stream type
scopes: Dict[str, Set[str]] # Stream name to required scopes mapping
properties_scopes: Dict[str, Set[str]] # Property-specific scope requirementsInstall with Tessl CLI
npx tessl i tessl/pypi-airbyte-source-hubspot