tessl/pypi-source-outreach

Airbyte source connector for extracting sales engagement data from the Outreach platform via REST API.

Airbyte Source Outreach

An Airbyte source connector that extracts data from Outreach, a sales engagement platform. It is built with the Airbyte CDK (Connector Development Kit) as a declarative, YAML-based connector and syncs data from Outreach's REST API to Airbyte-supported destinations.

Package Information

  • Package Name: source-outreach
  • Package Type: pypi (Airbyte source connector)
  • Language: Python (^3.9,<3.12)
  • Installation: pip install source-outreach, or run poetry install from a checkout of the connector source
  • Dependencies: airbyte-cdk (^0)

Core Imports

from source_outreach import SourceOutreach
from source_outreach.run import run

For custom components (advanced usage):

from source_outreach.components import CustomExtractor, CustomIncrementalSync

Basic Usage

CLI Usage

Run the connector through the source-outreach console script that Poetry installs:

# Display connector specification
source-outreach spec

# Test connection with configuration
source-outreach check --config config.json

# Discover available data streams
source-outreach discover --config config.json

# Extract data from Outreach
source-outreach read --config config.json --catalog catalog.json
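
Each command writes Airbyte protocol messages to stdout, one JSON object per line. As a minimal sketch of consuming that output, the snippet below counts RECORD messages per stream from text that has already been captured. The helper name count_records is hypothetical, and the sample lines are made up for illustration rather than taken from a real sync.

```python
import json
from collections import Counter


def count_records(output: str) -> Counter:
    """Count RECORD messages per stream in captured connector output."""
    counts = Counter()
    for line in output.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            message = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip anything on stdout that is not a JSON message
        if isinstance(message, dict) and message.get("type") == "RECORD":
            counts[message["record"]["stream"]] += 1
    return counts


# Illustrative sample only, not real connector output
sample_output = "\n".join([
    json.dumps({"type": "LOG", "log": {"level": "INFO", "message": "Starting sync"}}),
    json.dumps({"type": "RECORD", "record": {"stream": "accounts", "data": {"id": 1}, "emitted_at": 0}}),
    json.dumps({"type": "RECORD", "record": {"stream": "accounts", "data": {"id": 2}, "emitted_at": 0}}),
    json.dumps({"type": "RECORD", "record": {"stream": "users", "data": {"id": 7}, "emitted_at": 0}}),
])

record_counts = count_records(sample_output)
print(dict(record_counts))
```

In practice the text would come from redirecting the read command's stdout to a file or from subprocess.run with capture_output=True.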

Programmatic Usage

from source_outreach import SourceOutreach
from airbyte_cdk.entrypoint import launch
import sys

# Create and run the connector
source = SourceOutreach()
launch(source, sys.argv[1:])

Configuration

Create a config.json file with your Outreach OAuth credentials:

{
  "client_id": "your_oauth_client_id",
  "client_secret": "your_oauth_client_secret", 
  "refresh_token": "your_oauth_refresh_token",
  "redirect_uri": "https://your-app.com/oauth/callback",
  "start_date": "2020-11-16T00:00:00Z"
}
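
Before handing the file to the connector, it can help to sanity-check it locally. The sketch below is one possible pre-flight check, not the connector's own validation: the names validate_config and REQUIRED_KEYS are hypothetical, and treating the four OAuth fields as required and start_date as optional is an assumption. The output of the spec command remains the authority on what is required.

```python
from datetime import datetime

# Assumption: these four OAuth fields are required; check `source-outreach spec`
REQUIRED_KEYS = ("client_id", "client_secret", "refresh_token", "redirect_uri")


def validate_config(config: dict) -> list:
    """Return a list of human-readable problems; an empty list means no issues found."""
    problems = [
        f"missing or empty key: {key}" for key in REQUIRED_KEYS if not config.get(key)
    ]
    start_date = config.get("start_date")
    if start_date is not None:
        try:
            # Same shape as the example above: 2020-11-16T00:00:00Z
            datetime.strptime(start_date, "%Y-%m-%dT%H:%M:%SZ")
        except (TypeError, ValueError):
            problems.append("start_date must look like 2020-11-16T00:00:00Z")
    return problems


complete = {
    "client_id": "your_oauth_client_id",
    "client_secret": "your_oauth_client_secret",
    "refresh_token": "your_oauth_refresh_token",
    "redirect_uri": "https://your-app.com/oauth/callback",
    "start_date": "2020-11-16T00:00:00Z",
}
incomplete = {"client_id": "abc", "client_secret": "", "start_date": "2020-11-16"}

print(validate_config(complete))
print(validate_config(incomplete))
```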

Architecture

The connector follows Airbyte's declarative YAML-based framework:

  • Declarative Configuration: All stream definitions, authentication, and sync logic defined in manifest.yaml
  • OAuth Authentication: Automatic token refresh via Outreach's OAuth 2.0 API
  • Custom Components: Extended extraction and incremental sync for Outreach-specific data handling
  • 17 Data Streams: Comprehensive coverage of Outreach entities (prospects, accounts, sequences, etc.)
  • Incremental Sync: Cursor-based synchronization using updatedAt timestamps

Capabilities

Main Connector Class

The primary entry point for creating and running the Outreach source connector.

class SourceOutreach(YamlDeclarativeSource):
    """
    Declarative source connector for Outreach API data extraction.
    
    Inherits from YamlDeclarativeSource and loads configuration from manifest.yaml.
    """
    
    def __init__(self):
        """Initialize connector with manifest.yaml configuration."""

Entry Point Functions

Functions for launching the connector via CLI or programmatically.

def run():
    """
    Entry point function that creates SourceOutreach instance and launches it.
    
    Uses sys.argv[1:] for command line arguments and airbyte_cdk.entrypoint.launch
    for connector execution.
    """

Custom Data Extraction

Advanced component that transforms Outreach API responses into flat records.

class CustomExtractor(RecordExtractor):
    """
    Custom record extractor for Outreach API responses.
    
    Transforms API responses by flattening relationship data and extracting
    records from the standard Outreach API format.
    """
    
    def extract_records(self, response: requests.Response, **kwargs) -> List[Mapping[str, Any]]:
        """
        Extract and transform records from Outreach API response.
        
        Parameters:
        - response: requests.Response object from Outreach API
        - **kwargs: Additional extraction parameters
        
        Returns:
        List of transformed record dictionaries with flattened relationships
        """

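The flattening described above can be sketched without the CDK. This example assumes a JSON:API-style envelope (a top-level data list whose items carry id, attributes, and relationships); the function name flatten_records, the sample payload, and the exact output shape are illustrative assumptions, since this document does not specify what CustomExtractor emits.

```python
from typing import Any, Dict, List, Mapping


def flatten_records(payload: Mapping[str, Any]) -> List[Dict[str, Any]]:
    """Merge each item's attributes with its id and the ids of related resources."""
    records = []
    for element in payload.get("data") or []:
        record = dict(element.get("attributes") or {})
        record["id"] = element["id"]
        for name, relation in (element.get("relationships") or {}).items():
            related = relation.get("data") if isinstance(relation, Mapping) else None
            if not related:
                continue  # relationship present but nothing linked
            if isinstance(related, Mapping):
                related = [related]  # to-one relationship: wrap so both shapes share one path
            record[name] = [item.get("id") for item in related]
        records.append(record)
    return records


# Made-up payload for illustration
sample_payload = {
    "data": [
        {
            "type": "prospect",
            "id": 42,
            "attributes": {"firstName": "Ada", "updatedAt": "2024-01-01T00:00:00.000Z"},
            "relationships": {
                "account": {"data": {"type": "account", "id": 7}},
                "owner": {"data": None},
                "emailAddresses": {
                    "data": [
                        {"type": "emailAddress", "id": 1},
                        {"type": "emailAddress", "id": 2},
                    ]
                },
            },
        }
    ]
}

flattened = flatten_records(sample_payload)
print(flattened)
```

Storing only the related ids keeps each record flat; a consumer that needs the related objects can join against the corresponding stream.
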
Incremental Synchronization

Custom cursor-based incremental sync implementation.

class CustomIncrementalSync(DatetimeBasedCursor):
    """
    Custom incremental sync cursor for Outreach streams.
    
    Implements cursor-based incremental synchronization using updatedAt timestamps
    with Outreach-specific filtering parameters.
    """
    
    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        """
        Generate request parameters for incremental sync.
        
        Parameters:
        - stream_state: Current stream state with cursor values
        - stream_slice: Stream slice configuration
        - next_page_token: Pagination token
        
        Returns:
        Dictionary of request parameters including filter[updatedAt] for incremental sync
        """

Data Streams

The connector provides access to 17 Outreach API endpoints as data streams:

Core Entity Streams

  • accounts - Company/account records from /accounts
  • prospects - Contact/prospect records from /prospects
  • opportunities - Sales opportunity records from /opportunities
  • users - User account records from /users

Communication Streams

  • calls - Call activity records from /calls
  • mailings - Email mailing records from /mailings
  • mailboxes - Mailbox configuration from /mailboxes
  • templates - Email template definitions from /templates
  • snippets - Email template snippets from /snippets

Sequence & Campaign Streams

  • sequences - Email sequence definitions from /sequences
  • sequence_states - Prospect sequence state tracking from /sequenceStates
  • sequence_steps - Individual sequence steps from /sequenceSteps

Reference Data Streams

  • call_dispositions - Call disposition types from /callDispositions
  • call_purposes - Call purpose categories from /callPurposes
  • personas - Buyer persona definitions from /personas
  • stages - Sales stage definitions from /stages
  • tasks - Task/activity records from /tasks

Stream Characteristics

All streams share the following configuration:

# Stream configuration attributes
PRIMARY_KEY = "id"  # Integer primary key
CURSOR_FIELD = "updatedAt"  # Datetime cursor for incremental sync
SYNC_MODES = ["full_refresh", "incremental"]  # Supported sync modes
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"  # ISO datetime format
BASE_URL = "https://api.outreach.io/api/v2/"  # API base URL
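
To illustrate how the cursor field and datetime format work together, the sketch below parses updatedAt values and keeps the newest one, which is the value an incremental sync would carry forward as state. The helper name latest_cursor and the sample records are hypothetical; the CDK's cursor implementation handles this inside the connector.

```python
from datetime import datetime, timezone

DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"


def latest_cursor(records, current=None):
    """Return the newest updatedAt among the records, or `current` if none is newer."""
    latest = current
    for record in records:
        value = record.get("updatedAt")
        if not value:
            continue  # records without a cursor value cannot advance the state
        parsed = datetime.strptime(value, DATETIME_FORMAT).replace(tzinfo=timezone.utc)
        if latest is None or parsed > latest:
            latest = parsed
    return latest


# Made-up records for illustration
sample_records = [
    {"id": 1, "updatedAt": "2024-01-15T08:00:00.000Z"},
    {"id": 2, "updatedAt": "2024-03-01T12:30:00.500Z"},
    {"id": 3, "updatedAt": None},
]

newest = latest_cursor(sample_records)
next_cursor = newest.strftime(DATETIME_FORMAT)
print(next_cursor)
```

Note that %f accepts one to six fractional digits when parsing but always writes six when formatting, so a value read with millisecond precision is written back with microsecond precision.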

Authentication

OAuth 2.0 configuration for Outreach API access:

# OAuth configuration (defined in manifest.yaml; values are interpolated from config.json)
OAUTH_CONFIG = {
    "client_id": "{{ config['client_id'] }}",
    "client_secret": "{{ config['client_secret'] }}",
    "refresh_token": "{{ config['refresh_token'] }}",
    "redirect_uri": "{{ config['redirect_uri'] }}",
    "token_refresh_endpoint": "https://api.outreach.io/oauth/token",
    "grant_type": "refresh_token"
}
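
For readers unfamiliar with the refresh-token grant, the sketch below builds (but does not send) the kind of request that exchanges a refresh token for a new access token. The connector's authenticator performs this exchange itself; the function name build_refresh_request is hypothetical, and sending the fields as a form-encoded body is an assumption based on the standard OAuth 2.0 flow rather than something this document states.

```python
from urllib.parse import parse_qs, urlencode
from urllib.request import Request

TOKEN_URL = "https://api.outreach.io/oauth/token"


def build_refresh_request(config: dict) -> Request:
    """Build a POST request for the OAuth 2.0 refresh_token grant (not sent here)."""
    body = urlencode(
        {
            "grant_type": "refresh_token",
            "client_id": config["client_id"],
            "client_secret": config["client_secret"],
            "refresh_token": config["refresh_token"],
            "redirect_uri": config["redirect_uri"],
        }
    ).encode("utf-8")
    return Request(
        TOKEN_URL,
        data=body,
        method="POST",
        # Assumption: form encoding, as in the generic OAuth 2.0 token request
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )


example_config = {
    "client_id": "your_oauth_client_id",
    "client_secret": "your_oauth_client_secret",
    "refresh_token": "your_oauth_refresh_token",
    "redirect_uri": "https://your-app.com/oauth/callback",
}

request = build_refresh_request(example_config)
fields = parse_qs(request.data.decode("utf-8"))
print(request.get_method(), request.full_url, sorted(fields))
```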

Request Parameters

Default request parameters applied to all API calls:

DEFAULT_PARAMS = {
    "count": "false",           # Disable response count
    "sort": "updatedAt",        # Sort by update timestamp
    "page[size]": "1000"        # Page size for pagination
}

# Incremental sync filter format
INCREMENTAL_FILTER = "filter[updatedAt]={cursor_value}..inf"
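
Putting the defaults and the incremental filter together, a request's query parameters might be assembled as follows. This is a sketch under assumptions: the function name build_request_params is hypothetical, and falling back to start_date when no state exists is an assumed behavior for the first sync, not a documented one.

```python
from typing import Any, Mapping, MutableMapping, Optional

DEFAULT_PARAMS = {"count": "false", "sort": "updatedAt", "page[size]": "1000"}


def build_request_params(
    config: Mapping[str, Any],
    stream_state: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
    """Merge the default parameters with an updatedAt range filter when a cursor is known."""
    params: MutableMapping[str, Any] = dict(DEFAULT_PARAMS)
    # Prefer the saved cursor; otherwise fall back to start_date (assumption)
    cursor_value = (stream_state or {}).get("updatedAt") or config.get("start_date")
    if cursor_value:
        params["filter[updatedAt]"] = f"{cursor_value}..inf"
    return params


first_sync = build_request_params({"start_date": "2020-11-16T00:00:00Z"})
later_sync = build_request_params(
    {"start_date": "2020-11-16T00:00:00Z"},
    stream_state={"updatedAt": "2024-03-01T12:30:00.500000Z"},
)
print(first_sync["filter[updatedAt]"])
print(later_sync["filter[updatedAt]"])
```

Because results are sorted by updatedAt, an open-ended range starting at the cursor lets each sync resume where the previous one stopped.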

Types

Core type definitions for the connector:

from typing import Any, Dict, List, Mapping, MutableMapping, Optional
from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState
import requests

# Type aliases for connector usage
ConfigDict = Dict[str, Any]
RecordDict = Mapping[str, Any] 
RecordList = List[RecordDict]
RequestParams = MutableMapping[str, Any]

Error Handling

The connector handles common integration scenarios:

  • Authentication Errors: Automatic OAuth token refresh on 401 responses
  • Rate Limiting: Automatic retry with exponential backoff
  • API Errors: Graceful handling of 4xx/5xx responses with detailed logging
  • Network Issues: Connection timeout and retry logic
  • Configuration Errors: Validation of required OAuth parameters
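
The retry behavior listed above is provided by the connector framework. Purely as an illustration of the exponential backoff pattern, and not as the CDK's code, the sketch below retries a failing operation with doubling delays. The function name call_with_backoff, its parameters, and the choice to retry only on ConnectionError are all assumptions made for the example.

```python
import random
import time


def call_with_backoff(operation, max_attempts=5, base_delay=1.0, max_delay=60.0, sleep=time.sleep):
    """Call `operation`, retrying on ConnectionError with exponentially growing delays."""
    for attempt in range(max_attempts):
        try:
            return operation()
        except ConnectionError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            delay = min(max_delay, base_delay * 2 ** attempt)
            # Up to 10% jitter so parallel clients do not retry in lockstep
            sleep(delay + random.uniform(0, delay * 0.1))


# Simulated operation that fails twice, then succeeds
attempts = {"count": 0}


def flaky():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("simulated network failure")
    return "ok"


delays = []
result = call_with_backoff(flaky, sleep=delays.append)  # record delays instead of sleeping
print(result, attempts["count"], len(delays))
```

Passing sleep as a parameter keeps the example fast to run and makes the delay schedule easy to inspect.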

Usage Examples

Running Discovery

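import json
import subprocess

# Discover available streams
result = subprocess.run(
    ["source-outreach", "discover", "--config", "config.json"],
    capture_output=True,
    text=True,
    check=True,  # raise if the connector exits with a non-zero status
)

# The connector prints one Airbyte message per line; the stream
# definitions are carried by the message whose type is CATALOG.
streams = []
for line in result.stdout.splitlines():
    if not line.strip().startswith("{"):
        continue  # skip anything that is not a JSON message
    message = json.loads(line)
    if message.get("type") == "CATALOG":
        streams = [stream["name"] for stream in message["catalog"]["streams"]]

print(f"Available streams: {streams}")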

Programmatic Data Extraction

import logging

from airbyte_cdk.models import (
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    SyncMode,
    Type as MessageType,
)
from source_outreach import SourceOutreach

# discover() and read() expect a logger as their first argument
logger = logging.getLogger("airbyte")

# Configure the source
config = {
    "client_id": "your_client_id",
    "client_secret": "your_client_secret",
    "refresh_token": "your_refresh_token",
    "redirect_uri": "https://your-app.com/oauth/callback",
    "start_date": "2024-01-01T00:00:00Z"
}

# Create connector instance
source = SourceOutreach()

# Discover the catalog, then configure only the first stream for this example
catalog = source.discover(logger, config)
configured_catalog = ConfiguredAirbyteCatalog(
    streams=[
        ConfiguredAirbyteStream(
            stream=stream,
            sync_mode=SyncMode.incremental,
            destination_sync_mode=DestinationSyncMode.append,
        )
        for stream in catalog.streams[:1]
    ]
)

# Extract records (the Airbyte platform normally drives this step)
for message in source.read(logger, config, configured_catalog):
    if message.type == MessageType.RECORD:
        print(f"Record from {message.record.stream}: {message.record.data}")

Install with Tessl CLI

npx tessl i tessl/pypi-source-outreach
Describes: pypipkg:pypi/source-outreach@1.0.x