Airbyte source connector for HubSpot that enables data synchronization from HubSpot's CRM and marketing platform to various destinations.
80
Stream classes for HubSpot engagement data including calls, emails, meetings, notes, and tasks. These streams provide access to interaction history and communication records between users and CRM objects.
Access to call engagement records including call details, duration, and outcomes.
class EngagementsCalls(CRMSearchStream):
"""
Stream for HubSpot call engagement records.
Provides access to call data including:
- Call duration and timestamp
- Call outcome and disposition
- Associated contacts, companies, and deals
- Call recording and notes
- Custom call properties
"""

Access to email engagement records including email content and interaction data.
class EngagementsEmails(CRMSearchStream):
"""
Stream for HubSpot email engagement records.
Provides access to email data including:
- Email subject and body content
- Send timestamp and status
- Email recipient information
- Associated contacts, companies, and deals
- Email tracking data (opens, clicks)
- Custom email properties
"""

Access to meeting engagement records including meeting details and attendee information.
class EngagementsMeetings(CRMSearchStream):
"""
Stream for HubSpot meeting engagement records.
Provides access to meeting data including:
- Meeting title and description
- Start and end timestamps
- Meeting location and type
- Attendee information
- Associated contacts, companies, and deals
- Meeting outcome and follow-up notes
- Custom meeting properties
"""

Access to note engagement records including text content and associations.
class EngagementsNotes(CRMSearchStream):
"""
Stream for HubSpot note engagement records.
Provides access to note data including:
- Note content and timestamp
- Note author information
- Associated contacts, companies, and deals
- Note attachments
- Custom note properties
"""

Access to task engagement records including task details and completion status.
class EngagementsTasks(CRMSearchStream):
"""
Stream for HubSpot task engagement records.
Provides access to task data including:
- Task subject and description
- Due date and completion status
- Task priority and type
- Task assignee information
- Associated contacts, companies, and deals
- Task completion notes
- Custom task properties
"""

from source_hubspot.streams import EngagementsCalls, API
# Setup API client
# NOTE: `credentials` is not defined in this snippet; supply your own before running.
api = API(credentials)

# Create calls stream
calls = EngagementsCalls(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials,
)

# Analyze call outcomes: per-disposition call count and cumulative duration (ms).
call_outcomes = {}
total_duration = 0
for record in calls.read_records(sync_mode="full_refresh"):
    metadata = record['metadata']

    # Treat a null value the same as a missing key so the arithmetic below
    # never sees None.
    disposition = metadata.get('disposition')
    outcome = 'Unknown' if disposition is None else disposition
    duration = metadata.get('durationMilliseconds') or 0

    stats = call_outcomes.setdefault(outcome, {'count': 0, 'total_duration': 0})
    stats['count'] += 1
    stats['total_duration'] += duration
    total_duration += duration

# Durations are accumulated in milliseconds; report them in minutes.
print(f"Total call duration: {total_duration / 1000 / 60:.2f} minutes")
for outcome, data in call_outcomes.items():
    # Every entry was created with at least one call, so count is never zero.
    avg_duration = data['total_duration'] / data['count'] / 1000 / 60
    print(f"{outcome}: {data['count']} calls, avg {avg_duration:.2f} min")

from source_hubspot.streams import EngagementsEmails
emails = EngagementsEmails(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials,
)

# Track email engagement metrics: every record counts as "sent"; the other
# counters are incremented when the matching metadata flag is truthy.
email_stats = {
    'sent': 0,
    'opened': 0,
    'clicked': 0,
    'replied': 0,
}
for record in emails.read_records(sync_mode="full_refresh"):
    metadata = record['metadata']
    email_stats['sent'] += 1
    for flag in ('opened', 'clicked', 'replied'):
        if metadata.get(flag):
            email_stats[flag] += 1

sent = email_stats['sent']
print("Email Performance:")
print(f"Sent: {sent}")
for label, key in (("Open", 'opened'), ("Click", 'clicked'), ("Reply", 'replied')):
    # Report 0.0% instead of dividing by zero when the stream returned no emails.
    rate = email_stats[key] / sent * 100 if sent else 0.0
    print(f"{label} Rate: {rate:.1f}%")

from source_hubspot.streams import EngagementsMeetings
from datetime import datetime, timedelta
meetings = EngagementsMeetings(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials,
)

# Analyze meeting patterns: count meetings per weekday and collect the ones
# that are still in the future.
meeting_by_day = {}
upcoming_meetings = []

# Captured once so every record is compared against the same instant.
# Both this value and the parsed meeting dates are naive local-time datetimes.
now = datetime.now()

