# Airbyte Source PostHog

An Airbyte connector for extracting analytics data from PostHog, an open-source product analytics platform. It syncs data from PostHog's API to Airbyte destinations, supporting multiple data streams and incremental synchronization for the events stream.

## Package Information

- **Package Name**: source-posthog
- **Package Type**: python (pypi)
- **Language**: Python
- **Installation**: `pip install source-posthog` or `poetry add source-posthog`
- **Version**: 1.1.25

## Core Imports

```python
from source_posthog import SourcePosthog
```

For running the connector directly:

```python
from source_posthog.run import run
```

For custom component implementations:

```python
from source_posthog.components import EventsSimpleRetriever, EventsCartesianProductStreamSlicer
```

## Basic Usage

### Running as Airbyte Connector

```python
import sys

from airbyte_cdk.entrypoint import launch

from source_posthog import SourcePosthog

# Initialize the source connector
source = SourcePosthog()

# Launch the connector via the Airbyte CDK entrypoint
launch(source, sys.argv[1:])
```

### Command Line Usage

```bash
# Install the connector
pip install source-posthog

# Run connector commands
source-posthog spec
source-posthog check --config config.json
source-posthog discover --config config.json
source-posthog read --config config.json --catalog catalog.json
```

### Configuration Example

```json
{
  "api_key": "your-posthog-api-key",
  "start_date": "2021-01-01T00:00:00Z",
  "base_url": "https://app.posthog.com",
  "events_time_step": 30
}
```

## Architecture

The connector is built using Airbyte's declarative YAML-based configuration system, which provides:

- **Declarative Configuration**: All stream definitions, authentication, and pagination logic are defined in `manifest.yaml`
- **Custom Components**: Python classes for handling PostHog-specific API behaviors like nested state management and descending order pagination
- **Schema-Driven**: JSON schema files define the structure for each data stream
- **Project-Based Partitioning**: Data streams are partitioned by PostHog projects with individual cursor state management

## Capabilities

### Source Connector

The main connector class providing PostHog data extraction capabilities with declarative YAML configuration.

```python { .api }
class SourcePosthog(YamlDeclarativeSource):
    def __init__(self):
        """Initialize PostHog source connector with manifest.yaml configuration."""
```
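
In practice the class body is a thin wrapper around the Airbyte CDK. A minimal sketch of how a `YamlDeclarativeSource` subclass is typically wired up (the `path_to_yaml` argument and the bundled `manifest.yaml` file name follow the usual CDK convention and are assumed here, not quoted from the package):

```python
# Hedged sketch of the declarative source initialization; the manifest file
# name and keyword argument reflect the common Airbyte CDK convention.
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource


class SourcePosthog(YamlDeclarativeSource):
    def __init__(self):
        # Point the declarative framework at the bundled manifest.yaml,
        # which defines streams, auth, pagination, and partitioning.
        super().__init__(path_to_yaml="manifest.yaml")
```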

### Entry Point Function

Main function for launching the connector via command line or programmatic execution.

```python { .api }
def run() -> None:
    """
    Main entry point for running the PostHog source connector.
    Creates SourcePosthog instance and launches it using Airbyte CDK.
    """
```
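
A minimal sketch of this entry point, assuming it follows the same launch pattern shown under Basic Usage:

```python
# Hedged sketch of run(); the actual package body may differ in detail.
import sys

from airbyte_cdk.entrypoint import launch

from source_posthog import SourcePosthog


def run() -> None:
    # Build the declarative source and hand CLI arguments to the CDK entrypoint.
    source = SourcePosthog()
    launch(source, sys.argv[1:])
```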

### Events Stream Components

Custom components that handle PostHog Events API-specific behaviors, including descending order pagination and nested state management.

```python { .api }
class EventsSimpleRetriever(SimpleRetriever):
    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """Post-initialization setup for cursor handling."""

    def request_params(
        self,
        stream_state: StreamSlice,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        """
        Generate request parameters for the PostHog Events API.
        Handles descending order pagination where next_page_token
        contains 'after'/'before' params that override stream_slice params.

        Returns:
            Request parameters dictionary with pagination handling
        """
```
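
The override behavior described in the docstring can be pictured as a simple parameter merge in which the paginator's `after`/`before` values win over the slice's datetime bounds. The helper below is illustrative only, not the retriever's actual code:

```python
# Illustrative sketch: merge slice-level datetime params with pagination
# params, letting the paginator's 'after'/'before' values take precedence
# because the Events API pages newest-first.
from typing import Any, Mapping, MutableMapping, Optional


def merge_event_request_params(
    slice_params: Mapping[str, Any],
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
    params: MutableMapping[str, Any] = dict(slice_params)
    if next_page_token:
        # The 'next' URL from the previous page carries its own datetime
        # bounds; they must override the slice's start/end values.
        params.update(next_page_token)
    return params


# Example: the paginator's 'before' narrows the slice's original window.
print(merge_event_request_params(
    {"after": "2021-01-01T00:00:00Z", "before": "2021-01-31T00:00:00Z"},
    {"before": "2021-01-15T12:00:00Z"},
))
```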

```python { .api }
class EventsCartesianProductStreamSlicer(Cursor, CartesianProductStreamSlicer):
    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """Initialize cursor and parameters for nested state management."""

    def get_stream_state(self) -> Mapping[str, Any]:
        """
        Get current cursor state supporting nested project states.

        Returns:
            State dictionary with project-specific timestamps
        """

    def set_initial_state(self, stream_state: StreamState) -> None:
        """Set initial cursor state from previous sync."""

    def close_slice(self, stream_slice: StreamSlice, most_recent_record: Optional[Record]) -> None:
        """Update cursor with most recent record timestamp for the project."""

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Generate datetime slices for each project with project-specific state handling.
        Supports both old-style and new nested state formats.

        Returns:
            Iterable of stream slices with project_id and datetime range
        """

    def should_be_synced(self, record: Record) -> bool:
        """Determine if record should be synced (always True for PostHog)."""

    def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
        """Compare records by timestamp for cursor ordering."""
```
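
The per-project cursor bookkeeping can be sketched roughly as follows; the class and the `_cursor_per_project` attribute are hypothetical names used for illustration, not the component's internals:

```python
# Illustrative sketch of project-scoped cursor state, not the package's
# actual implementation. '_cursor_per_project' is a hypothetical name.
from typing import Any, Mapping, MutableMapping, Optional


class ProjectCursorSketch:
    def __init__(self) -> None:
        self._cursor_per_project: MutableMapping[str, Mapping[str, Any]] = {}

    def set_initial_state(self, stream_state: Mapping[str, Any]) -> None:
        # Nested format: {"<project_id>": {"timestamp": ...}, ...}
        self._cursor_per_project = {
            key: dict(value) for key, value in stream_state.items() if isinstance(value, Mapping)
        }

    def close_slice(self, stream_slice: Mapping[str, Any], most_recent_record: Optional[Mapping[str, Any]]) -> None:
        if most_recent_record:
            project_id = str(stream_slice["project_id"])
            # Record the most recent record's timestamp for this project.
            self._cursor_per_project[project_id] = {"timestamp": most_recent_record["timestamp"]}

    def get_stream_state(self) -> Mapping[str, Any]:
        return dict(self._cursor_per_project)


cursor = ProjectCursorSketch()
cursor.set_initial_state({"123": {"timestamp": "2021-01-01T00:00:00.000000Z"}})
cursor.close_slice({"project_id": "123"}, {"timestamp": "2021-01-20T08:30:00.000000Z"})
print(cursor.get_stream_state())
```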

### Configuration Specification

The connector accepts the following configuration parameters:

```python { .api }
class PostHogConfig:
    api_key: str  # Required: PostHog API key for authentication
    start_date: str  # Required: Start date in ISO format (YYYY-MM-DDTHH:MM:SSZ)
    base_url: str = "https://app.posthog.com"  # Optional: PostHog instance URL
    events_time_step: int = 30  # Optional: Events stream slice size in days (1-91)
```
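
As a quick illustration of these fields, a small validation helper might look like the following; the helper itself is invented for the example, and only the field names, defaults, and the 1-91 day bound come from the spec above:

```python
# Hypothetical helper illustrating the spec above; not part of the package.
from typing import Any, Mapping


def validate_posthog_config(config: Mapping[str, Any]) -> None:
    for required in ("api_key", "start_date"):
        if not config.get(required):
            raise ValueError(f"'{required}' is required")
    # 'events_time_step' defaults to 30 and must stay within the documented 1-91 day range.
    step = int(config.get("events_time_step", 30))
    if not 1 <= step <= 91:
        raise ValueError("'events_time_step' must be between 1 and 91 days")


validate_posthog_config({
    "api_key": "your-posthog-api-key",
    "start_date": "2021-01-01T00:00:00Z",
})
```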

### Data Streams

The connector provides seven data streams from the PostHog API:

```python { .api }
class PostHogStreams:
    projects: Stream  # Project information and metadata
    cohorts: Stream  # User cohort definitions (per project)
    feature_flags: Stream  # Feature flag configurations (per project)
    persons: Stream  # Person/user data (per project)
    events: Stream  # Event data with incremental sync (per project)
    annotations: Stream  # Event annotations (per project)
    insights: Stream  # Dashboard insights (per project)
```
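
Because the source is a standard declarative CDK source, the configured streams can also be listed programmatically with the CDK's `streams()` method; the config values below are placeholders:

```python
# List the streams the connector exposes for a given config.
from source_posthog import SourcePosthog

config = {
    "api_key": "your-posthog-api-key",
    "start_date": "2021-01-01T00:00:00Z",
}

source = SourcePosthog()
for stream in source.streams(config):
    print(stream.name)
```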

### Stream Characteristics

All streams except `projects` are partitioned by project ID and use the following pattern:

```python { .api }
class StreamConfig:
    primary_key: str = "id"  # Primary key for all streams
    partition_field: str = "project_id"  # Partitioning field for project-based streams
    pagination_strategy: str = "OffsetIncrement"  # Pagination method
    page_size: int = 100  # Default page size (10000 for events)
```
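
Offset-increment pagination advances an offset by the page size until a short page is returned. A compact sketch of that loop, with `fetch_page` standing in for the real HTTP call:

```python
# Sketch of offset-increment pagination; 'fetch_page' stands in for the
# real API request and is not part of the connector.
from typing import Any, Callable, Iterator, List, Mapping


def paginate(fetch_page: Callable[[int, int], List[Mapping[str, Any]]],
             page_size: int = 100) -> Iterator[Mapping[str, Any]]:
    offset = 0
    while True:
        page = fetch_page(page_size, offset)
        yield from page
        if len(page) < page_size:
            break  # A short (or empty) page means there is nothing left.
        offset += page_size


# Toy usage: a fake endpoint with 250 records.
records = [{"id": i} for i in range(250)]
fake_fetch = lambda limit, offset: records[offset:offset + limit]
print(sum(1 for _ in paginate(fake_fetch)))  # 250
```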

### Incremental Synchronization

The events stream supports incremental synchronization with project-specific cursor state:

```python { .api }
class IncrementalConfig:
    cursor_field: str = "timestamp"  # Cursor field for incremental sync
    cursor_datetime_formats: list[str] = [
        "%Y-%m-%dT%H:%M:%S.%f%z",
        "%Y-%m-%dT%H:%M:%S+00:00"
    ]
    cursor_granularity: str = "PT0.000001S"  # Microsecond precision
    step: str = "P{events_time_step}D"  # Configurable time step in days
```
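
The `step` setting slices each project's sync window into fixed-size datetime ranges. A plain-Python sketch of that slicing, assuming a 30-day step and ignoring the microsecond cursor granularity:

```python
# Sketch of how a sync window is cut into step-sized datetime slices.
# Purely illustrative; the declarative framework does this from manifest.yaml.
from datetime import datetime, timedelta, timezone
from typing import Iterator, Tuple


def datetime_slices(start: datetime, end: datetime, step_days: int = 30) -> Iterator[Tuple[datetime, datetime]]:
    cursor = start
    while cursor < end:
        slice_end = min(cursor + timedelta(days=step_days), end)
        yield cursor, slice_end
        cursor = slice_end


start = datetime(2021, 1, 1, tzinfo=timezone.utc)
end = datetime(2021, 4, 1, tzinfo=timezone.utc)
for window_start, window_end in datetime_slices(start, end):
    print(window_start.isoformat(), "->", window_end.isoformat())
```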

### Authentication

Bearer token authentication using the PostHog API key:

```python { .api }
class AuthConfig:
    type: str = "BearerAuthenticator"
    api_token: str  # From config['api_key']
    header_format: str = "Bearer {api_token}"
```
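
Equivalently, a raw request to a PostHog instance sends the key in an `Authorization: Bearer ...` header. The snippet below uses the `requests` library and placeholder values; the endpoint path is shown only to demonstrate the header:

```python
# Demonstrates the Bearer header the connector sends; values are placeholders.
import requests

base_url = "https://app.posthog.com"
api_key = "your-posthog-api-key"

response = requests.get(
    f"{base_url}/api/projects/",
    headers={"Authorization": f"Bearer {api_key}"},
    timeout=30,
)
response.raise_for_status()
print(response.json())
```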

## Data Types

### Project Data Structure

```python { .api }
class Project:
    id: int  # Project ID
    uuid: str  # Project UUID
    organization: str  # Organization name
    api_token: str  # Project API token
    name: str  # Project name
    completed_snippet_onboarding: bool  # Onboarding status
    ingested_event: bool  # Event ingestion status
    is_demo: bool  # Demo project flag
    timezone: str  # Project timezone
    access_control: bool  # Access control enabled
    effective_membership_level: int  # User membership level
```

### Event Data Structure

```python { .api }
class Event:
    id: str  # Event ID
    distinct_id: str  # User distinct ID
    properties: dict  # Event properties
    event: str  # Event name
    timestamp: str  # Event timestamp (ISO format)
    person: Person  # Associated person data
    elements: list[Union[str, dict]]  # UI elements
    elements_chain: str  # Element chain string

class Person:
    is_identified: bool  # Person identification status
    distinct_ids: list[str]  # List of distinct IDs
    properties: dict  # Person properties
```

### Stream State Format

For incremental synchronization, the connector maintains nested state per project:

```python { .api }
class StreamState:
    # New format (per project)
    project_states: dict[str, ProjectState]

    # Legacy format (backward compatibility)
    timestamp: Optional[str]

class ProjectState:
    timestamp: str  # Last synced timestamp for the project
```
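
To make the backward compatibility concrete, the legacy single-timestamp format can be folded into the nested per-project form along these lines; the helper is illustrative and not part of the connector:

```python
# Illustrative helper: convert a legacy {"timestamp": ...} state into the
# nested per-project form defined above.
from typing import Any, Dict, Iterable, Mapping


def normalize_state(state: Mapping[str, Any], project_ids: Iterable[str]) -> Dict[str, Dict[str, str]]:
    if "timestamp" in state:  # legacy, single shared cursor
        return {pid: {"timestamp": state["timestamp"]} for pid in project_ids}
    # Already nested: {"<project_id>": {"timestamp": ...}, ...}
    return {pid: dict(value) for pid, value in state.items()}


print(normalize_state({"timestamp": "2021-01-01T00:00:00.000000Z"}, ["123", "456"]))
```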

## Error Handling

The connector handles several PostHog API-specific behaviors:

- The Events API returns records in descending order (newest first)
- Custom pagination where the `next` URL carries datetime ranges (see the sketch below)
- Support for both old-style and nested state formats
- Project-specific error handling and retry logic
- A configurable time step for handling large event datasets
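
For the pagination behavior noted above, the datetime bounds can be read straight off the `next` URL's query string; the URL below is fabricated for the example:

```python
# Illustration of reading 'after'/'before' bounds from a 'next' URL.
from urllib.parse import parse_qs, urlparse

next_url = (
    "https://app.posthog.com/api/projects/123/events"
    "?after=2021-01-01T00%3A00%3A00Z&before=2021-01-15T12%3A00%3A00Z&limit=10000"
)

query = parse_qs(urlparse(next_url).query)
next_page_token = {key: values[0] for key, values in query.items() if key in ("after", "before")}
print(next_page_token)  # {'after': '2021-01-01T00:00:00Z', 'before': '2021-01-15T12:00:00Z'}
```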

## Usage Examples

### Basic Connector Setup

```python
from source_posthog import SourcePosthog
from airbyte_cdk.entrypoint import launch

def main():
    source = SourcePosthog()
    launch(source, ["check", "--config", "config.json"])

if __name__ == "__main__":
    main()
```

### Custom Component Usage

```python
from source_posthog.components import EventsCartesianProductStreamSlicer

# The declarative framework normally builds this component from manifest.yaml,
# supplying its two underlying slicers (a project partition router and a
# datetime cursor). Empty placeholders are used here so the state-handling
# calls can be shown in isolation.
slicer = EventsCartesianProductStreamSlicer(stream_slicers=[], parameters={})

# Seed the nested per-project state from a previous sync
initial_state = {
    "project_123": {"timestamp": "2021-01-01T00:00:00.000000Z"},
    "project_456": {"timestamp": "2021-02-01T00:00:00.000000Z"}
}
slicer.set_initial_state(initial_state)

# Generate datetime slices per project (requires the real sub-slicers)
if slicer.stream_slicers:
    for stream_slice in slicer.stream_slices():
        print(f"Project: {stream_slice['project_id']}, Start: {stream_slice['start_time']}, End: {stream_slice['end_time']}")
```

### Configuration Validation

```python
import json
import logging

from source_posthog import SourcePosthog

# Load configuration
with open("config.json") as f:
    config = json.load(f)

# Validate the configuration against the PostHog API
source = SourcePosthog()
logger = logging.getLogger("airbyte")
connection_status = source.check(logger, config)

if connection_status.status.name == "SUCCEEDED":
    print("Configuration is valid")
else:
    print(f"Configuration error: {connection_status.message}")
```