Airbyte source connector for extracting sales engagement data from the Outreach platform via REST API.
npx @tessl/cli install tessl/pypi-source-outreach@1.0.00
# Airbyte Source Outreach

An Airbyte source connector that enables data extraction from Outreach, a sales engagement platform. Built using the Airbyte CDK (Connector Development Kit) and configured as a declarative YAML-based connector, it provides automated data synchronization from Outreach's REST API to various data destinations.

## Package Information

- **Package Name**: source-outreach
- **Package Type**: pypi (Airbyte source connector)
- **Language**: Python (^3.9,<3.12)
- **Installation**: `pip install source-outreach` or via Poetry: `poetry install`
- **Dependencies**: airbyte-cdk (^0)

## Core Imports

```python
from source_outreach import SourceOutreach
from source_outreach.run import run
```

For custom components (advanced usage):

```python
from source_outreach.components import CustomExtractor, CustomIncrementalSync
```

## Basic Usage

### CLI Usage

Run the connector via the Poetry script entry point:

```bash
# Display connector specification
source-outreach spec

# Test connection with configuration
source-outreach check --config config.json

# Discover available data streams
source-outreach discover --config config.json

# Extract data from Outreach
source-outreach read --config config.json --catalog catalog.json
```

### Programmatic Usage

```python
import sys

from airbyte_cdk.entrypoint import launch
from source_outreach import SourceOutreach

# Create and run the connector
source = SourceOutreach()
launch(source, sys.argv[1:])
```

### Configuration

Create a `config.json` file with your Outreach OAuth credentials:

```json
{
  "client_id": "your_oauth_client_id",
  "client_secret": "your_oauth_client_secret",
  "refresh_token": "your_oauth_refresh_token",
  "redirect_uri": "https://your-app.com/oauth/callback",
  "start_date": "2020-11-16T00:00:00Z"
}
```

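Before running `check`, it can help to confirm that the file contains every key shown in the example. The following is a minimal sketch, not part of the connector: `REQUIRED_KEYS`, `missing_config_keys`, and `load_config` are hypothetical names, and treating all five keys as required is an assumption based only on the example above.

```python
import json
from typing import Any, Dict, List

# Keys copied from the example config.json. Treating all five as required
# is an assumption of this sketch, not something the connector spec confirms.
REQUIRED_KEYS = (
    "client_id",
    "client_secret",
    "refresh_token",
    "redirect_uri",
    "start_date",
)


def missing_config_keys(config: Dict[str, Any]) -> List[str]:
    """Return the expected keys that are absent or empty in the config."""
    return [key for key in REQUIRED_KEYS if not config.get(key)]


def load_config(path: str) -> Dict[str, Any]:
    """Load a config file and fail early if an expected key is missing."""
    with open(path, encoding="utf-8") as handle:
        config = json.load(handle)
    missing = missing_config_keys(config)
    if missing:
        raise ValueError(f"config is missing required keys: {missing}")
    return config
```

Calling `load_config("config.json")` raises a `ValueError` that names the missing keys, which is usually easier to read than a failed `check` run.
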
## Architecture

The connector follows Airbyte's declarative YAML-based framework:

- **Declarative Configuration**: All stream definitions, authentication, and sync logic defined in `manifest.yaml`
- **OAuth Authentication**: Automatic token refresh via Outreach's OAuth 2.0 API
- **Custom Components**: Extended extraction and incremental sync for Outreach-specific data handling
- **17 Data Streams**: Comprehensive coverage of Outreach entities (prospects, accounts, sequences, etc.)
- **Incremental Sync**: Cursor-based synchronization using `updatedAt` timestamps

## Capabilities

### Main Connector Class

The primary entry point for creating and running the Outreach source connector.

```python { .api }
class SourceOutreach(YamlDeclarativeSource):
    """
    Declarative source connector for Outreach API data extraction.

    Inherits from YamlDeclarativeSource and loads configuration from manifest.yaml.
    """

    def __init__(self):
        """Initialize connector with manifest.yaml configuration."""
```

### Entry Point Functions

Functions for launching the connector via CLI or programmatically.

```python { .api }
def run():
    """
    Entry point function that creates SourceOutreach instance and launches it.

    Uses sys.argv[1:] for command line arguments and airbyte_cdk.entrypoint.launch
    for connector execution.
    """
```

### Custom Data Extraction

Advanced component for handling Outreach API response transformation.

```python { .api }
class CustomExtractor(RecordExtractor):
    """
    Custom record extractor for Outreach API responses.

    Transforms API responses by flattening relationship data and extracting
    records from the standard Outreach API format.
    """

    def extract_records(self, response: requests.Response, **kwargs) -> List[Mapping[str, Any]]:
        """
        Extract and transform records from Outreach API response.

        Parameters:
        - response: requests.Response object from Outreach API
        - **kwargs: Additional extraction parameters

        Returns:
        List of transformed record dictionaries with flattened relationships
        """
```

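To make "flattening relationship data" concrete, here is a minimal sketch that works on an already-decoded response body. It is not the connector's implementation. It assumes a JSON:API-style payload with records under `data`, fields under `attributes`, and related resources under `relationships`. The function name `flatten_records` and the sample payload are hypothetical.

```python
from typing import Any, Dict, List, Mapping


def flatten_records(payload: Mapping[str, Any]) -> List[Dict[str, Any]]:
    """Merge each record's id, attributes, and related ids into one flat dict."""
    records: List[Dict[str, Any]] = []
    for element in payload.get("data", []):
        record: Dict[str, Any] = {"id": element["id"], **element.get("attributes", {})}
        for name, relation in element.get("relationships", {}).items():
            related = relation.get("data")
            if not related:
                continue  # skip empty or link-only relationships
            if isinstance(related, dict):
                related = [related]  # normalize a to-one relationship to a list
            record[name] = [item["id"] for item in related]
        records.append(record)
    return records


# Hypothetical payload shaped like the assumed response format.
sample = {
    "data": [
        {
            "id": 1,
            "type": "prospect",
            "attributes": {"firstName": "Ada"},
            "relationships": {
                "account": {"data": {"type": "account", "id": 7}},
                "emails": {"data": [{"id": 3}, {"id": 4}]},
                "owner": {"data": None},
            },
        }
    ]
}
flat = flatten_records(sample)
```

Representing every relationship as a list of ids keeps the record schema uniform whether the relationship is to-one or to-many. That is a design choice of this sketch.
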
### Incremental Synchronization

Custom cursor-based incremental sync implementation.

```python { .api }
class CustomIncrementalSync(DatetimeBasedCursor):
    """
    Custom incremental sync cursor for Outreach streams.

    Implements cursor-based incremental synchronization using updatedAt timestamps
    with Outreach-specific filtering parameters.
    """

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        """
        Generate request parameters for incremental sync.

        Parameters:
        - stream_state: Current stream state with cursor values
        - stream_slice: Stream slice configuration
        - next_page_token: Pagination token

        Returns:
        Dictionary of request parameters including filter[updatedAt] for incremental sync
        """
```

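The `filter[updatedAt]` parameter described above can be sketched as a standalone function. This is an illustration under stated assumptions, not the body of `get_request_params`. The helper name `incremental_filter_params` is hypothetical, and falling back to the configured `start_date` when no state exists is an assumption.

```python
from typing import Any, Mapping, MutableMapping, Optional

CURSOR_FIELD = "updatedAt"


def incremental_filter_params(
    stream_state: Optional[Mapping[str, Any]],
    start_date: str,
) -> MutableMapping[str, Any]:
    """Build the filter[updatedAt] parameter as an open-ended range."""
    # Assumption: with no saved state (first sync), start from start_date.
    cursor_value = (stream_state or {}).get(CURSOR_FIELD) or start_date
    return {f"filter[{CURSOR_FIELD}]": f"{cursor_value}..inf"}
```

On a first sync, `incremental_filter_params(None, "2020-11-16T00:00:00Z")` produces a filter that starts at the configured date. Later syncs start from the saved cursor value instead.
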
## Data Streams

The connector provides access to 17 Outreach API endpoints as data streams:

### Core Entity Streams

- **accounts** - Company/account records from `/accounts`
- **prospects** - Contact/prospect records from `/prospects`
- **opportunities** - Sales opportunity records from `/opportunities`
- **users** - User account records from `/users`

### Communication Streams

- **calls** - Call activity records from `/calls`
- **mailings** - Email mailing records from `/mailings`
- **mailboxes** - Mailbox configuration from `/mailboxes`
- **templates** - Email template definitions from `/templates`
- **snippets** - Email template snippets from `/snippets`

### Sequence & Campaign Streams

- **sequences** - Email sequence definitions from `/sequences`
- **sequence_states** - Prospect sequence state tracking from `/sequenceStates`
- **sequence_steps** - Individual sequence steps from `/sequenceSteps`

### Reference Data Streams

- **call_dispositions** - Call disposition types from `/callDispositions`
- **call_purposes** - Call purpose categories from `/callPurposes`
- **personas** - Buyer persona definitions from `/personas`
- **stages** - Sales stage definitions from `/stages`
- **tasks** - Task/activity records from `/tasks`

## Stream Characteristics

All streams support the following features:

```python { .api }
# Stream configuration attributes
PRIMARY_KEY = "id"                            # Integer primary key
CURSOR_FIELD = "updatedAt"                    # Datetime cursor for incremental sync
SYNC_MODES = ["full_refresh", "incremental"]  # Supported sync modes
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"     # ISO datetime format
BASE_URL = "https://api.outreach.io/api/v2/"  # API base URL
```

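As an illustration of how the cursor field and datetime format fit together, the sketch below parses `updatedAt` values and picks the newest one from a batch of records. The helper names `parse_cursor` and `latest_cursor` are hypothetical. The connector itself delegates cursor handling to the CDK, so this only shows the idea.

```python
from datetime import datetime, timezone
from typing import Any, Iterable, Mapping, Optional

DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
CURSOR_FIELD = "updatedAt"


def parse_cursor(value: str) -> datetime:
    """Parse a cursor value; the trailing Z is interpreted as UTC."""
    return datetime.strptime(value, DATETIME_FORMAT).replace(tzinfo=timezone.utc)


def latest_cursor(records: Iterable[Mapping[str, Any]]) -> Optional[str]:
    """Return the newest updatedAt string among the records, if any."""
    values = [record[CURSOR_FIELD] for record in records if record.get(CURSOR_FIELD)]
    return max(values, key=parse_cursor) if values else None
```

Note that this format requires fractional seconds, so a value such as `2020-11-16T00:00:00Z` (the shape of `start_date` in the example config) would need separate handling before being passed to `parse_cursor`.
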
## Authentication

OAuth 2.0 configuration for Outreach API access:

```python { .api }
# OAuth configuration (defined in manifest.yaml)
OAUTH_CONFIG = {
    "client_id": "config['client_id']",
    "client_secret": "config['client_secret']",
    "refresh_token": "config['refresh_token']",
    "redirect_uri": "config['redirect_uri']",
    "token_refresh_endpoint": "https://api.outreach.io/oauth/token",
    "grant_type": "refresh_token"
}
```

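The connector leaves token refresh to the CDK's OAuth authenticator, so none of the following is needed in practice. As a rough sketch of what a refresh request body could look like given the configuration above, the code below builds the payload without sending it. The helper names are hypothetical, and form encoding of the body is an assumption.

```python
from typing import Dict, Mapping
from urllib.parse import urlencode

TOKEN_REFRESH_ENDPOINT = "https://api.outreach.io/oauth/token"


def refresh_token_payload(config: Mapping[str, str]) -> Dict[str, str]:
    """Collect the OAuth fields for a refresh_token grant."""
    return {
        "grant_type": "refresh_token",
        "client_id": config["client_id"],
        "client_secret": config["client_secret"],
        "refresh_token": config["refresh_token"],
        "redirect_uri": config["redirect_uri"],
    }


def encode_refresh_request(config: Mapping[str, str]) -> bytes:
    """Form-encode the payload (assumption: the endpoint accepts form data)."""
    return urlencode(refresh_token_payload(config)).encode("ascii")
```

The encoded bytes would be sent as the body of a POST to `TOKEN_REFRESH_ENDPOINT`. The sketch stops short of the network call so that it can be run without credentials.
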
## Request Parameters

Default request parameters applied to all API calls:

```python { .api }
DEFAULT_PARAMS = {
    "count": "false",      # Disable response count
    "sort": "updatedAt",   # Sort by update timestamp
    "page[size]": "1000"   # Page size for pagination
}

# Incremental sync filter format
INCREMENTAL_FILTER = "filter[updatedAt]={cursor_value}..inf"
```

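To show how these parameters end up in a request URL, here is a small sketch using only the standard library. `build_url` is a hypothetical helper, and the connector's actual requests are assembled by the CDK. The point is that bracketed keys such as `page[size]` are percent-encoded on the wire.

```python
from typing import Mapping, Optional
from urllib.parse import urlencode, urljoin

BASE_URL = "https://api.outreach.io/api/v2/"
DEFAULT_PARAMS = {"count": "false", "sort": "updatedAt", "page[size]": "1000"}


def build_url(path: str, extra_params: Optional[Mapping[str, str]] = None) -> str:
    """Join the base URL and path, then append defaults plus any extra params."""
    params = {**DEFAULT_PARAMS, **(extra_params or {})}
    return urljoin(BASE_URL, path.lstrip("/")) + "?" + urlencode(params)


url = build_url("/prospects", {"filter[updatedAt]": "2024-01-01T00:00:00.000Z..inf"})
```

Stripping the leading slash matters here: `urljoin` would otherwise treat `/prospects` as an absolute path and drop the `/api/v2/` prefix.
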
## Types

Core type definitions for the connector:

```python { .api }
from typing import Any, Dict, List, Mapping, MutableMapping, Optional

import requests
from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState

# Type aliases for connector usage
ConfigDict = Dict[str, Any]
RecordDict = Mapping[str, Any]
RecordList = List[RecordDict]
RequestParams = MutableMapping[str, Any]
```

## Error Handling

The connector handles common integration scenarios:

- **Authentication Errors**: Automatic OAuth token refresh on 401 responses
- **Rate Limiting**: Automatic retry with exponential backoff
- **API Errors**: Graceful handling of 4xx/5xx responses with detailed logging
- **Network Issues**: Connection timeout and retry logic
- **Configuration Errors**: Validation of required OAuth parameters

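
The retry behavior listed above is provided by the CDK. For readers unfamiliar with exponential backoff, the sketch below shows the general technique and nothing about the CDK's internals. `RetryableError` and `call_with_backoff` are hypothetical names, and the attempt count and delays are illustrative defaults.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class RetryableError(Exception):
    """Hypothetical marker for errors worth retrying (e.g. HTTP 429 or 5xx)."""


def call_with_backoff(
    operation: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call operation, doubling the wait after each retryable failure."""
    for attempt in range(max_attempts):
        try:
            return operation()
        except RetryableError:
            if attempt == max_attempts - 1:
                raise  # out of attempts, surface the last error
            sleep(base_delay * 2 ** attempt)  # 1s, 2s, 4s, ...
    raise ValueError("max_attempts must be at least 1")
```

The `sleep` parameter is injectable so the function can be exercised in tests without actually waiting.
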
## Usage Examples

### Running Discovery

```python
import json
import subprocess

# Discover available streams; the connector prints one JSON message per line
result = subprocess.run(
    ["source-outreach", "discover", "--config", "config.json"],
    capture_output=True,
    text=True,
    check=True,
)

# Log messages may precede the CATALOG message, so scan every line for it
streams = []
for line in result.stdout.splitlines():
    if not line.strip():
        continue
    message = json.loads(line)
    if message.get("type") == "CATALOG":
        streams = [stream["name"] for stream in message["catalog"]["streams"]]

print(f"Available streams: {streams}")
```

### Programmatic Data Extraction

```python
import logging

from airbyte_cdk.models import (
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    SyncMode,
    Type as MessageType,
)
from source_outreach import SourceOutreach

logger = logging.getLogger("airbyte")

# Configure the source
config = {
    "client_id": "your_client_id",
    "client_secret": "your_client_secret",
    "refresh_token": "your_refresh_token",
    "redirect_uri": "https://your-app.com/oauth/callback",
    "start_date": "2024-01-01T00:00:00Z",
}

# Create connector instance
source = SourceOutreach()

# Build a configured catalog (this would typically be done by the Airbyte platform)
catalog = source.discover(logger, config)
configured_catalog = ConfiguredAirbyteCatalog(
    streams=[
        ConfiguredAirbyteStream(
            stream=stream,
            sync_mode=SyncMode.incremental,
            destination_sync_mode=DestinationSyncMode.append,
        )
        for stream in catalog.streams[:1]  # Just the first stream for this example
    ]
)

# Extract records
for message in source.read(logger, config, configured_catalog):
    if message.type == MessageType.RECORD:
        print(f"Record from {message.record.stream}: {message.record.data}")
```