Airbyte source connector for HubSpot that enables data synchronization from HubSpot's CRM and marketing platform to various destinations.
80
Comprehensive error handling classes for HubSpot API-specific errors including authentication, rate limiting, timeouts, and permission issues. The connector provides detailed error information and appropriate retry strategies.
Foundation error class for all HubSpot-specific exceptions.
class HubspotError(AirbyteTracedException):
"""
Base error class for HubSpot connector exceptions.
Subclasses HTTPError to maintain compatibility with existing
error handling code while providing enhanced error context.
"""
def __init__(
self,
internal_message: str = None,
message: str = None,
failure_type: FailureType = FailureType.system_error,
exception: BaseException = None,
response: requests.Response = None
):
"""
Initialize HubSpot error with detailed context.
Parameters:
- internal_message: Internal error message for debugging
- message: User-facing error message
- failure_type: Classification of failure type
- exception: Original exception that caused this error
- response: HTTP response object if applicable
"""Errors related to authentication and authorization failures.
class HubspotInvalidAuth(HubspotError):
"""
401 Unauthorized error.
Raised when:
- Invalid API credentials provided
- OAuth token has expired or been revoked
- Private app token is invalid or malformed
- Cloudflare DNS error (530) indicating token format issues
This error indicates the user needs to check and refresh
their authentication credentials.
"""
class HubspotAccessDenied(HubspotError):
"""
403 Forbidden error.
Raised when:
- User lacks permissions for requested resource
- OAuth scopes are insufficient for the operation
- Private app permissions don't include required access
This error indicates the user needs additional permissions
or OAuth scopes to access the requested resource.
"""Errors related to API rate limits and request throttling.
class HubspotRateLimited(HTTPError):
"""
429 Too Many Requests error.
Raised when HubSpot API rate limits are exceeded.
The connector automatically handles retry logic based on
the 'Retry-After' header provided by HubSpot.
Rate limits vary by:
- API endpoint type
- HubSpot subscription level
- Authentication method (OAuth vs Private App)
"""Errors related to server issues and request timeouts.
class HubspotTimeout(HTTPError):
"""
502/503/504 Server timeout errors.
HubSpot has processing limits to prevent performance degradation.
These responses indicate those limits have been hit.
Recommended action:
- Pause requests for a few seconds
- Retry with exponential backoff
- Consider reducing request frequency
"""Errors related to malformed or invalid requests.
class HubspotBadRequest(HubspotError):
"""
400 Bad Request error.
Raised when:
- Request parameters are invalid or malformed
- Required fields are missing
- Data validation fails
- API endpoint doesn't exist
This error indicates the request needs to be corrected
before retrying.
"""Errors related to connector configuration and setup.
class InvalidStartDateConfigError(Exception):
"""
Raised when invalid start_date is provided in configuration.
The start_date must be a valid ISO 8601 datetime string.
Common issues:
- Incorrect date format
- Invalid timezone specification
- Future dates that don't make sense for historical sync
"""
def __init__(self, actual_value: Any, message: str):
"""
Initialize with details about the invalid start_date.
Parameters:
- actual_value: The invalid value that was provided
- message: Detailed error message explaining the issue
"""from source_hubspot.streams import API
from source_hubspot.errors import (
HubspotInvalidAuth, HubspotRateLimited, HubspotTimeout,
HubspotAccessDenied, HubspotBadRequest
)
import time
api = API(credentials)
try:
data, response = api.get("/crm/v3/objects/contacts")
print(f"Retrieved {len(data.get('results', []))} contacts")
except HubspotInvalidAuth as e:
print(f"Authentication failed: {e}")
print("Please check your credentials and try again")
except HubspotAccessDenied as e:
print(f"Access denied: {e}")
print("Check your OAuth scopes or Private App permissions")
except HubspotRateLimited as e:
retry_after = e.response.headers.get("Retry-After", "60")
print(f"Rate limited. Retrying after {retry_after} seconds...")
time.sleep(int(retry_after))
# Retry the request
except HubspotTimeout as e:
print(f"Request timeout: {e}")
print("Pausing before retry...")
time.sleep(5)
# Retry with backoff
except HubspotBadRequest as e:
print(f"Bad request: {e}")
print("Check your request parameters and try again")import time
import random
from typing import Tuple, Any
def api_call_with_retry(
api: API,
endpoint: str,
params: dict = None,
max_retries: int = 3
) -> Tuple[Any, bool]:
"""
Make API call with comprehensive error handling and retry logic.
Returns:
- Tuple of (data, success)
"""
for attempt in range(max_retries + 1):
try:
data, response = api.get(endpoint, params)
return data, True
except HubspotInvalidAuth as e:
print(f"Authentication error: {e}")
return None, False # Don't retry auth errors
except HubspotAccessDenied as e:
print(f"Access denied: {e}")
return None, False # Don't retry permission errors
except HubspotRateLimited as e:
if attempt < max_retries:
retry_after = int(e.response.headers.get("Retry-After", "60"))
print(f"Rate limited. Waiting {retry_after} seconds before retry {attempt + 1}")
time.sleep(retry_after)
continue
else:
print("Max retries exceeded for rate limiting")
return None, False
except HubspotTimeout as e:
if attempt < max_retries:
# Exponential backoff with jitter
backoff_time = (2 ** attempt) + random.uniform(0, 1)
print(f"Timeout error. Retrying in {backoff_time:.1f} seconds")
time.sleep(backoff_time)
continue
else:
print("Max retries exceeded for timeout")
return None, False
except HubspotBadRequest as e:
print(f"Bad request error: {e}")
return None, False # Don't retry bad requests
except Exception as e:
print(f"Unexpected error: {e}")
if attempt < max_retries:
time.sleep(2 ** attempt)
continue
return None, False
return None, False
# Usage
data, success = api_call_with_retry(api, "/crm/v3/objects/contacts", {"limit": 100})
if success:
print(f"Successfully retrieved data: {len(data.get('results', []))} records")
else:
print("Failed to retrieve data after retries")from source_hubspot import SourceHubspot
from source_hubspot.errors import HubspotInvalidAuth, HubspotAccessDenied
import logging
def test_hubspot_connection(config: dict) -> Tuple[bool, str]:
"""
Test HubSpot connection with detailed error reporting.
Returns:
- Tuple of (is_healthy, error_message)
"""
try:
source = SourceHubspot(catalog=None, config=config, state=None)
logger = logging.getLogger("connection_test")
is_healthy, error = source.check_connection(logger, config)
if is_healthy:
return True, "Connection successful"
else:
return False, f"Connection failed: {error}"
except HubspotInvalidAuth:
return False, "Invalid authentication credentials. Please verify your client ID, client secret, and refresh token."
except HubspotAccessDenied:
return False, "Access denied. Please check your OAuth scopes or Private App permissions."
except InvalidStartDateConfigError as e:
return False, f"Invalid start_date configuration: {e}"
except Exception as e:
return False, f"Unexpected error during connection test: {e}"
# Test different credential configurations
oauth_config = {
"credentials": {
"credentials_title": "OAuth Credentials",
"client_id": "test_client_id",
"client_secret": "test_secret",
"refresh_token": "test_token"
},
"start_date": "2023-01-01T00:00:00Z"
}
is_healthy, message = test_hubspot_connection(oauth_config)
print(f"OAuth connection test: {'PASS' if is_healthy else 'FAIL'} - {message}")from source_hubspot.streams import Contacts
def safe_stream_read(stream, sync_mode="full_refresh", max_errors=5):
"""
Read from stream with error handling and error counting.
"""
error_count = 0
records_processed = 0
try:
for record in stream.read_records(sync_mode=sync_mode):
try:
# Process record
records_processed += 1
yield record
except Exception as e:
error_count += 1
print(f"Error processing record {records_processed}: {e}")
if error_count >= max_errors:
print(f"Max errors ({max_errors}) exceeded. Stopping stream read.")
break
except HubspotRateLimited as e:
print(f"Stream read rate limited: {e}")
retry_after = e.response.headers.get("Retry-After", "60")
print(f"Consider pausing for {retry_after} seconds before continuing")
except HubspotInvalidAuth as e:
print(f"Authentication error during stream read: {e}")
print("Stream read cannot continue without valid authentication")
finally:
print(f"Stream read completed. Processed: {records_processed}, Errors: {error_count}")
# Usage
contacts = Contacts(api=api, start_date="2023-01-01T00:00:00Z", credentials=credentials)
for record in safe_stream_read(contacts):
# Process each record safely
contact_email = record['properties'].get('email', 'No email')
print(f"Contact: {contact_email}")The HubSpot API provides detailed error information in response bodies:
# Example error response structure
{
"status": "error",
"message": "This request is not allowed for this app",
"category": "PERMISSION_ERROR",
"subCategory": "SCOPE_MISSING",
"context": {
"missingScopes": ["contacts"],
"requiredScopes": ["contacts", "crm.objects.contacts.read"]
}
}The connector parses these responses and includes relevant details in exception messages to help with troubleshooting.
Install with Tessl CLI
npx tessl i tessl/pypi-airbyte-source-hubspot