
tessl/pypi-airbyte-source-hubspot

Airbyte source connector for HubSpot that enables data synchronization from HubSpot's CRM and marketing platform to various destinations.



Marketing & Sales Streams

Stream classes for marketing and sales data: forms, form submissions, owners, products, line items, goals, and feedback submissions. These streams expose HubSpot's marketing automation and sales enablement data.

Capabilities

Form Streams

Access to HubSpot form definitions and form submission data.

class Forms(ClientSideIncrementalStream):
    """
    Stream for HubSpot form definitions.
    
    Provides access to form data including:
    - Form configuration and fields
    - Form settings and styling
    - Form submission settings
    - Form performance metadata
    - Custom form properties
    """

class FormSubmissions(ClientSideIncrementalStream):
    """
    Stream for HubSpot form submission records.
    
    Provides access to form submission data including:
    - Submitted form field values
    - Submission timestamp and source
    - Contact information from submission
    - Form conversion tracking
    - Page context and referrer information
    """

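The usage examples in this document read a submission's answers from a `values` list of name/value pairs. As a minimal sketch under that assumption, the helper below collapses the list into a plain dictionary so individual fields can be looked up directly. The function name and the sample record are hypothetical and are not part of the connector.

```python
from typing import Any, Dict, Optional


def flatten_submission_values(submission: Dict[str, Any]) -> Dict[str, Optional[str]]:
    """Collapse the 'values' list of a submission into {field_name: value}.

    Assumes each entry looks like {"name": ..., "value": ...}.
    If a field name repeats, the last value wins.
    """
    flat: Dict[str, Optional[str]] = {}
    for entry in submission.get("values", []):
        name = entry.get("name")
        if name is not None:
            flat[name] = entry.get("value")
    return flat


# Hypothetical record, shaped like the submissions used in this document.
sample = {
    "formId": "form-123",
    "values": [
        {"name": "email", "value": "ada@example.com"},
        {"name": "firstname", "value": "Ada"},
    ],
}
fields = flatten_submission_values(sample)
print(fields["email"])
```

A flat dictionary avoids scanning the list once per field when several fields are needed from the same submission.
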
Owner Streams

Access to HubSpot user and owner information for sales team management.

class Owners(ClientSideIncrementalStream):
    """
    Stream for HubSpot owner (user) records.
    
    Provides access to owner data including:
    - User profile information
    - Owner permissions and roles
    - Team assignments
    - Owner activity status
    - Custom owner properties
    """

class OwnersArchived(ClientSideIncrementalStream):
    """
    Stream for archived HubSpot owner records.
    
    Provides access to deactivated or archived
    owner accounts with historical data.
    """

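Records that reference an owner id may point at an owner who has since been archived, so a lookup built from active owners alone can miss names. The sketch below merges records from both streams into one dictionary. It assumes each record carries `id`, `firstName`, `lastName`, and `email` keys; the function name and sample data are hypothetical.

```python
from typing import Any, Dict, Iterable


def build_owner_lookup(
    active: Iterable[Dict[str, Any]],
    archived: Iterable[Dict[str, Any]],
) -> Dict[str, Dict[str, Any]]:
    """Merge active and archived owner records into one lookup keyed by id.

    Active records take precedence if an id appears in both inputs.
    """
    lookup: Dict[str, Dict[str, Any]] = {}
    # Load archived owners first so that active ones overwrite them.
    for is_archived, records in ((True, archived), (False, active)):
        for record in records:
            owner_id = str(record["id"])  # assumed key name
            first = record.get("firstName") or ""
            last = record.get("lastName") or ""
            lookup[owner_id] = {
                "name": f"{first} {last}".strip(),
                "email": record.get("email", ""),
                "archived": is_archived,
            }
    return lookup


# Hypothetical records for illustration only.
active_owners = [
    {"id": 1, "firstName": "Ada", "lastName": "Lovelace", "email": "ada@example.com"},
]
archived_owners = [
    {"id": 2, "firstName": "Alan", "lastName": "Turing", "email": "alan@example.com"},
]
owners_by_id = build_owner_lookup(active_owners, archived_owners)
print(owners_by_id["2"])
```
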
Product & E-commerce Streams

Access to product catalog and line item data for e-commerce integration.

class Products(CRMObjectIncrementalStream):
    """
    Stream for HubSpot product records.
    
    Provides access to product data including:
    - Product name, description, and SKU
    - Pricing and cost information
    - Product categories and properties
    - Inventory tracking data
    - Custom product properties
    """

class LineItems(CRMObjectIncrementalStream):
    """
    Stream for HubSpot line item records.
    
    Provides access to line item data including:
    - Product associations and quantities
    - Pricing and discount information
    - Line item properties
    - Deal and quote associations
    - Custom line item properties
    """

Goal Streams

Access to HubSpot goal and performance tracking data.

class Goals(CRMObjectIncrementalStream):
    """
    Stream for HubSpot goal records.
    
    Provides access to goal data including:
    - Goal definitions and targets
    - Goal progress tracking
    - Goal assignment and ownership
    - Performance metrics
    - Custom goal properties
    """

Feedback Streams

Access to customer feedback and survey response data.

class FeedbackSubmissions(CRMObjectIncrementalStream):
    """
    Stream for HubSpot feedback submission records.
    
    Provides access to feedback data including:
    - Survey responses and ratings
    - Feedback content and comments
    - Customer satisfaction scores
    - Feedback categorization
    - Custom feedback properties
    """

Usage Examples

Form Performance Analysis

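from source_hubspot.streams import Forms, FormSubmissions, API

# `credentials` is the credentials mapping from the connector configuration.
# It is not defined in this snippet; `api` and `credentials` are also reused
# by the later examples.
api = API(credentials)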

# Get form definitions
forms = Forms(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials
)

# Get form submissions
submissions = FormSubmissions(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials
)

# Analyze form performance
form_performance = {}

# First, get all forms
for form_record in forms.read_records(sync_mode="full_refresh"):
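    # The v3 forms API keys each form by 'id'; the legacy v2 API used 'guid'.
    form_id = form_record.get('id', form_record.get('guid'))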
    form_name = form_record['name']
    form_performance[form_id] = {
        'name': form_name,
        'submissions': 0,
        'unique_contacts': set()
    }

# Count submissions per form
for submission_record in submissions.read_records(sync_mode="full_refresh"):
    form_guid = submission_record['formId']
    contact_email = None
    
    # Extract email from form values
    for value in submission_record.get('values', []):
        if value.get('name') == 'email':
            contact_email = value.get('value')
            break
    
    if form_guid in form_performance:
        form_performance[form_guid]['submissions'] += 1
        if contact_email:
            form_performance[form_guid]['unique_contacts'].add(contact_email)

# Display results
for form_id, data in form_performance.items():
    unique_count = len(data['unique_contacts'])
    # Share of submissions that came from distinct email addresses
    unique_ratio = (unique_count / data['submissions'] * 100) if data['submissions'] > 0 else 0
    print(f"Form: {data['name']}")
    print(f"  Submissions: {data['submissions']}")
    print(f"  Unique contacts: {unique_count}")
    print(f"  Unique-contact ratio: {unique_ratio:.1f}%")

Sales Team Performance

from source_hubspot.streams import Owners, Deals

# Get owner information
owners = Owners(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials
)

# Build owner lookup
owner_lookup = {}
for owner_record in owners.read_records(sync_mode="full_refresh"):
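    # The CRM v3 owners API returns the key as 'id'; the legacy v2 API used 'ownerId'.
    owner_id = str(owner_record.get('id', owner_record.get('ownerId')))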
    owner_lookup[owner_id] = {
        'name': f"{owner_record.get('firstName', '')} {owner_record.get('lastName', '')}".strip(),
        'email': owner_record.get('email', ''),
        'deals_count': 0,
        'deals_value': 0
    }

# Analyze deals by owner
deals = Deals(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials
)

for deal_record in deals.read_records(sync_mode="full_refresh"):
    owner_id = deal_record['properties'].get('hubspot_owner_id')
    deal_amount = float(deal_record['properties'].get('amount', 0) or 0)
    
    if owner_id and owner_id in owner_lookup:
        owner_lookup[owner_id]['deals_count'] += 1
        owner_lookup[owner_id]['deals_value'] += deal_amount

# Display sales performance
print("Sales Team Performance:")
sorted_owners = sorted(owner_lookup.items(), key=lambda x: x[1]['deals_value'], reverse=True)
for owner_id, data in sorted_owners:
    if data['deals_count'] > 0:
        avg_deal_size = data['deals_value'] / data['deals_count']
        print(f"{data['name']}: {data['deals_count']} deals, ${data['deals_value']:,.2f} total, ${avg_deal_size:,.2f} avg")

Product Catalog Management

from source_hubspot.streams import Products, LineItems

# Get product catalog
products = Products(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials
)

product_catalog = {}
for product_record in products.read_records(sync_mode="full_refresh"):
    product_id = product_record['id']
    properties = product_record['properties']
    
    product_catalog[product_id] = {
        'name': properties.get('name', 'Unnamed Product'),
        'price': float(properties.get('price', 0) or 0),
        'sku': properties.get('sku', ''),
        'sales_count': 0,
        'revenue': 0
    }

# Analyze line item sales
line_items = LineItems(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials
)

for line_item_record in line_items.read_records(sync_mode="full_refresh"):
    properties = line_item_record['properties']
    product_id = properties.get('hs_product_id')
    quantity = int(properties.get('quantity', 0) or 0)
    price = float(properties.get('price', 0) or 0)
    
    if product_id and product_id in product_catalog:
        product_catalog[product_id]['sales_count'] += quantity
        product_catalog[product_id]['revenue'] += (quantity * price)

# Display product performance
print("Product Performance:")
sorted_products = sorted(product_catalog.items(), key=lambda x: x[1]['revenue'], reverse=True)
for product_id, data in sorted_products[:10]:  # Top 10 products
    print(f"{data['name']} (SKU: {data['sku']})")
    print(f"  Units sold: {data['sales_count']}")
    print(f"  Revenue: ${data['revenue']:,.2f}")
    print(f"  Avg selling price: ${data['revenue'] / max(data['sales_count'], 1):,.2f}")

Goal Tracking

from source_hubspot.streams import Goals

goals = Goals(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials
)

# Track goal progress
goal_progress = {}
for goal_record in goals.read_records(sync_mode="full_refresh"):
    properties = goal_record['properties']
    goal_name = properties.get('name', 'Unnamed Goal')
    target_value = float(properties.get('target_value', 0) or 0)
    current_value = float(properties.get('current_value', 0) or 0)
    
    progress_percent = (current_value / target_value * 100) if target_value > 0 else 0
    
    goal_progress[goal_name] = {
        'target': target_value,
        'current': current_value,
        'progress': progress_percent,
        'status': properties.get('status', 'Unknown')
    }

print("Goal Progress:")
for goal_name, data in goal_progress.items():
    status_indicator = "✅" if data['progress'] >= 100 else "🔄" if data['progress'] >= 50 else "⚠️"
    print(f"{status_indicator} {goal_name}: {data['progress']:.1f}% ({data['current']}/{data['target']})")

Customer Feedback Analysis

from source_hubspot.streams import FeedbackSubmissions

feedback = FeedbackSubmissions(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials
)

# Analyze feedback ratings
feedback_stats = {
    'total_responses': 0,
    'rating_distribution': {},
    'average_rating': 0,
    'total_rating': 0
}

for feedback_record in feedback.read_records(sync_mode="full_refresh"):
    properties = feedback_record['properties']
    rating = properties.get('rating')
    
    if rating:
        rating = float(rating)
        feedback_stats['total_responses'] += 1
        feedback_stats['total_rating'] += rating
        
        rating_bucket = int(rating)
        if rating_bucket not in feedback_stats['rating_distribution']:
            feedback_stats['rating_distribution'][rating_bucket] = 0
        feedback_stats['rating_distribution'][rating_bucket] += 1

if feedback_stats['total_responses'] > 0:
    feedback_stats['average_rating'] = feedback_stats['total_rating'] / feedback_stats['total_responses']

print("Customer Feedback Summary:")
print(f"Total responses: {feedback_stats['total_responses']}")
print(f"Average rating: {feedback_stats['average_rating']:.2f}")
print("Rating distribution:")
for rating in sorted(feedback_stats['rating_distribution'].keys()):
    count = feedback_stats['rating_distribution'][rating]
    percentage = count / feedback_stats['total_responses'] * 100
    print(f"  {rating} stars: {count} ({percentage:.1f}%)")

Stream Base Classes

ClientSideIncrementalStream

Used by the form and owner streams (Forms, FormSubmissions, Owners, OwnersArchived), which apply incremental logic on the client side.

class ClientSideIncrementalStream(BaseStream, CheckpointMixin):
    """
    Base class for streams with client-side incremental sync.
    
    Handles state management and checkpointing for streams
    that don't support server-side incremental queries.
    """

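Client-side incremental sync generally means fetching every record and then discarding those that are not newer than the saved state. The sketch below illustrates that idea only; it is not the connector's implementation. The function names, the `updatedAt` cursor field, and the sample records are assumptions.

```python
from datetime import datetime
from typing import Any, Dict, Iterable, Iterator, List, Optional


def parse_ts(value: str) -> datetime:
    """Parse an ISO 8601 timestamp such as '2023-01-01T00:00:00Z'."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def filter_newer(
    records: Iterable[Dict[str, Any]],
    cursor_field: str,
    state_value: Optional[str],
) -> Iterator[Dict[str, Any]]:
    """Yield only records whose cursor is strictly newer than the saved state."""
    threshold = parse_ts(state_value) if state_value else None
    for record in records:
        if threshold is None or parse_ts(record[cursor_field]) > threshold:
            yield record


def next_state(
    emitted: List[Dict[str, Any]],
    cursor_field: str,
    state_value: Optional[str],
) -> Optional[str]:
    """Return the newest cursor value among the emitted records and the old state."""
    candidates = [record[cursor_field] for record in emitted]
    if state_value:
        candidates.append(state_value)
    return max(candidates, key=parse_ts) if candidates else None


# Hypothetical full response from an endpoint with no server-side filtering.
all_records = [
    {"id": "a", "updatedAt": "2023-01-05T00:00:00Z"},
    {"id": "b", "updatedAt": "2023-03-01T12:00:00Z"},
]
saved_state = "2023-02-01T00:00:00Z"

fresh = list(filter_newer(all_records, "updatedAt", saved_state))
new_state = next_state(fresh, "updatedAt", saved_state)
print([r["id"] for r in fresh], new_state)
```

Whether the boundary comparison is strict or inclusive is a design choice. A strict comparison avoids re-emitting the record that set the state, while an inclusive one avoids missing records that share its timestamp.
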
CRMObjectIncrementalStream

Used by products, line items, goals, and feedback streams.

class CRMObjectIncrementalStream(CRMObjectStream, IncrementalStream):
    """
    Base class for CRM object streams with incremental sync.
    
    Combines CRM object functionality with incremental
    sync capabilities for optimal data synchronization.
    """

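For CRM objects, incremental sync usually means asking the API only for records modified since the last saved cursor. The sketch below builds a request body in the shape used by HubSpot's CRM search endpoints (`filterGroups`, `sorts`, `limit`, `after`). Whether the connector issues requests in exactly this form is not stated in this document; the function names, the `hs_lastmodifieddate` cursor property, and the page size are assumptions.

```python
from datetime import datetime
from typing import Any, Dict, Optional


def to_epoch_ms(iso_timestamp: str) -> int:
    """Convert an ISO 8601 timestamp to epoch milliseconds."""
    dt = datetime.fromisoformat(iso_timestamp.replace("Z", "+00:00"))
    return int(dt.timestamp() * 1000)


def build_search_body(
    since: str,
    cursor_property: str = "hs_lastmodifieddate",  # assumed cursor property
    limit: int = 100,                              # assumed page size
    after: Optional[str] = None,
) -> Dict[str, Any]:
    """Build a search body that selects records modified at or after `since`."""
    body: Dict[str, Any] = {
        "filterGroups": [
            {
                "filters": [
                    {
                        "propertyName": cursor_property,
                        "operator": "GTE",
                        "value": str(to_epoch_ms(since)),
                    }
                ]
            }
        ],
        # Sort ascending so the last record seen carries the newest cursor.
        "sorts": [{"propertyName": cursor_property, "direction": "ASCENDING"}],
        "limit": limit,
    }
    if after is not None:
        body["after"] = after  # paging token taken from the previous response
    return body


first_page = build_search_body("2023-01-01T00:00:00Z")
next_page = build_search_body("2023-01-01T00:00:00Z", after="100")
print(first_page["filterGroups"][0]["filters"][0])
```
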
OAuth Scopes

Marketing & sales streams require specific OAuth scopes:

  • Forms: forms
  • Form Submissions: forms
  • Owners: crm.objects.owners.read
  • Products: e-commerce
  • Line Items: e-commerce, crm.objects.line_items.read
  • Goals: crm.objects.goals.read
  • Feedback Submissions: Custom object scopes may apply
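
The scope list above can be turned into a pre-flight check that reports which streams a token cannot read. This is a minimal sketch: the snake_case stream names are hypothetical, Line Items is treated as needing both listed scopes (an assumption), and Feedback Submissions is left out because no concrete scope is named for it.

```python
from typing import Dict, Iterable, List, Set

# Scopes copied from the list above; the stream keys are illustrative names.
REQUIRED_SCOPES: Dict[str, Set[str]] = {
    "forms": {"forms"},
    "form_submissions": {"forms"},
    "owners": {"crm.objects.owners.read"},
    "products": {"e-commerce"},
    "line_items": {"e-commerce", "crm.objects.line_items.read"},
    "goals": {"crm.objects.goals.read"},
}


def missing_scopes(granted: Iterable[str]) -> Dict[str, List[str]]:
    """Map each stream to the scopes it still needs; fully covered streams are omitted."""
    granted_set = set(granted)
    report: Dict[str, List[str]] = {}
    for stream, required in REQUIRED_SCOPES.items():
        missing = sorted(required - granted_set)
        if missing:
            report[stream] = missing
    return report


# Hypothetical set of scopes granted to a token.
granted_scopes = ["forms", "crm.objects.owners.read", "e-commerce"]
report = missing_scopes(granted_scopes)
for stream, scopes in report.items():
    print(f"{stream}: missing {', '.join(scopes)}")
```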

Install with Tessl CLI

npx tessl i tessl/pypi-airbyte-source-hubspot
