Airbyte source connector for HubSpot that enables data synchronization from HubSpot's CRM and marketing platform to various destinations.
Stream classes for the core HubSpot CRM objects: contacts, companies, deals (including archived deals and deal splits), leads, and tickets. These streams expose HubSpot's primary business objects with incremental sync and support for both standard and custom properties.
Foundation classes that provide common functionality for CRM object streams.
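The incremental part of that shared functionality can be illustrated without the CDK. The following is a minimal, stdlib-only sketch, not the connector's code. `get_updated_state` keeps the newest cursor value seen so far, which is the role `IncrementalStream.get_updated_state` plays in the base-class stubs. `filter_new_records` drops already-synced records locally, which is roughly what a client-side incremental stream such as `DealsArchived` has to do. The cursor field name `updatedAt`, the ISO 8601 timestamp format, and both helper functions are assumptions made for illustration.

```python
from datetime import datetime
from typing import Any, Iterable, Iterator, Mapping, MutableMapping

# Assumption: the cursor field is "updatedAt" and holds ISO 8601 UTC strings.
CURSOR_FIELD = "updatedAt"


def parse_ts(value: str) -> datetime:
    # datetime.fromisoformat() rejects a trailing "Z" before Python 3.11,
    # so rewrite it as an explicit UTC offset first.
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def get_updated_state(
    current_stream_state: MutableMapping[str, Any],
    latest_record: Mapping[str, Any],
) -> MutableMapping[str, Any]:
    """Return a new state whose cursor is the later of the stored and record values."""
    latest = latest_record[CURSOR_FIELD]
    stored = current_stream_state.get(CURSOR_FIELD)
    if stored is None or parse_ts(latest) > parse_ts(stored):
        return {**current_stream_state, CURSOR_FIELD: latest}
    return dict(current_stream_state)


def filter_new_records(
    records: Iterable[Mapping[str, Any]],
    stream_state: Mapping[str, Any],
) -> Iterator[Mapping[str, Any]]:
    """Client-side incremental filtering: yield only records newer than the saved cursor."""
    stored = stream_state.get(CURSOR_FIELD)
    cutoff = parse_ts(stored) if stored is not None else None
    for record in records:
        if cutoff is None or parse_ts(record[CURSOR_FIELD]) > cutoff:
            yield record


# Usage with made-up records:
records = [
    {"id": "1", "updatedAt": "2023-01-05T10:00:00Z"},
    {"id": "2", "updatedAt": "2023-02-01T00:00:00Z"},
    {"id": "3", "updatedAt": "2022-12-31T23:59:59Z"},
]
state = {"updatedAt": "2023-01-01T00:00:00Z"}
new_records = list(filter_new_records(records, state))  # records "1" and "2"
for record in new_records:
    state = get_updated_state(state, record)
print(state)  # {'updatedAt': '2023-02-01T00:00:00Z'}
```

The sketch compares parsed datetimes rather than raw strings. This keeps the comparison correct when timestamps differ in precision or offset, where plain string ordering could mislead.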
from abc import ABC
from typing import Any, MutableMapping, Set

from airbyte_cdk.sources.streams.http import HttpStream


class BaseStream(HttpStream, ABC):
    """
    Abstract base class for all HubSpot streams.
    Properties:
    - url_base: Base URL for API requests
    - primary_key: Primary key field(s) for the stream
    - properties: Available properties for the object type
    """

    def scope_is_granted(self, granted_scopes: Set[str]) -> bool:
        """Check whether all scopes this stream requires have been granted."""

    def properties_scope_is_granted(self) -> bool:
        """Check whether the scopes needed to read properties have been granted."""


class IncrementalStream(BaseStream, ABC):
    """
    Abstract base class for incremental streams.
    Properties:
    - state_checkpoint_interval: Number of records read between state checkpoints
    - cursor_field: Field used for incremental sync
    """

    def get_updated_state(self, current_stream_state, latest_record) -> MutableMapping[str, Any]:
        """Update the stream state with the latest record's cursor value."""


class CRMSearchStream(IncrementalStream, ABC):
    """
    Base class for CRM object streams using HubSpot's search API.
    Properties:
    - search_url: Search endpoint URL
    - associations: Object associations to include
    - query_params: Query parameters for search
    """

Access to HubSpot contact data with full property support and incremental sync.
class Contacts(CRMSearchStream):
    """
    Stream for HubSpot contact records.
    Provides access to contact data including:
    - Basic contact information (name, email, phone)
    - Custom contact properties
    - Lifecycle stage information
    - Contact associations
    - Property history (if scopes granted)
    """

Access to HubSpot company data with comprehensive business information.
class Companies(CRMSearchStream):
    """
    Stream for HubSpot company records.
    Provides access to company data including:
    - Company information (name, domain, industry)
    - Custom company properties
    - Company associations with contacts and deals
    - Property history (if scopes granted)
    """

Access to HubSpot deal data, including pipeline and sales information.
class Deals(CRMSearchStream):
    """
    Stream for HubSpot deal records.
    Provides access to deal data including:
    - Deal information (name, amount, stage)
    - Pipeline and deal stage data
    - Custom deal properties
    - Deal associations with contacts and companies
    - Property history (if scopes granted)
    """


class DealsArchived(ClientSideIncrementalStream):
    """
    Stream for archived HubSpot deal records.
    Provides access to deals that have been archived,
    with client-side incremental sync capabilities.
    """


class DealSplits(CRMSearchStream):
    """
    Stream for HubSpot deal split records.
    Provides access to deal revenue splits and
    commission tracking data.
    """


class Leads(CRMSearchStream):
    """
    Stream for HubSpot lead records.
    Provides access to lead data including:
    - Lead qualification status
    - Lead source and campaign attribution
    - Lead scoring and ranking
    - Conversion tracking
    - Lead lifecycle stage progression
    """

Access to HubSpot service ticket data for customer support workflows.
class Tickets(CRMSearchStream):
    """
    Stream for HubSpot ticket records.
    Provides access to support ticket data including:
    - Ticket information (subject, status, priority)
    - Ticket pipeline and stage data
    - Custom ticket properties
    - Ticket associations with contacts and companies
    """

from airbyte_cdk.models import SyncMode
from source_hubspot.streams import API, Contacts

# Set up the API client
credentials = {
    "credentials_title": "OAuth Credentials",
    "client_id": "your_client_id",
    "client_secret": "your_client_secret",
    "refresh_token": "your_refresh_token",
}
api = API(credentials)

# Create the contacts stream
contacts = Contacts(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials,
)

# Read contact data (read_records expects a SyncMode enum member, not a string)
for record in contacts.read_records(sync_mode=SyncMode.full_refresh):
    print(f"Contact: {record['properties'].get('email')}")
    print(f"Name: {record['properties'].get('firstname', '')} {record['properties'].get('lastname', '')}")

from source_hubspot.streams import Companies
from airbyte_cdk.models import SyncMode

# Create companies stream
companies = Companies(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials
)

# Read with incremental sync
stream_state = {}
for record in companies.read_records(
    sync_mode=SyncMode.incremental,
    stream_state=stream_state
):
    print(f"Company: {record['properties'].get('name', 'N/A')}")
    print(f"Domain: {record['properties'].get('domain', 'N/A')}")
    # Update state
    stream_state = companies.get_updated_state(stream_state, record)

from source_hubspot.streams import Deals

deals = Deals(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials
)

# Analyze deals by pipeline stage
deal_stages = {}
for record in deals.read_records(sync_mode=SyncMode.full_refresh):
    stage = record['properties'].get('dealstage', 'Unknown')
    amount = float(record['properties'].get('amount', 0) or 0)
    if stage not in deal_stages:
        deal_stages[stage] = {'count': 0, 'total_amount': 0}
    deal_stages[stage]['count'] += 1
    deal_stages[stage]['total_amount'] += amount

for stage, data in deal_stages.items():
    print(f"{stage}: {data['count']} deals, ${data['total_amount']:,.2f}")

from source_hubspot.streams import Tickets

tickets = Tickets(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials
)

# Process support tickets
for record in tickets.read_records(sync_mode=SyncMode.full_refresh):
    ticket_id = record['id']
    subject = record['properties'].get('subject', 'No subject')
    status = record['properties'].get('hs_ticket_status', 'Unknown')
    priority = record['properties'].get('hs_ticket_priority', 'Normal')
    print(f"Ticket {ticket_id}: {subject}")
    print(f"Status: {status}, Priority: {priority}")

# CRM streams support associations between objects
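contacts = Contacts(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials
)

for record in contacts.read_records(sync_mode=SyncMode.full_refresh):
    contact_id = record['id']
    email = record['properties'].get('email')
    # Check for associated companies
    associations = record.get('associations', {})
    if 'companies' in associations:
        company_ids = [assoc['id'] for assoc in associations['companies']['results']]
        print(f"Contact {email} associated with companies: {company_ids}")

All CRM streams require these common parameters:
- api: an API client created from the credentials mapping
- start_date: a UTC timestamp in ISO 8601 format (for example "2023-01-01T00:00:00Z"); data older than this is not synced
- credentials: the credentials mapping used to create the API client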
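
Each CRM stream provides access to:
- standard and custom object properties
- associations with related CRM objects
- incremental sync on the stream's cursor field
- property history for contacts, companies, and deals, when the required scopes are granted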

CRM streams require specific OAuth scopes:
- Contacts: crm.objects.contacts.read
- Companies: crm.objects.contacts.read, crm.objects.companies.read
- Deals: contacts, crm.objects.deals.read
- Tickets: tickets

Additional scopes for property history:
- crm.schemas.contacts.read
- crm.schemas.companies.read
- crm.schemas.deals.read
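Before starting a sync, it can help to know which streams the granted scopes actually unlock. Below is a minimal sketch of the subset test that `BaseStream.scope_is_granted` describes: a stream is readable only if every scope it requires was granted. The `REQUIRED_SCOPES` and `PROPERTY_HISTORY_SCOPES` tables are hand-transcribed from this section's scope list. Pairing each `crm.schemas.*` scope with a stream is an assumption based on the scope names. All helper functions are hypothetical and are not part of `source_hubspot`.

```python
from typing import Dict, List, Set

# Hypothetical table, hand-transcribed from the scope list above (not imported from the connector).
REQUIRED_SCOPES: Dict[str, Set[str]] = {
    "contacts": {"crm.objects.contacts.read"},
    "companies": {"crm.objects.contacts.read", "crm.objects.companies.read"},
    "deals": {"contacts", "crm.objects.deals.read"},
    "tickets": {"tickets"},
}

# Assumption: each crm.schemas.* scope enables property history for the matching stream.
PROPERTY_HISTORY_SCOPES: Dict[str, Set[str]] = {
    "contacts": {"crm.schemas.contacts.read"},
    "companies": {"crm.schemas.companies.read"},
    "deals": {"crm.schemas.deals.read"},
}


def scope_is_granted(stream: str, granted_scopes: Set[str]) -> bool:
    """A stream is readable only if every scope it requires was granted (subset test)."""
    return REQUIRED_SCOPES[stream] <= granted_scopes


def available_streams(granted_scopes: Set[str]) -> List[str]:
    """Names of the streams the granted scopes unlock, sorted alphabetically."""
    return sorted(name for name in REQUIRED_SCOPES if scope_is_granted(name, granted_scopes))


def property_history_available(stream: str, granted_scopes: Set[str]) -> bool:
    """Property history needs the stream's own scopes plus its schema scope."""
    extra = PROPERTY_HISTORY_SCOPES.get(stream)
    if extra is None:
        return False
    return scope_is_granted(stream, granted_scopes) and extra <= granted_scopes


granted = {
    "crm.objects.contacts.read",
    "crm.objects.companies.read",
    "crm.schemas.contacts.read",
}
print(available_streams(granted))  # ['companies', 'contacts']
print(property_history_available("contacts", granted))  # True
print(property_history_available("companies", granted))  # False
```

In this example, Deals stays unavailable because the legacy `contacts` scope is missing. Company property history stays off because `crm.schemas.companies.read` was not granted.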