for record in meetings.read_records(sync_mode="full_refresh"):
    engagement = record['engagement']
    metadata = record['metadata']

    # The timestamp is divided by 1000 to convert milliseconds to the seconds
    # that datetime.fromtimestamp() expects.
    meeting_date = datetime.fromtimestamp(engagement['timestamp'] / 1000)
    day_of_week = meeting_date.strftime('%A')

    # Count meetings by day of week
    meeting_by_day[day_of_week] = meeting_by_day.get(day_of_week, 0) + 1

    # Check for upcoming meetings
    if meeting_date > now:
        upcoming_meetings.append({
            'title': metadata.get('title', 'Untitled'),
            'date': meeting_date,
            'location': metadata.get('location', 'Not specified'),
        })

print("Meetings by day of week:")
for day, count in meeting_by_day.items():
    print(f"{day}: {count}")
print(f"\nUpcoming meetings: {len(upcoming_meetings)}")

from source_hubspot.streams import EngagementsTasks
from datetime import datetime  # needed for datetime.now() below

tasks = EngagementsTasks(
    api=api,
    start_date="2023-01-01T00:00:00Z",
    credentials=credentials,
)

# Track task completion: totals, completed/overdue counts, and a per-priority tally.
task_stats = {
    'total': 0,
    'completed': 0,
    'overdue': 0,
    'by_priority': {},
}

# Current time in milliseconds, for comparison with the due-date value below.
current_time = datetime.now().timestamp() * 1000

for record in tasks.read_records(sync_mode="full_refresh"):
    engagement = record['engagement']
    metadata = record['metadata']
    task_stats['total'] += 1

    # Check completion status
    is_completed = metadata.get('status') == 'COMPLETED'
    if is_completed:
        task_stats['completed'] += 1

    # Check for overdue tasks.
    # NOTE(review): the original example read metadata['forObjectType'] here and
    # flagged it as needing proper field mapping. This assumes the engagement
    # timestamp (milliseconds) carries the task due date - confirm against the
    # actual record schema before relying on the overdue count.
    due_date = engagement.get('timestamp')
    # Only compare numeric values; anything else cannot be ordered against a float.
    if isinstance(due_date, (int, float)) and due_date < current_time and not is_completed:
        task_stats['overdue'] += 1

    # Track by priority
    priority = metadata.get('priority', 'NONE')
    by_priority = task_stats['by_priority']
    by_priority[priority] = by_priority.get(priority, 0) + 1

# Report 0.0% instead of dividing by zero when the stream returned no tasks.
total_tasks = task_stats['total']
completion_rate = task_stats['completed'] / total_tasks * 100 if total_tasks else 0.0
print(f"Task completion rate: {completion_rate:.1f}%")
print(f"Overdue tasks: {task_stats['overdue']}")
print("Tasks by priority:", task_stats['by_priority'])

from source_hubspot.streams import EngagementsNotes
notes = EngagementsNotes(
api=api,
start_date="2023-01-01T00:00:00Z",
credentials=credentials
)
# Analyze note content
note_stats = {
'total': 0,
'total_length': 0,
'by_author': {}
}
for record in notes.read_records(sync_mode="full_refresh"):
engagement = record['engagement']
metadata = record['metadata']
note_stats['total'] += 1
# Note content length
body = metadata.get('body', '')
note_stats['total_length'] += len(body)
# Notes by author
owner_id = engagement.get('ownerId')
if owner_id not in note_stats['by_author']:
note_stats['by_author'][owner_id] = 0
note_stats['by_author'][owner_id] += 1
avg_length = note_stats['total_length'] / note_stats['total']
print(f"Average note length: {avg_length:.0f} characters")
print(f"Total notes: {note_stats['total']}")
print("Most active note authors:",
sorted(note_stats['by_author'].items(), key=lambda x: x[1], reverse=True)[:5])All engagement streams return records with a consistent structure:
# Engagement record structure
{
"engagement": {
"id": str, # Engagement ID
"timestamp": int, # Unix timestamp in milliseconds
"type": str, # Engagement type (CALL, EMAIL, MEETING, NOTE, TASK)
"ownerId": str, # HubSpot user ID of engagement owner
"active": bool, # Whether engagement is active
"createdAt": int, # Creation timestamp
"lastUpdated": int # Last update timestamp
},
"associations": {
"contactIds": List[str], # Associated contact IDs
"companyIds": List[str], # Associated company IDs
"dealIds": List[str], # Associated deal IDs
"ticketIds": List[str] # Associated ticket IDs
},
"metadata": {
# Type-specific metadata fields
# Varies by engagement type (calls, emails, meetings, notes, tasks)
}
}

Engagement streams require specific OAuth scopes:
- EngagementsCalls: crm.objects.contacts.read
- EngagementsEmails: crm.objects.contacts.read, sales-email-read
- EngagementsMeetings: crm.objects.contacts.read
- EngagementsNotes: crm.objects.contacts.read
- EngagementsTasks: crm.objects.contacts.read

Some engagement types may require additional scopes for full functionality:
- crm.objects.companies.read
- crm.objects.deals.read
- tickets

Install with Tessl CLI
npx tessl i tessl/pypi-airbyte-source-hubspot