Airbyte source connector for HubSpot that enables data synchronization from HubSpot's CRM and marketing platform to various destinations.
80
Support for HubSpot custom objects with dynamic schema generation and custom properties handling. Custom objects allow HubSpot users to create their own data structures beyond the standard CRM objects.
Base class for all HubSpot custom object streams with dynamic schema support.
class CustomObject(CRMSearchStream, ABC):
"""
Abstract base class for HubSpot custom object streams.
Provides dynamic schema generation and property handling
for user-defined custom objects in HubSpot.
"""
def __init__(
self,
entity: str,
schema: Mapping[str, Any],
fully_qualified_name: str,
custom_properties: Mapping[str, Any],
**kwargs
):
"""
Initialize custom object stream.
Parameters:
- entity: Custom object entity name
- schema: JSON schema for the custom object
- fully_qualified_name: Fully qualified object name from HubSpot
- custom_properties: Custom property definitions
- **kwargs: Additional stream parameters (api, start_date, etc.)
"""

Methods for discovering and processing custom object metadata from HubSpot.
# From API class
def get_custom_objects_metadata(self) -> Iterable[Tuple[str, str, Mapping[str, Any]]]:
"""
Discover custom objects in the HubSpot account.
Yields:
- 4-tuples of (entity_name, fully_qualified_name, schema, properties); note that the return annotation in the signature lists only three element types
"""
def get_properties(self, raw_schema: Mapping[str, Any]) -> Mapping[str, Any]:
"""
Extract properties from custom object schema.
Parameters:
- raw_schema: Raw schema definition from HubSpot API
Returns:
- Dictionary mapping property names to property definitions
"""
def generate_schema(self, properties: Mapping[str, Any]) -> Mapping[str, Any]:
"""
Generate JSON schema from custom properties.
Parameters:
- properties: Property definitions
Returns:
- JSON schema for the custom object
"""

Methods in SourceHubspot for creating custom object stream instances.
def get_custom_object_streams(self, api: API, common_params: Mapping[str, Any]) -> Generator[CustomObject, None, None]:
"""
Generate custom object stream instances.
Parameters:
- api: API client instance
- common_params: Common stream parameters
Yields:
- CustomObject stream instances for each discovered custom object
"""

from source_hubspot.streams import API
from source_hubspot import SourceHubspot
# Setup API client
api = API(credentials)
# Discover custom objects
print("Discovering custom objects...")
for entity, fqn, schema, properties in api.get_custom_objects_metadata():
print(f"\nCustom Object: {entity}")
print(f"Fully Qualified Name: {fqn}")
print(f"Properties: {list(properties.keys())}")
print(f"Schema type: {schema.get('type', 'unknown')}")
# Show property details
for prop_name, prop_def in properties.items():
prop_type = prop_def.get('type', 'unknown')
print(f" - {prop_name}: {prop_type}")

from source_hubspot import SourceHubspot
# Create source with custom objects enabled
source = SourceHubspot(catalog=None, config=config, state=None)
# Get all streams including custom objects
all_streams = source.streams(config)
# Filter for custom object streams
custom_streams = []
for stream in all_streams:
if hasattr(stream, 'entity') and hasattr(stream, 'fully_qualified_name'):
custom_streams.append(stream)
print(f"Found {len(custom_streams)} custom object streams:")
for stream in custom_streams:
print(f" - {stream.name} ({stream.entity})")

# Assuming we have a custom object stream for "Vehicle" objects
for stream in custom_streams:
if stream.entity == "2-123456": # Custom object ID example
print(f"\nReading data from custom object: {stream.name}")
record_count = 0
for record in stream.read_records(sync_mode="full_refresh"):
record_count += 1
properties = record.get('properties', {})
# Display first few records
if record_count <= 3:
print(f"Record {record_count}:")
for prop_name, prop_value in properties.items():
print(f" {prop_name}: {prop_value}")
print(f"Total records: {record_count}")
break

