tessl/pypi-airbyte-source-hubspot

Airbyte source connector for HubSpot that enables data synchronization from HubSpot's CRM and marketing platform to various destinations.

API Client

HTTP client for HubSpot API communication providing authentication, error handling, retry logic, and custom object metadata discovery. Handles both OAuth 2.0 and Private App authentication methods.

Capabilities

Client Initialization

Creates an authenticated HTTP client for HubSpot API communication.

class API:
    BASE_URL = "https://api.hubapi.com"
    USER_AGENT = "Airbyte"
    
    def __init__(self, credentials: Mapping[str, Any]):
        """
        Initialize API client with credentials.
        
        Parameters:
        - credentials: Authentication credentials (OAuth or Private App)
        """

Authentication Type Detection

Determines the authentication method based on credentials.

def is_oauth2(self) -> bool:
    """
    Check if using OAuth 2.0 authentication.
    
    Returns:
    - True if credentials are OAuth 2.0 type
    """

def is_private_app(self) -> bool:
    """
    Check if using Private App authentication.
    
    Returns:
    - True if credentials are Private App type
    """

Authenticator Creation

Creates the appropriate authenticator instance for the supplied credentials.

def get_authenticator(self) -> Optional[Union[Oauth2Authenticator, TokenAuthenticator]]:
    """
    Get authenticator instance based on credentials type.
    
    Returns:
    - Oauth2Authenticator for OAuth credentials
    - TokenAuthenticator for Private App credentials
    - None for unsupported credential types
    """

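As a rough illustration of how the credential type might be resolved, the sketch below keys on the `credentials_title` field that appears in the usage examples later in this document. The function name `pick_auth_kind` and the returned labels are hypothetical, and the connector's actual detection logic may differ.

```python
from typing import Any, Mapping, Optional

# Title strings copied from the usage examples in this document.
# Treating them as the sole discriminator is an assumption.
OAUTH_TITLE = "OAuth Credentials"
PRIVATE_APP_TITLE = "Private App Credentials"


def pick_auth_kind(credentials: Mapping[str, Any]) -> Optional[str]:
    """Return "oauth2", "private_app", or None for unsupported credentials."""
    title = credentials.get("credentials_title")
    if title == OAUTH_TITLE:
        return "oauth2"
    if title == PRIVATE_APP_TITLE:
        return "private_app"
    # Mirrors get_authenticator returning None for unsupported types.
    return None
```

A caller could branch on the returned label to build either an OAuth or a token-based authenticator.
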
HTTP Operations

Performs HTTP requests with authentication, error handling, and retry logic.

def get(
    self, 
    url: str, 
    params: MutableMapping[str, Any] = None
) -> Tuple[Union[MutableMapping[str, Any], List[MutableMapping[str, Any]]], requests.Response]:
    """
    Perform authenticated GET request with retry logic.
    
    Parameters:
    - url: API endpoint URL (relative to BASE_URL)
    - params: Query parameters
    
    Returns:
    - Tuple of (parsed_response_data, raw_response)
    """

def post(
    self, 
    url: str, 
    data: Mapping[str, Any], 
    params: MutableMapping[str, Any] = None
) -> Tuple[Union[Mapping[str, Any], List[Mapping[str, Any]]], requests.Response]:
    """
    Perform authenticated POST request.
    
    Parameters:
    - url: API endpoint URL (relative to BASE_URL)
    - data: Request body data
    - params: Query parameters
    
    Returns:
    - Tuple of (parsed_response_data, raw_response)
    """

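The retry behaviour is not spelled out in this section, so the following is only a minimal sketch of exponential backoff wrapped around a call such as `api.get(...)`. `RetryableError`, `call_with_retries`, and all of its parameters are hypothetical names, not part of the connector.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


class RetryableError(Exception):
    """Hypothetical stand-in for errors such as HubspotRateLimited or HubspotTimeout."""


def call_with_retries(
    func: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...] = (RetryableError,),
    max_attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying with exponential backoff when a retryable error is raised."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the last error to the caller.
            # Delay doubles each time: base_delay, 2 * base_delay, 4 * base_delay, ...
            sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")
```

With the real client, `retry_on` could plausibly be set to `(HubspotRateLimited, HubspotTimeout)` and `func` to `lambda: api.get("/crm/v3/objects/contacts")`.
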
Custom Object Metadata

Discovers and processes HubSpot custom object schemas and properties.

def get_custom_objects_metadata(self) -> Iterable[Tuple[str, str, Mapping[str, Any]]]:
    """
    Get metadata for all custom objects in the HubSpot account.
    
    Yields:
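    - Tuples of (object_name, fully_qualified_name, schema, properties).
      The return annotation above lists only three elements, but four values
      are yielded, as the Custom Objects Discovery example unpacks them.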
    """

def get_properties(self, raw_schema: Mapping[str, Any]) -> Mapping[str, Any]:
    """
    Extract properties from raw schema definition.
    
    Parameters:
    - raw_schema: Raw schema from HubSpot API
    
    Returns:
    - Dictionary mapping property names to property schemas
    """

def generate_schema(self, properties: Mapping[str, Any]) -> Mapping[str, Any]:
    """
    Generate JSON schema from properties.
    
    Parameters:
    - properties: Property definitions
    
    Returns:
    - JSON schema for the object
    """

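To make the property-to-schema step more concrete, here is a hedged sketch of how a raw custom object schema might be turned into a JSON schema. The `TYPE_MAP` table, the shape of `raw_schema`, and the function names `sketch_properties` and `sketch_schema` are assumptions for illustration; the connector's real mapping may cover more types and options.

```python
import copy
from typing import Any, Dict, Mapping

# Assumed mapping from HubSpot property types to nullable JSON schema fragments.
TYPE_MAP: Dict[str, Dict[str, Any]] = {
    "string": {"type": ["null", "string"]},
    "number": {"type": ["null", "number"]},
    "bool": {"type": ["null", "boolean"]},
    "datetime": {"type": ["null", "string"], "format": "date-time"},
    "date": {"type": ["null", "string"], "format": "date"},
}
FALLBACK: Dict[str, Any] = {"type": ["null", "string"]}


def sketch_properties(raw_schema: Mapping[str, Any]) -> Dict[str, Any]:
    """Map each property name to a JSON schema fragment (unknown types become strings)."""
    props: Dict[str, Any] = {}
    for prop in raw_schema.get("properties", []):
        fragment = TYPE_MAP.get(prop.get("type"), FALLBACK)
        props[prop["name"]] = copy.deepcopy(fragment)
    return props


def sketch_schema(properties: Mapping[str, Any]) -> Dict[str, Any]:
    """Wrap the property fragments in an object-level JSON schema."""
    return {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": ["null", "object"],
        "additionalProperties": True,
        "properties": dict(properties),
    }
```

Falling back to a nullable string for unrecognised types keeps schema generation from failing on property types the table does not list.
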
Error Handling

Parses HubSpot API responses and raises a specific exception for each error condition.

@staticmethod
def _parse_and_handle_errors(response) -> Union[MutableMapping[str, Any], List[MutableMapping[str, Any]]]:
    """
    Parse response and handle API errors.
    
    Parameters:
    - response: HTTP response object
    
    Returns:
    - Parsed response data
    
    Raises:
    - HubspotInvalidAuth: For 401/530 authentication errors
    - HubspotRateLimited: For 429 rate limit errors
    - HubspotTimeout: For 502 Bad Gateway and 503 Service Unavailable responses
    - HTTPError: For other HTTP errors
    """

Usage Examples

OAuth Client Setup

from source_hubspot.streams import API

# OAuth credentials
credentials = {
    "credentials_title": "OAuth Credentials",
    "client_id": "your_client_id",
    "client_secret": "your_client_secret",
    "refresh_token": "your_refresh_token"
}

# Create API client
api = API(credentials)

# Check authentication type
if api.is_oauth2():
    print("Using OAuth 2.0 authentication")

# Make API request
data, response = api.get("/crm/v3/objects/contacts", {"limit": 10})
print(f"Retrieved {len(data.get('results', []))} contacts")

Private App Client Setup

from source_hubspot.streams import API

# Private App credentials
credentials = {
    "credentials_title": "Private App Credentials",
    "access_token": "your_private_app_token"
}

api = API(credentials)

if api.is_private_app():
    print("Using Private App authentication")

# Get contact data
data, response = api.get("/crm/v3/objects/contacts")

Custom Objects Discovery

# Reuses a credentials mapping like the ones in the setup examples above
api = API(credentials)

# Discover custom objects
for name, fqn, schema, properties in api.get_custom_objects_metadata():
    print(f"Found custom object: {name}")
    print(f"Fully qualified name: {fqn}")
    print(f"Properties: {list(properties.keys())}")
    print(f"Schema: {schema}")

Error Handling

from source_hubspot.errors import (
    HubspotInvalidAuth, HubspotRateLimited, HubspotTimeout
)

try:
    data, response = api.get("/crm/v3/objects/contacts")
except HubspotInvalidAuth as e:
    print(f"Authentication failed: {e}")
except HubspotRateLimited as e:
    retry_after = e.response.headers.get("Retry-After")
    print(f"Rate limited. Retry after {retry_after} seconds")
except HubspotTimeout as e:
    print(f"Request timeout: {e}")
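
The `Retry-After` value read in the rate-limit branch above is a header string and may be missing. The helper below is a small sketch of turning it into a bounded wait time; `retry_after_seconds`, its default, and its cap are hypothetical choices, not connector behaviour.

```python
import math
from typing import Optional


def retry_after_seconds(
    header_value: Optional[str],
    default: float = 1.0,
    cap: float = 60.0,
) -> float:
    """Convert a Retry-After header value into a wait time between 0 and cap seconds."""
    try:
        seconds = float(header_value)  # float(None) raises TypeError
    except (TypeError, ValueError):
        return default
    if not math.isfinite(seconds):
        return default
    return min(max(seconds, 0.0), cap)
```

A caller could pass the result to `time.sleep` before retrying the request.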

POST Requests

# Create a new contact
contact_data = {
    "properties": {
        "email": "test@example.com",
        "firstname": "John",
        "lastname": "Doe"
    }
}

try:
    data, response = api.post("/crm/v3/objects/contacts", contact_data)
    print(f"Created contact with ID: {data['id']}")
except Exception as e:
    print(f"Failed to create contact: {e}")

Error Response Handling

The API client automatically handles various HubSpot API error conditions:

  • 400 Bad Request: Logs warning and raises HTTPError
  • 401 Unauthorized: Raises HubspotInvalidAuth
  • 403 Forbidden: Logs warning and raises HTTPError
  • 429 Too Many Requests: Raises HubspotRateLimited; the Retry-After header can be read from the exception's response
  • 502/503 Server Errors: Raises HubspotTimeout so the request can be retried
  • 530 Cloudflare DNS Error: Raises HubspotInvalidAuth (invalid API token format)
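
The list above can be restated as a small lookup. This is only a sketch of the documented mapping, returning exception names as strings; `classify_status` is a hypothetical helper and does not reproduce the client's logging or message handling.

```python
from typing import Optional


def classify_status(status_code: int) -> Optional[str]:
    """Return the name of the exception documented for a status code, or None below 400."""
    if status_code in (401, 530):
        return "HubspotInvalidAuth"
    if status_code == 429:
        return "HubspotRateLimited"
    if status_code in (502, 503):
        return "HubspotTimeout"
    if status_code >= 400:
        # Covers 400 and 403 (which the list says are also logged) and any other error.
        return "HTTPError"
    return None
```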

Constants

BASE_URL: str = "https://api.hubapi.com"  # HubSpot API base URL
USER_AGENT: str = "Airbyte"  # User agent for requests

Install with Tessl CLI

npx tessl i tessl/pypi-airbyte-source-hubspot
