CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-airbyte-source-hubspot

Airbyte source connector for HubSpot that enables data synchronization from HubSpot's CRM and marketing platform to various destinations.

80

1.40x
Overview
Eval results
Files

engagement-streams.mddocs/

Engagement Streams

Stream classes for HubSpot engagement data including calls, emails, meetings, notes, and tasks. These streams provide access to interaction history and communication records between users and CRM objects.

Capabilities

Call Engagements

Access to call engagement records including call details, duration, and outcomes.

class EngagementsCalls(CRMSearchStream):
    """
    Stream for HubSpot call engagement records.
    
    Provides access to call data including:
    - Call duration and timestamp
    - Call outcome and disposition
    - Associated contacts, companies, and deals
    - Call recording and notes
    - Custom call properties
    """

Email Engagements

Access to email engagement records including email content and interaction data.

class EngagementsEmails(CRMSearchStream):
    """
    Stream for HubSpot email engagement records.
    
    Provides access to email data including:
    - Email subject and body content
    - Send timestamp and status
    - Email recipient information
    - Associated contacts, companies, and deals
    - Email tracking data (opens, clicks)
    - Custom email properties
    """

Meeting Engagements

Access to meeting engagement records including meeting details and attendee information.

class EngagementsMeetings(CRMSearchStream):
    """
    Stream for HubSpot meeting engagement records.
    
    Provides access to meeting data including:
    - Meeting title and description
    - Start and end timestamps
    - Meeting location and type
    - Attendee information
    - Associated contacts, companies, and deals
    - Meeting outcome and follow-up notes
    - Custom meeting properties
    """

Note Engagements

Access to note engagement records including text content and associations.

class EngagementsNotes(CRMSearchStream):
    """
    Stream for HubSpot note engagement records.
    
    Provides access to note data including:
    - Note content and timestamp
    - Note author information
    - Associated contacts, companies, and deals
    - Note attachments
    - Custom note properties
    """

Task Engagements

Access to task engagement records including task details and completion status.

class EngagementsTasks(CRMSearchStream):
    """
    Stream for HubSpot task engagement records.
    
    Provides access to task data including:
    - Task subject and description
    - Due date and completion status
    - Task priority and type
    - Task assignee information
    - Associated contacts, companies, and deals
    - Task completion notes
    - Custom task properties
    """

Usage Examples

Call Engagement Analysis

from source_hubspot.streams import EngagementsCalls, API

# Setup API client
api = API(credentials)

# Create calls stream
calls = EngagementsCalls(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials
)

# Analyze call outcomes
call_outcomes = {}
total_duration = 0

for record in calls.read_records(sync_mode="full_refresh"):
    engagement = record['engagement']
    metadata = record['metadata']
    
    # Call outcome analysis
    outcome = metadata.get('disposition', 'Unknown')
    duration = metadata.get('durationMilliseconds', 0)
    
    if outcome not in call_outcomes:
        call_outcomes[outcome] = {'count': 0, 'total_duration': 0}
    
    call_outcomes[outcome]['count'] += 1
    call_outcomes[outcome]['total_duration'] += duration
    total_duration += duration

print(f"Total call duration: {total_duration / 1000 / 60:.2f} minutes")
for outcome, data in call_outcomes.items():
    avg_duration = data['total_duration'] / data['count'] / 1000 / 60
    print(f"{outcome}: {data['count']} calls, avg {avg_duration:.2f} min")

Email Engagement Tracking

from source_hubspot.streams import EngagementsEmails

emails = EngagementsEmails(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials
)

# Track email engagement metrics
email_stats = {
    'sent': 0,
    'opened': 0,
    'clicked': 0,
    'replied': 0
}

for record in emails.read_records(sync_mode="full_refresh"):
    metadata = record['metadata']
    
    email_stats['sent'] += 1
    
    if metadata.get('opened'):
        email_stats['opened'] += 1
    
    if metadata.get('clicked'):
        email_stats['clicked'] += 1
        
    if metadata.get('replied'):
        email_stats['replied'] += 1

print(f"Email Performance:")
print(f"Sent: {email_stats['sent']}")
print(f"Open Rate: {email_stats['opened'] / email_stats['sent'] * 100:.1f}%")
print(f"Click Rate: {email_stats['clicked'] / email_stats['sent'] * 100:.1f}%")
print(f"Reply Rate: {email_stats['replied'] / email_stats['sent'] * 100:.1f}%")

Meeting Engagement Schedule

from source_hubspot.streams import EngagementsMeetings
from datetime import datetime, timedelta

meetings = EngagementsMeetings(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials
)

# Analyze meeting patterns
meeting_by_day = {}
upcoming_meetings = []

