Airbyte source connector for extracting sales engagement data from the Outreach platform via its REST API.
An Airbyte source connector that enables data extraction from Outreach, a sales engagement platform. Built using the Airbyte CDK (Connector Development Kit) and configured as a declarative YAML-based connector, it provides automated data synchronization from Outreach's REST API to various data destinations.
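```bash
pip install source-outreach
# or, from the connector's source directory, with Poetry:
poetry install
```

Import the connector and its entry point:

```python
from source_outreach import SourceOutreach
from source_outreach.run import run
```

For custom components (advanced usage):

```python
from source_outreach.components import CustomExtractor, CustomIncrementalSync
```

Run the connector via its `source-outreach` Poetry script: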
```bash
# Display the connector specification
source-outreach spec
# Test the connection with a configuration
source-outreach check --config config.json
# Discover available data streams
source-outreach discover --config config.json
# Extract data from Outreach
source-outreach read --config config.json --catalog catalog.json
```

Or launch it programmatically:

```python
import sys

from airbyte_cdk.entrypoint import launch
from source_outreach import SourceOutreach

# Create the connector and hand it the command-line arguments
source = SourceOutreach()
launch(source, sys.argv[1:])
```

Create a `config.json` file with your Outreach OAuth credentials:
```json
{
  "client_id": "your_oauth_client_id",
  "client_secret": "your_oauth_client_secret",
  "refresh_token": "your_oauth_refresh_token",
  "redirect_uri": "https://your-app.com/oauth/callback",
  "start_date": "2020-11-16T00:00:00Z"
}
```

The connector follows Airbyte's declarative, YAML-based framework:
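- Streams, authentication, and request parameters are declared in `manifest.yaml`.
- Incremental syncs are cursor-based, using each record's `updatedAt` timestamp.

`SourceOutreach` is the primary entry point for creating and running the Outreach source connector: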
```python
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource


class SourceOutreach(YamlDeclarativeSource):
    """
    Declarative source connector for Outreach API data extraction.

    Inherits from YamlDeclarativeSource and loads its configuration from manifest.yaml.
    """

    def __init__(self):
        """Initialize the connector with the manifest.yaml configuration."""
```

Functions for launching the connector via the CLI or programmatically:
```python
def run():
    """
    Entry point that creates a SourceOutreach instance and launches it.

    Passes sys.argv[1:] as the command-line arguments to
    airbyte_cdk.entrypoint.launch for connector execution.
    """
```

`CustomExtractor` is an advanced component for transforming Outreach API responses.
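To make the transformation concrete, here is a minimal sketch of the flattening idea. It assumes Outreach responses follow the JSON:API shape (a top-level `data` list whose items carry `id`, `attributes`, and `relationships`). The function `flatten_outreach_records` and its output layout (attributes merged beside the `id`, each relationship reduced to a list of related IDs) are hypothetical illustrations, not the connector's actual code, and the sketch takes an already-parsed payload rather than a `requests.Response`.

```python
from typing import Any, Dict, List, Mapping


def flatten_outreach_records(payload: Mapping[str, Any]) -> List[Dict[str, Any]]:
    """Hypothetical helper (not the connector's code): flatten JSON:API-style records.

    Each item's attributes are merged next to its id, and each relationship
    is reduced to a list of related record IDs.
    """
    records: List[Dict[str, Any]] = []
    for item in payload.get("data") or []:
        relationships: Dict[str, List[Any]] = {}
        for name, relation in (item.get("relationships") or {}).items():
            related = relation.get("data")
            if related is None:
                continue  # no linkage data, e.g. an empty to-one relationship
            if isinstance(related, dict):
                related = [related]  # to-one relationship: wrap the single reference
            relationships[name] = [ref["id"] for ref in related]
        records.append({"id": item["id"], **(item.get("attributes") or {}), "relationships": relationships})
    return records


# A trimmed-down, made-up payload in JSON:API shape
example = {
    "data": [
        {
            "id": 1,
            "type": "prospect",
            "attributes": {"firstName": "Ada", "updatedAt": "2024-01-02T03:04:05.000Z"},
            "relationships": {
                "account": {"data": {"type": "account", "id": 7}},
                "tags": {"data": []},
                "owner": {"data": None},
            },
        }
    ]
}
print(flatten_outreach_records(example))
```

Reducing relationships to ID lists keeps each record flat enough for tabular destinations while still allowing joins across streams such as `prospects` and `accounts`.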
```python
from typing import Any, List, Mapping

import requests
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor


class CustomExtractor(RecordExtractor):
    """
    Custom record extractor for Outreach API responses.

    Transforms responses in Outreach's JSON:API format by extracting the records
    and flattening their relationship data.
    """

    def extract_records(self, response: requests.Response, **kwargs) -> List[Mapping[str, Any]]:
        """
        Extract and transform records from an Outreach API response.

        Parameters:
        - response: requests.Response object from the Outreach API
        - **kwargs: Additional extraction parameters

        Returns:
        List of transformed record dictionaries with flattened relationships
        """
```

`CustomIncrementalSync` is a custom cursor-based incremental sync implementation.
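As a rough, standalone illustration of cursor-based incremental sync on `updatedAt`, the sketch below builds the `filter[updatedAt]={cursor_value}..inf` parameter documented for this connector and advances the cursor from a batch of records. It assumes the sync resumes from the saved state when it is later than `start_date` and otherwise starts at `start_date`; `incremental_request_params`, `advance_state`, and `_parse` are hypothetical helpers, and the connector's real logic (built on `DatetimeBasedCursor`) may differ.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, Mapping, Optional

DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"


def _parse(value: str) -> datetime:
    # Hypothetical helper: accept timestamps with or without fractional seconds,
    # e.g. a start_date "2020-11-16T00:00:00Z" or an updatedAt "2024-01-02T03:04:05.000Z".
    fmt = DATETIME_FORMAT if "." in value else "%Y-%m-%dT%H:%M:%SZ"
    return datetime.strptime(value, fmt).replace(tzinfo=timezone.utc)


def incremental_request_params(
    start_date: str,
    stream_state: Optional[Mapping[str, Any]] = None,
    cursor_field: str = "updatedAt",
) -> Dict[str, str]:
    """Hypothetical helper: build the open-ended updatedAt filter for the next request."""
    cursor_value = start_date
    saved = (stream_state or {}).get(cursor_field)
    if saved and _parse(saved) > _parse(start_date):
        cursor_value = saved  # resume from the saved cursor (assumed behavior)
    return {f"filter[{cursor_field}]": f"{cursor_value}..inf"}


def advance_state(
    stream_state: Optional[Mapping[str, Any]],
    records: Iterable[Mapping[str, Any]],
    cursor_field: str = "updatedAt",
) -> Dict[str, Any]:
    """Hypothetical helper: keep the latest cursor value seen so far."""
    latest = (stream_state or {}).get(cursor_field)
    for record in records:
        value = record.get(cursor_field)
        if value and (latest is None or _parse(value) > _parse(latest)):
            latest = value
    return {cursor_field: latest} if latest else dict(stream_state or {})


params = incremental_request_params("2020-11-16T00:00:00Z", {"updatedAt": "2024-01-02T03:04:05.000Z"})
print(params)  # {'filter[updatedAt]': '2024-01-02T03:04:05.000Z..inf'}
```

Keeping the original timestamp string as the cursor value (rather than reformatting it) avoids changing its precision between syncs.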
```python
from typing import Any, Mapping, MutableMapping, Optional

from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState


class CustomIncrementalSync(DatetimeBasedCursor):
    """
    Custom incremental sync cursor for Outreach streams.

    Implements cursor-based incremental synchronization on updatedAt timestamps,
    using Outreach-specific filter parameters.
    """

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        """
        Generate request parameters for an incremental sync.

        Parameters:
        - stream_state: Current stream state, including cursor values
        - stream_slice: Stream slice configuration
        - next_page_token: Pagination token

        Returns:
        Dictionary of request parameters, including filter[updatedAt] for incremental sync
        """
```

The connector exposes 17 Outreach API endpoints as data streams:
- `/accounts`
- `/prospects`
- `/opportunities`
- `/users`
- `/calls`
- `/mailings`
- `/mailboxes`
- `/templates`
- `/snippets`
- `/sequences`
- `/sequence_states`
- `/sequence_steps`
- `/callDispositions`
- `/callPurposes`
- `/personas`
- `/stages`
- `/tasks`

All streams share the following configuration:
```python
# Stream configuration attributes
PRIMARY_KEY = "id"  # Integer primary key
CURSOR_FIELD = "updatedAt"  # Datetime cursor for incremental sync
SYNC_MODES = ["full_refresh", "incremental"]  # Supported sync modes
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"  # ISO 8601 format with fractional seconds, UTC
BASE_URL = "https://api.outreach.io/api/v2/"  # API base URL
```

The connector authenticates to the Outreach API with OAuth 2.0, exchanging the configured refresh token for access tokens.
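The refresh-token exchange that this configuration drives can be sketched with the standard library. This is a minimal sketch, assuming a standard OAuth 2.0 refresh request (RFC 6749, section 6) sent as a form-encoded POST to the `token_refresh_endpoint` listed in the configuration; `build_token_refresh_request` is a hypothetical helper, the exact request the connector sends may differ, and the sketch only builds the request without sending it.

```python
from typing import Any, Mapping
from urllib.parse import parse_qs, urlencode
from urllib.request import Request

# Token endpoint as listed in the connector's OAuth configuration
TOKEN_REFRESH_ENDPOINT = "https://api.outreach.io/oauth/token"


def build_token_refresh_request(config: Mapping[str, Any]) -> Request:
    """Hypothetical helper: build (but do not send) an OAuth 2.0 refresh-token request.

    Assumes a standard form-encoded refresh request (RFC 6749, section 6).
    """
    body = urlencode(
        {
            "client_id": config["client_id"],
            "client_secret": config["client_secret"],
            "refresh_token": config["refresh_token"],
            "redirect_uri": config["redirect_uri"],
            "grant_type": "refresh_token",
        }
    ).encode("utf-8")
    return Request(
        TOKEN_REFRESH_ENDPOINT,
        data=body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


example_config = {
    "client_id": "your_oauth_client_id",
    "client_secret": "your_oauth_client_secret",
    "refresh_token": "your_oauth_refresh_token",
    "redirect_uri": "https://your-app.com/oauth/callback",
}
request = build_token_refresh_request(example_config)
fields = parse_qs(request.data.decode("utf-8"))
print(request.get_method(), request.full_url, fields["grant_type"])
```

Sending the request (for example with `urllib.request.urlopen(request)`) should, per the OAuth 2.0 spec, return a JSON body containing a new `access_token`. Inside the connector, this exchange is handled by the authenticator declared in `manifest.yaml`.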
```python
# OAuth configuration (defined in manifest.yaml); values are read from the user's config
OAUTH_CONFIG = {
    "client_id": "config['client_id']",
    "client_secret": "config['client_secret']",
    "refresh_token": "config['refresh_token']",
    "redirect_uri": "config['redirect_uri']",
    "token_refresh_endpoint": "https://api.outreach.io/oauth/token",
    "grant_type": "refresh_token",
}
```

Default request parameters are applied to every API call.
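To show how the default parameters and the incremental filter documented for this connector could combine into a single request URL, here is a stdlib sketch. It assumes the parameters are simply merged and query-string encoded onto `BASE_URL` plus a stream path such as `prospects`; `build_stream_url` is a hypothetical helper. Note that `urlencode` percent-encodes the square brackets in `page[size]` and `filter[updatedAt]`, and standard query-string parsing decodes them back.

```python
from typing import Optional
from urllib.parse import parse_qs, urlencode, urlsplit

BASE_URL = "https://api.outreach.io/api/v2/"
DEFAULT_PARAMS = {"count": "false", "sort": "updatedAt", "page[size]": "1000"}


def build_stream_url(path: str, cursor_value: Optional[str] = None) -> str:
    """Hypothetical helper: merge the default params with an optional updatedAt filter."""
    params = dict(DEFAULT_PARAMS)  # copy so the defaults are never mutated
    if cursor_value is not None:
        # Open-ended range: records updated from cursor_value onward
        params["filter[updatedAt]"] = f"{cursor_value}..inf"
    return f"{BASE_URL}{path}?{urlencode(params)}"


url = build_stream_url("prospects", "2024-01-01T00:00:00.000Z")
print(url)
print(parse_qs(urlsplit(url).query))
```

A full-refresh request would simply omit `cursor_value`, leaving only the default parameters.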
```python
DEFAULT_PARAMS = {
    "count": "false",  # Disable response count
    "sort": "updatedAt",  # Sort by update timestamp
    "page[size]": "1000",  # Page size for pagination
}

# Incremental sync filter format
INCREMENTAL_FILTER = "filter[updatedAt]={cursor_value}..inf"
```

Core type definitions for the connector:
```python
from typing import Any, Dict, List, Mapping, MutableMapping, Optional

import requests
from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState

# Type aliases for connector usage
ConfigDict = Dict[str, Any]
RecordDict = Mapping[str, Any]
RecordList = List[RecordDict]
RequestParams = MutableMapping[str, Any]
```

The connector handles common integration scenarios. For example, to list the available streams from the CLI:
```python
import json
import subprocess

# Discover available streams. The connector writes Airbyte protocol messages
# to stdout, one JSON object per line; the catalog arrives as a CATALOG message.
result = subprocess.run(
    ["source-outreach", "discover", "--config", "config.json"],
    capture_output=True, text=True, check=True,
)
streams = []
for line in result.stdout.splitlines():
    if not line.lstrip().startswith("{"):
        continue  # skip any non-JSON output
    message = json.loads(line)
    if message.get("type") == "CATALOG":
        streams = [stream["name"] for stream in message["catalog"]["streams"]]
print(f"Available streams: {streams}")
```

To read records programmatically instead:

```python
import logging

from airbyte_cdk.models import (
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    SyncMode,
    Type as MessageType,
)
from source_outreach import SourceOutreach

# Configure the source
config = {
    "client_id": "your_client_id",
    "client_secret": "your_client_secret",
    "refresh_token": "your_refresh_token",
    "redirect_uri": "https://your-app.com/oauth/callback",
    "start_date": "2024-01-01T00:00:00Z",
}

logger = logging.getLogger("airbyte")
source = SourceOutreach()

# Discover streams and configure only the first one, for illustration
catalog = source.discover(logger, config)
configured_catalog = ConfiguredAirbyteCatalog(
    streams=[
        ConfiguredAirbyteStream(
            stream=stream,
            sync_mode=SyncMode.incremental,
            destination_sync_mode=DestinationSyncMode.append,
            cursor_field=["updatedAt"],
        )
        for stream in catalog.streams[:1]
    ]
)

# Extract records (this would typically be called by the Airbyte platform)
for message in source.read(logger, config, configured_catalog):
    if message.type == MessageType.RECORD:
        print(f"Record from {message.record.stream}: {message.record.data}")
```
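The CLI `read` command needs a configured catalog file rather than the raw output of `discover`. The sketch below assumes the discovered catalog (the `catalog` object inside the CATALOG message) has the Airbyte protocol shape, where each stream carries a `name` and `supported_sync_modes`; `build_configured_catalog` is a hypothetical helper, and the `append` destination mode and `updatedAt` cursor field are choices for illustration.

```python
import json
from typing import Any, Dict, Iterable, List, Mapping


def build_configured_catalog(discovered: Mapping[str, Any], selected: Iterable[str]) -> Dict[str, Any]:
    """Hypothetical helper: turn a discovered catalog into a configured catalog dict."""
    wanted = set(selected)
    configured: List[Dict[str, Any]] = []
    for stream in discovered["streams"]:
        if stream["name"] not in wanted:
            continue  # only configure the streams the user asked for
        incremental = "incremental" in stream.get("supported_sync_modes", [])
        entry: Dict[str, Any] = {
            "stream": stream,
            "sync_mode": "incremental" if incremental else "full_refresh",
            "destination_sync_mode": "append",
        }
        if incremental:
            entry["cursor_field"] = ["updatedAt"]
        configured.append(entry)
    return {"streams": configured}


# Trimmed-down, made-up discovered catalog (real JSON schemas are much larger)
discovered = {
    "streams": [
        {"name": "prospects", "json_schema": {}, "supported_sync_modes": ["full_refresh", "incremental"]},
        {"name": "users", "json_schema": {}, "supported_sync_modes": ["full_refresh", "incremental"]},
    ]
}
configured = build_configured_catalog(discovered, ["prospects"])
print(json.dumps(configured, indent=2))
```

Redirecting this output to `catalog.json` produces a file that can be passed to `source-outreach read --config config.json --catalog catalog.json`.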