Airbyte source connector for HubSpot that enables data synchronization from HubSpot's CRM and marketing platform to various destinations.
Specialized streams for tracking changes to CRM object properties over time. These streams provide detailed audit trails showing when and how property values changed for contacts, companies, and deals.
Stream for tracking changes to company properties over time.
class CompaniesPropertyHistory(IncrementalStream):
    """
    Stream for HubSpot company property change history.

    Provides access to property history data including:
    - Property value changes over time
    - Change timestamps and source attribution
    - User or system attribution for changes
    - Property metadata and context
    - Bulk import tracking

    Requires OAuth scope: crm.schemas.companies.read
    """

Stream for tracking changes to contact properties over time.
class ContactsPropertyHistory(IncrementalStream):
    """
    Stream for HubSpot contact property change history.

    Provides access to property history data including:
    - Contact property value changes
    - Change timestamps and attribution
    - Form submission property updates
    - Email engagement property changes
    - Integration sync property updates

    Requires OAuth scope: crm.schemas.contacts.read
    """

Stream for tracking changes to deal properties over time.
class DealsPropertyHistory(IncrementalStream):
    """
    Stream for HubSpot deal property change history.

    Provides access to property history data including:
    - Deal stage progression tracking
    - Amount and close date changes
    - Owner assignment changes
    - Pipeline movements
    - Custom property modifications

    Requires OAuth scope: crm.schemas.deals.read
    """

from source_hubspot.streams import API
from source_hubspot.source import SourceHubspot

# Setup authentication
credentials = {
    "credentials_title": "OAuth Credentials",
    "client_id": "your_client_id",
    "client_secret": "your_client_secret",
    "refresh_token": "your_refresh_token"
}

config = {
    "credentials": credentials,
    "start_date": "2023-01-01T00:00:00Z"
}

# Get company property history
source = SourceHubspot(catalog=None, config=config, state=None)
streams = source.streams(config)

# Find property history stream
companies_history = next(
    s for s in streams
    if s.name == "companies_property_history"
)

# Process property changes
for record in companies_history.read_records(sync_mode="incremental"):
    company_id = record['objectId']
    property_name = record['propertyName']
    old_value = record.get('previousValue')
    new_value = record['value']
    timestamp = record['timestamp']
    print(f"Company {company_id}: {property_name}")
    print(f" Changed from '{old_value}' to '{new_value}'")
    print(f" At: {timestamp}")

# Track specific contact property changes
contacts_history = next(
    s for s in streams
    if s.name == "contacts_property_history"
)

# Filter for specific properties
target_properties = ['email', 'firstname', 'lastname', 'lifecyclestage']

for record in contacts_history.read_records(sync_mode="incremental"):
    if record['propertyName'] in target_properties:
        contact_id = record['objectId']
        property_name = record['propertyName']
        change_source = record.get('sourceType', 'Unknown')
        change_user = record.get('sourceId', 'System')
        print(f"Contact {contact_id} - {property_name}")
        print(f" Source: {change_source} by {change_user}")
        print(f" Value: {record['value']}")

# Analyze deal stage progression
deals_history = next(
    s for s in streams
    if s.name == "deals_property_history"
)

# Collect every stage change, grouped by deal
stage_changes = {}
for record in deals_history.read_records(sync_mode="incremental"):
    if record['propertyName'] == 'dealstage':
        deal_id = record['objectId']
        new_stage = record['value']
        timestamp = record['timestamp']
        if deal_id not in stage_changes:
            stage_changes[deal_id] = []
        stage_changes[deal_id].append({
            'stage': new_stage,
            'timestamp': timestamp
        })

# Print each deal's stage progression in chronological order
for deal_id, changes in stage_changes.items():
    changes.sort(key=lambda x: x['timestamp'])
    print(f"Deal {deal_id} progression:")
    for i, change in enumerate(changes):
        print(f" Stage {i+1}: {change['stage']} at {change['timestamp']}")

Property history streams require specific OAuth scopes to access schema information:
- crm.schemas.companies.read
- crm.schemas.contacts.read
- crm.schemas.deals.read

These scopes are in addition to the basic object read permissions and must be explicitly granted during OAuth configuration.
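As a rough illustration of the scope requirement, the sketch below checks a list of granted scopes against the three scopes named above before a sync is attempted. The helper `missing_scopes`, the `REQUIRED_SCOPES` mapping, and the sample `granted` list are hypothetical and not part of the connector; only the scope strings and stream names come from this page.

```python
# Scope names and stream names are taken from this page; the helper itself
# is a hypothetical pre-flight check and is not part of the connector.
REQUIRED_SCOPES = {
    "companies_property_history": "crm.schemas.companies.read",
    "contacts_property_history": "crm.schemas.contacts.read",
    "deals_property_history": "crm.schemas.deals.read",
}


def missing_scopes(granted_scopes, stream_names):
    """Return {stream_name: scope} for streams whose scope was not granted."""
    granted = set(granted_scopes)
    return {
        name: REQUIRED_SCOPES[name]
        for name in stream_names
        if name in REQUIRED_SCOPES and REQUIRED_SCOPES[name] not in granted
    }


# Example input: the granted list is made up for illustration.
granted = ["crm.objects.contacts.read", "crm.schemas.contacts.read"]
wanted = ["contacts_property_history", "deals_property_history"]

for stream, scope in missing_scopes(granted, wanted).items():
    print(f"{stream} needs scope {scope}")
```

Running a check like this before selecting streams makes a missing scope visible up front rather than as a failed API call partway through a sync.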
Each property history record contains:
{
    "objectId": "12345",                    # ID of the object that changed
    "propertyName": "email",                # Name of the property that changed
    "value": "new@example.com",             # New property value
    "previousValue": "old@example.com",     # Previous value (may be null)
    "timestamp": "2023-01-15T10:30:00Z",    # When the change occurred
    "sourceType": "FORM",                   # How the change was made
    "sourceId": "form-123",                 # Specific source identifier
    "sourceLabel": "Contact Form",          # Human-readable source description
    "objectTypeId": "0-1"                   # HubSpot object type identifier
}

Property history streams support incremental synchronization using timestamp-based cursors:
- timestamp

The streams automatically handle pagination and ensure complete data consistency during incremental updates.
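To make the timestamp cursor concrete, here is a minimal sketch of how a consumer might keep only records newer than a saved cursor and then advance it. It assumes the `timestamp` field uses the ISO 8601 format shown in the record example on this page; the connector's actual output format and internal state handling may differ, and `parse_ts` and `apply_cursor` are hypothetical helper names.

```python
from datetime import datetime, timezone

TS_FORMAT = "%Y-%m-%dT%H:%M:%SZ"  # assumed format, matching the record example


def parse_ts(value):
    """Parse a timestamp such as '2023-01-15T10:30:00Z' into an aware datetime."""
    return datetime.strptime(value, TS_FORMAT).replace(tzinfo=timezone.utc)


def apply_cursor(records, cursor_value):
    """Return (records newer than the cursor, updated cursor string)."""
    cursor = parse_ts(cursor_value)
    fresh = [r for r in records if parse_ts(r["timestamp"]) > cursor]
    # If nothing is newer, the cursor stays where it was.
    latest = max((parse_ts(r["timestamp"]) for r in fresh), default=cursor)
    return fresh, latest.strftime(TS_FORMAT)


# Sample records shaped like the structure documented above (values made up).
records = [
    {"objectId": "12345", "propertyName": "email",
     "value": "old@example.com", "timestamp": "2023-01-10T08:00:00Z"},
    {"objectId": "12345", "propertyName": "email",
     "value": "new@example.com", "timestamp": "2023-01-15T10:30:00Z"},
]

fresh, new_cursor = apply_cursor(records, "2023-01-12T00:00:00Z")
print(len(fresh), new_cursor)
```

Comparing parsed datetimes rather than raw strings avoids surprises if the timestamp format ever varies in precision.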
Install with Tessl CLI
npx tessl i tessl/pypi-airbyte-source-hubspot