for record in meetings.read_records(sync_mode="full_refresh"):
    engagement = record['engagement']
    metadata = record['metadata']
    
    # Parse meeting timestamp
    timestamp = engagement['timestamp']
    meeting_date = datetime.fromtimestamp(timestamp / 1000)
    day_of_week = meeting_date.strftime('%A')
    
    # Count meetings by day of week
    if day_of_week not in meeting_by_day:
        meeting_by_day[day_of_week] = 0
    meeting_by_day[day_of_week] += 1
    
    # Check for upcoming meetings
    if meeting_date > datetime.now():
        upcoming_meetings.append({
            'title': metadata.get('title', 'Untitled'),
            'date': meeting_date,
            'location': metadata.get('location', 'Not specified')
        })

print("Meetings by day of week:")
for day, count in meeting_by_day.items():
    print(f"{day}: {count}")

print(f"\nUpcoming meetings: {len(upcoming_meetings)}")

Task Management

from source_hubspot.streams import EngagementsTasks

tasks = EngagementsTasks(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials
)

# Track task completion
task_stats = {
    'total': 0,
    'completed': 0,
    'overdue': 0,
    'by_priority': {}
}

current_time = datetime.now().timestamp() * 1000

for record in tasks.read_records(sync_mode="full_refresh"):
    engagement = record['engagement']
    metadata = record['metadata']
    
    task_stats['total'] += 1
    
    # Check completion status
    if metadata.get('status') == 'COMPLETED':
        task_stats['completed'] += 1
    
    # Check for overdue tasks
    due_date = metadata.get('forObjectType')  # This would need proper field mapping
    if due_date and due_date < current_time and metadata.get('status') != 'COMPLETED':
        task_stats['overdue'] += 1
    
    # Track by priority
    priority = metadata.get('priority', 'NONE')
    if priority not in task_stats['by_priority']:
        task_stats['by_priority'][priority] = 0
    task_stats['by_priority'][priority] += 1

completion_rate = task_stats['completed'] / task_stats['total'] * 100
print(f"Task completion rate: {completion_rate:.1f}%")
print(f"Overdue tasks: {task_stats['overdue']}")
print("Tasks by priority:", task_stats['by_priority'])

Note Content Analysis

from source_hubspot.streams import EngagementsNotes

notes = EngagementsNotes(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials
)

# Analyze note content
note_stats = {
    'total': 0,
    'total_length': 0,
    'by_author': {}
}

for record in notes.read_records(sync_mode="full_refresh"):
    engagement = record['engagement']
    metadata = record['metadata']
    
    note_stats['total'] += 1
    
    # Note content length
    body = metadata.get('body', '')
    note_stats['total_length'] += len(body)
    
    # Notes by author
    owner_id = engagement.get('ownerId')
    if owner_id not in note_stats['by_author']:
        note_stats['by_author'][owner_id] = 0
    note_stats['by_author'][owner_id] += 1

avg_length = note_stats['total_length'] / note_stats['total']
print(f"Average note length: {avg_length:.0f} characters")
print(f"Total notes: {note_stats['total']}")
print("Most active note authors:", 
      sorted(note_stats['by_author'].items(), key=lambda x: x[1], reverse=True)[:5])

Engagement Data Structure

All engagement streams return records with a consistent structure:

# Engagement record structure
{
    "engagement": {
        "id": str,  # Engagement ID
        "timestamp": int,  # Unix timestamp in milliseconds
        "type": str,  # Engagement type (CALL, EMAIL, MEETING, NOTE, TASK)
        "ownerId": str,  # HubSpot user ID of engagement owner
        "active": bool,  # Whether engagement is active
        "createdAt": int,  # Creation timestamp
        "lastUpdated": int  # Last update timestamp
    },
    "associations": {
        "contactIds": List[str],  # Associated contact IDs
        "companyIds": List[str],  # Associated company IDs  
        "dealIds": List[str],     # Associated deal IDs
        "ticketIds": List[str]    # Associated ticket IDs
    },
    "metadata": {
        # Type-specific metadata fields
        # Varies by engagement type (calls, emails, meetings, notes, tasks)
    }
}

OAuth Scopes

Engagement streams require specific OAuth scopes:

  • Calls: crm.objects.contacts.read
  • Emails: crm.objects.contacts.read, sales-email-read
  • Meetings: crm.objects.contacts.read
  • Notes: crm.objects.contacts.read
  • Tasks: crm.objects.contacts.read

Some engagement types may require additional scopes for full functionality:

  • Companies: crm.objects.companies.read
  • Deals: crm.objects.deals.read
  • Tickets: tickets

Install with Tessl CLI

npx tessl i tessl/pypi-airbyte-source-hubspot

docs

api-client.md

base-stream-classes.md

crm-streams.md

custom-objects.md

engagement-streams.md

error-handling.md

index.md

marketing-sales-streams.md

property-history-streams.md

source-connector.md

web-analytics.md

tile.json