tessl/pypi-source-outreach

Airbyte source connector for extracting sales engagement data from the Outreach platform via REST API.

Workspace: tessl
Visibility: Public
Describes: pypipkg:pypi/source-outreach@1.0.x

To install, run

npx @tessl/cli install tessl/pypi-source-outreach@1.0.0


Airbyte Source Outreach

An Airbyte source connector that enables data extraction from Outreach, a sales engagement platform. Built using the Airbyte CDK (Connector Development Kit) and configured as a declarative YAML-based connector, it provides automated data synchronization from Outreach's REST API to various data destinations.

Package Information

  • Package Name: source-outreach
  • Package Type: pypi (Airbyte source connector)
  • Language: Python (^3.9,<3.12)
  • Installation: pip install source-outreach, or poetry install from a checkout of the connector source
  • Dependencies: airbyte-cdk (^0)

Core Imports

from source_outreach import SourceOutreach
from source_outreach.run import run

For custom components (advanced usage):

from source_outreach.components import CustomExtractor, CustomIncrementalSync

Basic Usage

CLI Usage

Run the connector through the source-outreach command, which the package installs as a Poetry script:

# Display connector specification
source-outreach spec

# Test connection with configuration
source-outreach check --config config.json

# Discover available data streams
source-outreach discover --config config.json

# Extract data from Outreach
source-outreach read --config config.json --catalog catalog.json
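The read command expects a configured catalog in catalog.json. A minimal sketch of generating one follows; build_configured_catalog is a hypothetical helper, and the empty json_schema is a placeholder that you would normally replace with the schema each stream reports from discover.

```python
from typing import Any, Dict, List


def build_configured_catalog(stream_names: List[str], sync_mode: str = "full_refresh") -> Dict[str, Any]:
    """Hypothetical helper: a minimal configured catalog for `source-outreach read`.

    The empty json_schema is a placeholder; copying the schema returned by
    `discover` for each stream is the safer choice.
    """
    return {
        "streams": [
            {
                "stream": {
                    "name": name,
                    "json_schema": {},  # placeholder schema (assumption)
                    "supported_sync_modes": ["full_refresh", "incremental"],
                },
                "sync_mode": sync_mode,
                "destination_sync_mode": "overwrite",
            }
            for name in stream_names
        ]
    }
```

For example, json.dump(build_configured_catalog(["prospects"]), f, indent=2) with f opened on catalog.json writes a catalog that selects only the prospects stream.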

Programmatic Usage

from source_outreach import SourceOutreach
from airbyte_cdk.entrypoint import launch
import sys

# Create and run the connector
source = SourceOutreach()
launch(source, sys.argv[1:])

Configuration

Create a config.json file with your Outreach OAuth credentials:

{
  "client_id": "your_oauth_client_id",
  "client_secret": "your_oauth_client_secret", 
  "refresh_token": "your_oauth_refresh_token",
  "redirect_uri": "https://your-app.com/oauth/callback",
  "start_date": "2020-11-16T00:00:00Z"
}
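Before running check, it can help to catch obviously malformed configs locally. A minimal sketch, assuming all five keys shown above are required and that start_date uses the YYYY-MM-DDTHH:MM:SSZ form of the example; validate_config is a hypothetical helper, not part of the package, and check remains the real test of the credentials.

```python
from datetime import datetime
from typing import Any, Dict, List

# Assumption: every key from the example config is required.
REQUIRED_KEYS = ("client_id", "client_secret", "refresh_token", "redirect_uri", "start_date")


def validate_config(config: Dict[str, Any]) -> List[str]:
    """Hypothetical helper: return a list of problems (empty if none found)."""
    problems = [f"missing {key}" for key in REQUIRED_KEYS if not config.get(key)]
    start = config.get("start_date")
    if start:
        try:
            # Assumed format, matching the example value 2020-11-16T00:00:00Z
            datetime.strptime(start, "%Y-%m-%dT%H:%M:%SZ")
        except ValueError:
            problems.append(f"start_date {start!r} is not YYYY-MM-DDTHH:MM:SSZ")
    return problems
```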

Architecture

The connector follows Airbyte's declarative YAML-based framework:

  • Declarative Configuration: All stream definitions, authentication, and sync logic defined in manifest.yaml
  • OAuth Authentication: Automatic token refresh via Outreach's OAuth 2.0 API
  • Custom Components: Extended extraction and incremental sync for Outreach-specific data handling
  • 17 Data Streams: Comprehensive coverage of Outreach entities (prospects, accounts, sequences, etc.)
  • Incremental Sync: Cursor-based synchronization using updatedAt timestamps

Capabilities

Main Connector Class

The primary entry point for creating and running the Outreach source connector.

class SourceOutreach(YamlDeclarativeSource):
    """
    Declarative source connector for Outreach API data extraction.
    
    Inherits from YamlDeclarativeSource and loads configuration from manifest.yaml.
    """
    
    def __init__(self):
        """Initialize connector with manifest.yaml configuration."""

Entry Point Functions

Functions for launching the connector via CLI or programmatically.

def run():
    """
    Entry point function that creates SourceOutreach instance and launches it.
    
    Uses sys.argv[1:] for command line arguments and airbyte_cdk.entrypoint.launch
    for connector execution.
    """

Custom Data Extraction

Advanced component for handling Outreach API response transformation.

class CustomExtractor(RecordExtractor):
    """
    Custom record extractor for Outreach API responses.
    
    Transforms API responses by flattening relationship data and extracting
    records from the standard Outreach API format.
    """
    
    def extract_records(self, response: requests.Response, **kwargs) -> List[Mapping[str, Any]]:
        """
        Extract and transform records from Outreach API response.
        
        Parameters:
        - response: requests.Response object from Outreach API
        - **kwargs: Additional extraction parameters
        
        Returns:
        List of transformed record dictionaries with flattened relationships
        """

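Outreach returns records in a JSON:API-style envelope: each item in data carries an id, an attributes object and a relationships object. The sketch below approximates the flattening described for CustomExtractor; flatten_outreach_records is hypothetical, and the choice to store each relationship as a list of related IDs under the relationship's name (dropping empty relationships) is an assumption that may not match the connector's exact output.

```python
from typing import Any, Dict, List, Mapping


def flatten_outreach_records(payload: Mapping[str, Any]) -> List[Dict[str, Any]]:
    """Hypothetical sketch: flatten a JSON:API-style payload into one dict per record."""
    records = []
    for element in payload.get("data") or []:
        # Start from the attributes and add the primary key
        record: Dict[str, Any] = dict(element.get("attributes", {}))
        record["id"] = element["id"]
        for name, rel in (element.get("relationships") or {}).items():
            related = rel.get("data")
            if not related:
                continue  # assumption: empty relationships are dropped
            if isinstance(related, dict):
                related = [related]  # to-one relationship becomes a one-item list
            record[name] = [item.get("id") for item in related]
        records.append(record)
    return records
```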
Incremental Synchronization

Custom cursor-based incremental sync implementation.

class CustomIncrementalSync(DatetimeBasedCursor):
    """
    Custom incremental sync cursor for Outreach streams.
    
    Implements cursor-based incremental synchronization using updatedAt timestamps
    with Outreach-specific filtering parameters.
    """
    
    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        """
        Generate request parameters for incremental sync.
        
        Parameters:
        - stream_state: Current stream state with cursor values
        - stream_slice: Stream slice configuration
        - next_page_token: Pagination token
        
        Returns:
        Dictionary of request parameters including filter[updatedAt] for incremental sync
        """

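The filter that get_request_params produces can be sketched as a plain function. This is an approximation of CustomIncrementalSync under stated assumptions: incremental_params is hypothetical, it reads the cursor straight from the saved state, and how the real cursor applies start_date on the first sync is not modelled here.

```python
from typing import Any, Dict, Mapping, Optional

CURSOR_FIELD = "updatedAt"


def incremental_params(stream_state: Optional[Mapping[str, Any]]) -> Dict[str, str]:
    """Hypothetical sketch: build the updatedAt range filter from saved state.

    With no saved cursor, no filter is added; otherwise records are requested
    from the saved cursor onward using an open-ended "..inf" range.
    """
    params: Dict[str, str] = {}
    if stream_state and stream_state.get(CURSOR_FIELD):
        params[f"filter[{CURSOR_FIELD}]"] = f"{stream_state[CURSOR_FIELD]}..inf"
    return params
```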
Data Streams

The connector provides access to 17 Outreach API endpoints as data streams:

Core Entity Streams

  • accounts - Company/account records from /accounts
  • prospects - Contact/prospect records from /prospects
  • opportunities - Sales opportunity records from /opportunities
  • users - User account records from /users

Communication Streams

  • calls - Call activity records from /calls
  • mailings - Email mailing records from /mailings
  • mailboxes - Mailbox configuration from /mailboxes
  • templates - Email template definitions from /templates
  • snippets - Email template snippets from /snippets

Sequence & Campaign Streams

  • sequences - Email sequence definitions from /sequences
  • sequence_states - Prospect sequence state tracking from /sequenceStates
  • sequence_steps - Individual sequence steps from /sequenceSteps

Reference Data Streams

  • call_dispositions - Call disposition types from /callDispositions
  • call_purposes - Call purpose categories from /callPurposes
  • personas - Buyer persona definitions from /personas
  • stages - Sales stage definitions from /stages
  • tasks - Task/activity records from /tasks

Stream Characteristics

All streams support the following features:

# Stream configuration attributes
PRIMARY_KEY = "id"  # Integer primary key
CURSOR_FIELD = "updatedAt"  # Datetime cursor for incremental sync
SYNC_MODES = ["full_refresh", "incremental"]  # Supported sync modes
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"  # ISO datetime format
BASE_URL = "https://api.outreach.io/api/v2/"  # API base URL
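During an incremental sync, the saved state advances to the newest updatedAt value seen. A sketch, assuming records carry updatedAt in DATETIME_FORMAT; latest_cursor is a hypothetical helper, not the CDK's cursor class.

```python
from datetime import datetime
from typing import Any, Iterable, Mapping, Optional

DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"


def latest_cursor(records: Iterable[Mapping[str, Any]], current: Optional[str] = None) -> Optional[str]:
    """Hypothetical helper: newest updatedAt among the records and the current state."""
    best = current
    for record in records:
        value = record.get("updatedAt")
        if value is None:
            continue  # records without a cursor value do not move the state
        if best is None or datetime.strptime(value, DATETIME_FORMAT) > datetime.strptime(best, DATETIME_FORMAT):
            best = value
    return best
```

Parsing with strptime rather than comparing raw strings keeps the ordering correct when the number of fractional-second digits varies (as strings, ".51Z" sorts before ".5Z").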

Authentication

OAuth 2.0 configuration for Outreach API access:

# OAuth configuration (defined in manifest.yaml); the config[...] entries are
# filled in from config.json at runtime
OAUTH_CONFIG = {
    "client_id": "config['client_id']",
    "client_secret": "config['client_secret']", 
    "refresh_token": "config['refresh_token']",
    "redirect_uri": "config['redirect_uri']",
    "token_refresh_endpoint": "https://api.outreach.io/oauth/token",
    "grant_type": "refresh_token"
}
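In the connector, the CDK's OAuth authenticator performs the token refresh. For illustration only, the refresh-token request can be built with the standard library. build_refresh_request is hypothetical, and the form-encoded body follows the generic OAuth 2.0 refresh grant (RFC 6749, section 6) rather than anything confirmed about how the manifest sends it.

```python
from typing import Dict
from urllib.parse import urlencode
from urllib.request import Request

TOKEN_URL = "https://api.outreach.io/oauth/token"


def build_refresh_request(config: Dict[str, str]) -> Request:
    """Hypothetical helper: build (but do not send) a refresh-token POST."""
    body = urlencode({
        "grant_type": "refresh_token",
        "client_id": config["client_id"],
        "client_secret": config["client_secret"],
        "refresh_token": config["refresh_token"],
        "redirect_uri": config["redirect_uri"],
    })
    return Request(
        TOKEN_URL,
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )
```

Passing the request to urllib.request.urlopen would send it; a successful OAuth token response is a JSON object containing a new access_token.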

Request Parameters

Default request parameters applied to all API calls:

DEFAULT_PARAMS = {
    "count": "false",           # Omit the total record count from responses
    "sort": "updatedAt",        # Sort by update timestamp, oldest first
    "page[size]": "1000"        # Records per page
}

# Incremental sync filter format
INCREMENTAL_FILTER = "filter[updatedAt]={cursor_value}..inf"
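Combining the base URL, the default parameters and the incremental filter, a request URL might be assembled as follows. build_url is a hypothetical helper; urlencode percent-encodes the square brackets and colons, which servers normally decode back to the unencoded form shown above.

```python
from typing import Optional
from urllib.parse import urlencode, urljoin

BASE_URL = "https://api.outreach.io/api/v2/"
DEFAULT_PARAMS = {"count": "false", "sort": "updatedAt", "page[size]": "1000"}


def build_url(path: str, cursor_value: Optional[str] = None) -> str:
    """Hypothetical helper: full request URL for a stream endpoint."""
    params = dict(DEFAULT_PARAMS)
    if cursor_value:
        # Incremental filter: everything from the cursor onward
        params["filter[updatedAt]"] = f"{cursor_value}..inf"
    return urljoin(BASE_URL, path) + "?" + urlencode(params)
```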

Types

Core type definitions for the connector:

from typing import Any, Dict, List, Mapping, MutableMapping, Optional
from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState
import requests

# Type aliases for connector usage
ConfigDict = Dict[str, Any]
RecordDict = Mapping[str, Any] 
RecordList = List[RecordDict]
RequestParams = MutableMapping[str, Any]

Error Handling

The connector handles common integration scenarios:

  • Authentication Errors: Access tokens are refreshed automatically through the OAuth refresh-token grant
  • Rate Limiting: Automatic retry with exponential backoff
  • API Errors: Graceful handling of 4xx/5xx responses with detailed logging
  • Network Issues: Connection timeout and retry logic
  • Configuration Errors: Validation of required OAuth parameters
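Exponential backoff doubles the wait after each failed attempt. The sketch below shows the idea; the base delay, the 60-second cap, the optional "full jitter" variant and the set of retryable status codes are illustrative assumptions, not the values the Airbyte CDK or this connector uses.

```python
import random
from typing import List

# Assumption: rate limiting (429) and transient server errors are retried.
RETRYABLE_STATUS = {429, 500, 502, 503, 504}


def should_retry(status: int) -> bool:
    """Return True if a response with this HTTP status should be retried."""
    return status in RETRYABLE_STATUS


def backoff_delays(retries: int, base: float = 1.0, cap: float = 60.0, jitter: bool = False) -> List[float]:
    """Delays in seconds for successive retries: base * 2**attempt, capped."""
    delays = []
    for attempt in range(retries):
        delay = min(cap, base * (2 ** attempt))
        if jitter:
            delay = random.uniform(0, delay)  # full jitter spreads out retries
        delays.append(delay)
    return delays
```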

Usage Examples

Running Discovery

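import subprocess
import json

# Discover available streams; the connector writes Airbyte protocol messages
# (one JSON object per line) to stdout, including LOG messages
result = subprocess.run(
    ["source-outreach", "discover", "--config", "config.json"],
    capture_output=True, text=True, check=True,
)

catalog = None
for line in result.stdout.splitlines():
    try:
        message = json.loads(line)
    except json.JSONDecodeError:
        continue  # skip any non-JSON output
    if message.get("type") == "CATALOG":
        catalog = message["catalog"]
        break

streams = [stream["name"] for stream in catalog["streams"]] if catalog else []
print(f"Available streams: {streams}")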

Programmatic Data Extraction

import logging

from source_outreach import SourceOutreach
from airbyte_cdk.models import (
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    SyncMode,
    Type as MessageType,
)

# Configure the source
config = {
    "client_id": "your_client_id",
    "client_secret": "your_client_secret",
    "refresh_token": "your_refresh_token",
    "redirect_uri": "https://your-app.com/oauth/callback",
    "start_date": "2024-01-01T00:00:00Z"
}

logger = logging.getLogger("airbyte")

# Create connector instance
source = SourceOutreach()

# Build a configured catalog (normally supplied by the Airbyte platform);
# only the first discovered stream is selected in this example
catalog = source.discover(logger, config)
configured_catalog = ConfiguredAirbyteCatalog(
    streams=[
        ConfiguredAirbyteStream(
            stream=stream,
            sync_mode=SyncMode.incremental,
            destination_sync_mode=DestinationSyncMode.append,
        )
        for stream in catalog.streams[:1]
    ]
)

# Extract records
for message in source.read(logger, config, configured_catalog):
    if message.type == MessageType.RECORD:
        print(f"Record from {message.record.stream}: {message.record.data}")