Airbyte source connector for HubSpot that enables data synchronization from HubSpot's CRM and marketing platform to various destinations.
80
HTTP client for HubSpot API communication providing authentication, error handling, retry logic, and custom object metadata discovery. Handles both OAuth 2.0 and Private App authentication methods.
Creates an authenticated HTTP client for HubSpot API communication.
class API:
BASE_URL = "https://api.hubapi.com"
USER_AGENT = "Airbyte"
def __init__(self, credentials: Mapping[str, Any]):
"""
Initialize API client with credentials.
Parameters:
- credentials: Authentication credentials (OAuth or Private App)
"""
Determines the authentication method based on credentials.
def is_oauth2(self) -> bool:
"""
Check if using OAuth 2.0 authentication.
Returns:
- True if credentials are OAuth 2.0 type
"""
def is_private_app(self) -> bool:
"""
Check if using Private App authentication.
Returns:
- True if credentials are Private App type
"""
Creates the appropriate authenticator instance based on credentials.
def get_authenticator(self) -> Optional[Union[Oauth2Authenticator, TokenAuthenticator]]:
"""
Get authenticator instance based on credentials type.
Returns:
- Oauth2Authenticator for OAuth credentials
- TokenAuthenticator for Private App credentials
- None for unsupported credential types
"""
Performs HTTP requests with authentication, error handling, and retry logic.
def get(
self,
url: str,
params: MutableMapping[str, Any] = None
) -> Tuple[Union[MutableMapping[str, Any], List[MutableMapping[str, Any]]], requests.Response]:
"""
Perform authenticated GET request with retry logic.
Parameters:
- url: API endpoint URL (relative to BASE_URL)
- params: Query parameters
Returns:
- Tuple of (parsed_response_data, raw_response)
"""
def post(
self,
url: str,
data: Mapping[str, Any],
params: MutableMapping[str, Any] = None
) -> Tuple[Union[Mapping[str, Any], List[Mapping[str, Any]]], requests.Response]:
"""
Perform authenticated POST request.
Parameters:
- url: API endpoint URL (relative to BASE_URL)
- data: Request body data
- params: Query parameters
Returns:
- Tuple of (parsed_response_data, raw_response)
"""
Discovers and processes HubSpot custom object schemas and properties.
def get_custom_objects_metadata(self) -> Iterable[Tuple[str, str, Mapping[str, Any]]]:
"""
Get metadata for all custom objects in the HubSpot account.
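Yields:
- Tuples of (object_name, fully_qualified_name, schema, properties)
Note: each yielded tuple has four elements (the custom-object usage example unpacks name, fqn, schema, properties), although the return annotation in the signature lists only three.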
"""
def get_properties(self, raw_schema: Mapping[str, Any]) -> Mapping[str, Any]:
"""
Extract properties from raw schema definition.
Parameters:
- raw_schema: Raw schema from HubSpot API
Returns:
- Dictionary mapping property names to property schemas
"""
def generate_schema(self, properties: Mapping[str, Any]) -> Mapping[str, Any]:
"""
Generate JSON schema from properties.
Parameters:
- properties: Property definitions
Returns:
- JSON schema for the object
"""
Parses and handles HubSpot API errors with appropriate exception raising.
@staticmethod
def _parse_and_handle_errors(response) -> Union[MutableMapping[str, Any], List[MutableMapping[str, Any]]]:
"""
Parse response and handle API errors.
Parameters:
- response: HTTP response object
Returns:
- Parsed response data
Raises:
- HubspotInvalidAuth: For 401/530 authentication errors
- HubspotRateLimited: For 429 rate limit errors
- HubspotTimeout: For 502/503 timeout errors
- HTTPError: For other HTTP errors
"""
from source_hubspot.streams import API
# OAuth credentials
credentials = {
"credentials_title": "OAuth Credentials",
"client_id": "your_client_id",
"client_secret": "your_client_secret",
"refresh_token": "your_refresh_token"
}
# Create API client
api = API(credentials)
# Check authentication type
if api.is_oauth2():
print("Using OAuth 2.0 authentication")
# Make API request
data, response = api.get("/crm/v3/objects/contacts", {"limit": 10})
print(f"Retrieved {len(data.get('results', []))} contacts")
# Private App credentials
credentials = {
"credentials_title": "Private App Credentials",
"access_token": "your_private_app_token"
}
api = API(credentials)
if api.is_private_app():
print("Using Private App authentication")
# Get contact data
data, response = api.get("/crm/v3/objects/contacts")
api = API(credentials)
# Discover custom objects
for name, fqn, schema, properties in api.get_custom_objects_metadata():
print(f"Found custom object: {name}")
print(f"Fully qualified name: {fqn}")
print(f"Properties: {list(properties.keys())}")
print(f"Schema: {schema}")
from source_hubspot.errors import (
HubspotInvalidAuth, HubspotRateLimited, HubspotTimeout
)
try:
data, response = api.get("/crm/v3/objects/contacts")
except HubspotInvalidAuth as e:
print(f"Authentication failed: {e}")
except HubspotRateLimited as e:
retry_after = e.response.headers.get("Retry-After")
print(f"Rate limited. Retry after {retry_after} seconds")
except HubspotTimeout as e:
print(f"Request timeout: {e}")
# Create a new contact
contact_data = {
"properties": {
"email": "test@example.com",
"firstname": "John",
"lastname": "Doe"
}
}
try:
data, response = api.post("/crm/v3/objects/contacts", contact_data)
print(f"Created contact with ID: {data['id']}")
except Exception as e:
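print(f"Failed to create contact: {e}")
The API client automatically handles various HubSpot API error conditions: 401/530 responses raise HubspotInvalidAuth, 429 responses raise HubspotRateLimited, 502/503 responses raise HubspotTimeout, and other HTTP errors raise HTTPError.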
BASE_URL: str = "https://api.hubapi.com" # HubSpot API base URL
USER_AGENT: str = "Airbyte" # User agent for requests
Install with Tessl CLI
npx tessl i tessl/pypi-airbyte-source-hubspot