CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-airbyte-source-hubspot

Airbyte source connector for HubSpot that enables data synchronization from HubSpot's CRM and marketing platform to various destinations.

80

1.40x
Overview
Eval results
Files

crm-streams.mddocs/

CRM Data Streams

Stream classes for core HubSpot CRM objects including contacts, companies, deals, and tickets. These streams provide access to the primary business objects in HubSpot with incremental sync capabilities and comprehensive property support.

Capabilities

Base CRM Stream Classes

Foundation classes that provide common functionality for CRM object streams.

class BaseStream(HttpStream, ABC):
    """
    Abstract base class for all HubSpot streams.
    
    Properties:
    - url_base: Base URL for API requests
    - primary_key: Primary key field(s) for the stream
    - properties: Available properties for the object type
    """
    
    def scope_is_granted(self, granted_scopes: Set[str]) -> bool:
        """Check if required scopes are granted for this stream."""
    
    def properties_scope_is_granted(self) -> bool:
        """Check if properties-related scopes are granted."""

class IncrementalStream(BaseStream, ABC):
    """
    Abstract base class for incremental streams.
    
    Properties:
    - state_checkpoint_interval: Checkpoint interval for state management
    - cursor_field: Field used for incremental sync
    """
    
    def get_updated_state(self, current_stream_state, latest_record) -> MutableMapping[str, Any]:
        """Update stream state with latest record information."""

class CRMSearchStream(IncrementalStream, ABC):
    """
    Base class for CRM object streams using HubSpot's search API.
    
    Properties:
    - search_url: Search endpoint URL
    - associations: Object associations to include
    - query_params: Query parameters for search
    """

Contact Streams

Access to HubSpot contact data with full property support and incremental sync.

class Contacts(CRMSearchStream):
    """
    Stream for HubSpot contact records.
    
    Provides access to contact data including:
    - Basic contact information (name, email, phone)
    - Custom contact properties
    - Lifecycle stage information
    - Contact associations
    - Property history (if scopes granted)
    """

Company Streams

Access to HubSpot company data with comprehensive business information.

class Companies(CRMSearchStream):
    """
    Stream for HubSpot company records.
    
    Provides access to company data including:
    - Company information (name, domain, industry)
    - Custom company properties
    - Company associations with contacts and deals
    - Property history (if scopes granted)
    """

Deal Streams

Access to HubSpot deal data including pipeline and sales information.

class Deals(CRMSearchStream):
    """
    Stream for HubSpot deal records.
    
    Provides access to deal data including:
    - Deal information (name, amount, stage)
    - Pipeline and deal stage data
    - Custom deal properties
    - Deal associations with contacts and companies
    - Property history (if scopes granted)
    """

class DealsArchived(ClientSideIncrementalStream):
    """
    Stream for archived HubSpot deal records.
    
    Provides access to deals that have been archived,
    with client-side incremental sync capabilities.
    """

class DealSplits(CRMSearchStream):
    """
    Stream for HubSpot deal split records.
    
    Provides access to deal revenue splits and
    commission tracking data.
    """

class Leads(CRMSearchStream):
    """
    Stream for HubSpot lead records.
    
    Provides access to lead data including:
    - Lead qualification status
    - Lead source and campaign attribution
    - Lead scoring and ranking
    - Conversion tracking
    - Lead lifecycle stage progression
    """

Ticket Streams

Access to HubSpot service ticket data for customer support workflows.

class Tickets(CRMSearchStream):
    """
    Stream for HubSpot ticket records.
    
    Provides access to support ticket data including:
    - Ticket information (subject, status, priority)
    - Ticket pipeline and stage data
    - Custom ticket properties
    - Ticket associations with contacts and companies
    """

Usage Examples

Basic Contact Data Access

from source_hubspot.streams import Contacts, API

# Setup API client
credentials = {
    "credentials_title": "OAuth Credentials",
    "client_id": "your_client_id",
    "client_secret": "your_client_secret",
    "refresh_token": "your_refresh_token"
}
api = API(credentials)