# Analyze custom object schemas in detail
api = API(credentials)
for entity, fqn, schema, properties in api.get_custom_objects_metadata():
print(f"\n=== Custom Object: {entity} ===")
print(f"Fully Qualified Name: {fqn}")
# Analyze schema structure
if 'properties' in schema:
schema_props = schema['properties']
print(f"Schema properties: {len(schema_props)}")
# Categorize property types
type_counts = {}
for prop_name, prop_schema in schema_props.items():
prop_type = prop_schema.get('type', 'unknown')
if prop_type not in type_counts:
type_counts[prop_type] = 0
type_counts[prop_type] += 1
print("Property type distribution:")
for prop_type, count in sorted(type_counts.items()):
print(f" {prop_type}: {count}")
# Analyze custom properties
print(f"Custom properties: {len(properties)}")
required_props = []
optional_props = []
for prop_name, prop_def in properties.items():
if prop_def.get('required', False):
required_props.append(prop_name)
else:
optional_props.append(prop_name)
if required_props:
print(f"Required properties: {', '.join(required_props)}")
if optional_props:
print(f"Optional properties: {len(optional_props)} total")

# Manually create a custom object stream
from source_hubspot.streams import CustomObject
# This would typically be done automatically by SourceHubspot
custom_stream = CustomObject(
entity="2-123456", # HubSpot custom object ID
schema={
"type": "object",
"properties": {
"vehicle_make": {"type": "string"},
"vehicle_model": {"type": "string"},
"year": {"type": "integer"},
"price": {"type": "number"}
}
},
fully_qualified_name="p123456_vehicle",
custom_properties={
"vehicle_make": {"type": "string", "label": "Vehicle Make"},
"vehicle_model": {"type": "string", "label": "Vehicle Model"},
"year": {"type": "integer", "label": "Year"},
"price": {"type": "number", "label": "Price"}
},
api=api,
start_date="2023-01-01T00:00:00Z",
credentials=credentials
)
# Use the custom stream
for record in custom_stream.read_records(sync_mode="full_refresh"):
properties = record['properties']
print(f"Vehicle: {properties.get('vehicle_make')} {properties.get('vehicle_model')} ({properties.get('year')})")
print(f"Price: ${properties.get('price', 0):,.2f}")

# If experimental streams are enabled, custom objects get web analytics streams too
config_with_experimental = {
**config,
"enable_experimental_streams": True
}
source = SourceHubspot(catalog=None, config=config_with_experimental, state=None)
streams = source.streams(config_with_experimental)
# Find web analytics streams for custom objects
custom_web_analytics = [
s for s in streams
if "WebAnalytics" in s.__class__.__name__ and hasattr(s, 'parent')
]
print(f"Found {len(custom_web_analytics)} custom object web analytics streams")
for stream in custom_web_analytics:
print(f" - {stream.name}")

HubSpot custom objects support various property types:
# Common custom property types
CUSTOM_FIELD_TYPE_TO_VALUE = {
bool: "boolean",
str: "string",
float: "number",
int: "integer"
}
# Schema type conversions
KNOWN_CONVERTIBLE_SCHEMA_TYPES = {
"bool": ("boolean", None),
"enumeration": ("string", None),
"date": ("string", "date"),
"date-time": ("string", "date-time"),
"datetime": ("string", "date-time"),
"json": ("string", None),
"phone_number": ("string", None)
}

# Example custom property definition
{
"name": "property_name",
"label": "Display Label",
"type": "string", # or "number", "boolean", "datetime", etc.
"fieldType": "text", # HubSpot field type
"required": False,
"options": [], # For enumeration types
"description": "Property description"
}

Custom object streams require specific OAuth scopes:
crm.schemas.custom.read, crm.objects.custom.read
crm.schemas.custom.write, crm.objects.custom.write

The connector automatically detects available custom objects based on granted scopes and includes only those that are accessible to the authenticated user.
Install with Tessl CLI
npx tessl i tessl/pypi-airbyte-source-hubspot