Airbyte source connector for extracting sales engagement data from the Outreach platform via REST API.
An Airbyte source connector that enables data extraction from Outreach, a sales engagement platform. Built using the Airbyte CDK (Connector Development Kit) and configured as a declarative YAML-based connector, it provides automated data synchronization from Outreach's REST API to various data destinations.
Install with pip install source-outreach, or from a source checkout with Poetry: poetry install.
Import the connector class and its run entry point:
from source_outreach import SourceOutreach
from source_outreach.run import run
For custom components (advanced usage):
from source_outreach.components import CustomExtractor, CustomIncrementalSync
Run the connector via its Poetry script:
# Display connector specification
source-outreach spec
# Test connection with configuration
source-outreach check --config config.json
# Discover available data streams
source-outreach discover --config config.json
# Extract data from Outreach
source-outreach read --config config.json --catalog catalog.json
To launch the connector programmatically with the same command-line arguments:
import sys
from airbyte_cdk.entrypoint import launch
from source_outreach import SourceOutreach
# Create the connector and hand the CLI arguments to the Airbyte entrypoint
source = SourceOutreach()
launch(source, sys.argv[1:])
Create a config.json file with your Outreach OAuth credentials:
{
"client_id": "your_oauth_client_id",
"client_secret": "your_oauth_client_secret",
"refresh_token": "your_oauth_refresh_token",
"redirect_uri": "https://your-app.com/oauth/callback",
"start_date": "2020-11-16T00:00:00Z"
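}
The connector follows Airbyte's declarative YAML-based framework: streams, OAuth authentication, and default request parameters are declared in manifest.yaml, and incremental syncs use each record's updatedAt timestamp as the cursor.
SourceOutreach is the primary entry point for creating and running the Outreach source connector:
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
class SourceOutreach(YamlDeclarativeSource):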
"""
Declarative source connector for Outreach API data extraction.
Inherits from YamlDeclarativeSource and loads configuration from manifest.yaml.
"""
def __init__(self):
"""Initialize connector with manifest.yaml configuration."""Functions for launching the connector via CLI or programmatically.
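import sys
from airbyte_cdk.entrypoint import launch
from source_outreach import SourceOutreach
def run():
    """
    Entry point function that creates a SourceOutreach instance and launches it.
    Uses sys.argv[1:] for command-line arguments and airbyte_cdk.entrypoint.launch
    for connector execution.
    """
    source = SourceOutreach()
    launch(source, sys.argv[1:])
Advanced component for handling Outreach API response transformation: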
from typing import Any, List, Mapping
import requests
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
class CustomExtractor(RecordExtractor):
    """
    Custom record extractor for Outreach API responses.
    Transforms API responses by flattening relationship data and extracting
    records from the standard Outreach API format.
    """
    def extract_records(self, response: requests.Response, **kwargs) -> List[Mapping[str, Any]]:
        """
        Extract and transform records from an Outreach API response.
        Parameters:
        - response: requests.Response object from the Outreach API
        - **kwargs: Additional extraction parameters
        Returns:
        List of transformed record dictionaries with flattened relationships
        """
Custom cursor-based incremental sync implementation:
from typing import Any, Mapping, MutableMapping, Optional
from airbyte_cdk.sources.declarative.incremental import DatetimeBasedCursor
from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState
class CustomIncrementalSync(DatetimeBasedCursor):
    """
    Custom incremental sync cursor for Outreach streams.
    Implements cursor-based incremental synchronization using updatedAt timestamps
    with Outreach-specific filtering parameters.
    """
    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        """
        Generate request parameters for incremental sync.
        Parameters:
        - stream_state: Current stream state with cursor values
        - stream_slice: Stream slice configuration
        - next_page_token: Pagination token
        Returns:
        Dictionary of request parameters, including filter[updatedAt], for incremental sync
        """
The connector provides access to 17 Outreach API endpoints as data streams:
/accounts, /prospects, /opportunities, /users, /calls, /mailings, /mailboxes, /templates, /snippets, /sequences, /sequence_states, /sequence_steps, /callDispositions, /callPurposes, /personas, /stages, /tasks
All streams share the following configuration:
# Stream configuration attributes
PRIMARY_KEY = "id" # Integer primary key
CURSOR_FIELD = "updatedAt" # Datetime cursor for incremental sync
SYNC_MODES = ["full_refresh", "incremental"] # Supported sync modes
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" # ISO datetime format
BASE_URL = "https://api.outreach.io/api/v2/" # API base URLOAuth 2.0 configuration for Outreach API access:
# OAuth configuration (defined in manifest.yaml)
# Values are interpolated from config.json at runtime
OAUTH_CONFIG = {
    "client_id": "{{ config['client_id'] }}",
    "client_secret": "{{ config['client_secret'] }}",
    "refresh_token": "{{ config['refresh_token'] }}",
    "redirect_uri": "{{ config['redirect_uri'] }}",
    "token_refresh_endpoint": "https://api.outreach.io/oauth/token",
    "grant_type": "refresh_token",
}
Default request parameters applied to all API calls:
DEFAULT_PARAMS = {
"count": "false", # Disable response count
"sort": "updatedAt", # Sort by update timestamp
"page[size]": "1000" # Page size for pagination
}
# Incremental sync filter: records updated from the cursor value onward (open-ended range)
INCREMENTAL_FILTER = "filter[updatedAt]={cursor_value}..inf"
Core type definitions for the connector:
from typing import Any, Dict, List, Mapping, MutableMapping, Optional
from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState
import requests
# Type aliases for connector usage
ConfigDict = Dict[str, Any]
RecordDict = Mapping[str, Any]
RecordList = List[RecordDict]
RequestParams = MutableMapping[str, Any]
Common integration scenarios, starting with listing the available streams through the CLI:
import subprocess
import json
# Discover available streams
result = subprocess.run([
"source-outreach", "discover",
"--config", "config.json"
], capture_output=True, text=True)
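# discover prints Airbyte protocol messages, one JSON object per line;
# the stream list sits inside the message whose type is CATALOG
messages = [json.loads(line) for line in result.stdout.splitlines() if line.startswith("{")]
catalog = next(m["catalog"] for m in messages if m.get("type") == "CATALOG")
streams = [stream["name"] for stream in catalog["streams"]]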
print(f"Available streams: {streams}")from source_outreach import SourceOutreach
from airbyte_cdk.models import AirbyteMessage, Type as MessageType
import json
# Configure the source
config = {
"client_id": "your_client_id",
"client_secret": "your_client_secret",
"refresh_token": "your_refresh_token",
"redirect_uri": "https://your-app.com/oauth/callback",
"start_date": "2024-01-01T00:00:00Z"
}
# Create connector instance
source = SourceOutreach()
# Read data (this would typically be called by Airbyte platform)
catalog = source.discover(None, config)
configured_catalog = {
"streams": [{
"stream": stream.json_schema,
"sync_mode": "incremental",
"destination_sync_mode": "append"
} for stream in catalog.streams[:1]] # Just first stream for example
}
# Extract records
for message in source.read(None, config, configured_catalog):
if message.type == MessageType.RECORD:
print(f"Record from {message.record.stream}: {message.record.data}")