# Create contacts stream
contacts = Contacts(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials
)

# Read contact data
for record in contacts.read_records(sync_mode="full_refresh"):
    print(f"Contact: {record['properties']['email']}")
    print(f"Name: {record['properties'].get('firstname', '')} {record['properties'].get('lastname', '')}")

Incremental Company Sync

from source_hubspot.streams import Companies
from airbyte_cdk.models import SyncMode

# Create companies stream
companies = Companies(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials
)

# Read with incremental sync
stream_state = {}
for record in companies.read_records(
    sync_mode=SyncMode.incremental,
    stream_state=stream_state
):
    print(f"Company: {record['properties']['name']}")
    print(f"Domain: {record['properties'].get('domain', 'N/A')}")
    
    # Update state
    stream_state = companies.get_updated_state(stream_state, record)

Deal Pipeline Analysis

from source_hubspot.streams import Deals

deals = Deals(
    api=api,
    start_date="2023-01-01T00:00:00Z", 
    credentials=credentials
)

# Analyze deals by pipeline stage
deal_stages = {}
for record in deals.read_records(sync_mode="full_refresh"):
    stage = record['properties'].get('dealstage', 'Unknown')
    amount = float(record['properties'].get('amount', 0) or 0)
    
    if stage not in deal_stages:
        deal_stages[stage] = {'count': 0, 'total_amount': 0}
    
    deal_stages[stage]['count'] += 1
    deal_stages[stage]['total_amount'] += amount

for stage, data in deal_stages.items():
    print(f"{stage}: {data['count']} deals, ${data['total_amount']:,.2f}")

Support Ticket Stream

from source_hubspot.streams import Tickets

tickets = Tickets(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials
)

# Process support tickets
for record in tickets.read_records(sync_mode="full_refresh"):
    ticket_id = record['id']
    subject = record['properties'].get('subject', 'No subject')
    status = record['properties'].get('hs_ticket_status', 'Unknown')
    priority = record['properties'].get('hs_ticket_priority', 'Normal')
    
    print(f"Ticket {ticket_id}: {subject}")
    print(f"Status: {status}, Priority: {priority}")

Working with Associations

# CRM streams support associations between objects
contacts = Contacts(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials
)

for record in contacts.read_records(sync_mode="full_refresh"):
    contact_id = record['id']
    email = record['properties']['email']
    
    # Check for associated companies
    associations = record.get('associations', {})
    if 'companies' in associations:
        company_ids = [assoc['id'] for assoc in associations['companies']['results']]
        print(f"Contact {email} associated with companies: {company_ids}")

Stream Configuration

Required Parameters

All CRM streams require these common parameters:

  • api: API client instance with authentication
  • start_date: ISO 8601 datetime string for incremental sync starting point
  • credentials: Authentication credentials object

Optional Parameters

  • acceptance_test_config: Configuration for acceptance testing (used internally)

Stream Properties

Each CRM stream provides access to:

  • name: Stream name (e.g., "contacts", "companies")
  • primary_key: Primary key field (typically "id")
  • cursor_field: Field used for incremental sync (typically "updatedAt")
  • properties: Available object properties based on HubSpot schema

OAuth Scopes

CRM streams require specific OAuth scopes:

  • Contacts: crm.objects.contacts.read
  • Companies: crm.objects.contacts.read, crm.objects.companies.read
  • Deals: contacts, crm.objects.deals.read
  • Tickets: tickets

Additional scopes for property history:

  • Contacts: crm.schemas.contacts.read
  • Companies: crm.schemas.companies.read
  • Deals: crm.schemas.deals.read

Install with Tessl CLI

npx tessl i tessl/pypi-airbyte-source-hubspot

docs

api-client.md

base-stream-classes.md

crm-streams.md

custom-objects.md

engagement-streams.md

error-handling.md

index.md

marketing-sales-streams.md

property-history-streams.md

source-connector.md

web-analytics.md

tile.json