Airbyte source connector for extracting analytics data from the PostHog platform, with support for multiple data streams and incremental synchronization.
```bash
npx @tessl/cli install tessl/pypi-airbyte-source-posthog@1.1.0
```

An Airbyte connector for extracting analytics data from PostHog, an open-source product analytics platform. This connector enables data synchronization from PostHog's API to various destinations, supporting multiple data streams with incremental synchronization capabilities for events data.
```bash
pip install source-posthog
```

or

```bash
poetry add source-posthog
```

```python
from source_posthog import SourcePosthog
```

For running the connector directly:

```python
from source_posthog.run import run
```

For custom component implementations:

```python
from source_posthog.components import EventsSimpleRetriever, EventsCartesianProductStreamSlicer
```

```python
from source_posthog import SourcePosthog
# Initialize the source connector
source = SourcePosthog()
# Use with Airbyte CDK launch function
from airbyte_cdk.entrypoint import launch
import sys
launch(source, sys.argv[1:])
```

```bash
# Install the connector
pip install source-posthog
# Run connector commands
source-posthog spec
source-posthog check --config config.json
source-posthog discover --config config.json
source-posthog read --config config.json --catalog catalog.json
```

```json
{
"api_key": "your-posthog-api-key",
"start_date": "2021-01-01T00:00:00Z",
"base_url": "https://app.posthog.com",
"events_time_step": 30
}
```

The connector is built using Airbyte's declarative YAML-based configuration system; the stream definitions live in `manifest.yaml`.

The main connector class provides PostHog data extraction capabilities with declarative YAML configuration.

```python
class SourcePosthog(YamlDeclarativeSource):
def __init__(self):
"""Initialize PostHog source connector with manifest.yaml configuration."""Main function for launching the connector via command line or programmatic execution.
Main function for launching the connector via command line or programmatic execution.

```python
def run() -> None:
"""
Main entry point for running the PostHog source connector.
Creates SourcePosthog instance and launches it using Airbyte CDK.
"""Custom components for handling PostHog Events API specific behaviors including descending order pagination and nested state management.
Custom components handle PostHog Events API-specific behaviors, including descending-order pagination and nested state management.

```python
class EventsSimpleRetriever(SimpleRetriever):
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
"""Post-initialization setup for cursor handling."""
def request_params(
self,
stream_state: StreamSlice,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
"""
Generate request parameters for PostHog Events API.
Handles descending order pagination where next_page_token
contains 'after'/'before' params that override stream_slice params.
Returns:
            Request parameters dictionary with pagination handling
        """
```
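A hedged sketch of the override behavior described in the docstring: parameters coming from the stream slice are merged with the paginator's `after`/`before` values, and the pagination token wins. The helper below is illustrative only and is not the connector's actual implementation.

```python
# Illustrative only -- not the connector's real request_params logic.
def merge_event_params(slice_params, next_page_token):
    """Pagination params ('after'/'before') override the slice's datetime range."""
    params = dict(slice_params or {})
    params.update(next_page_token or {})
    return params


# The paginator's 'after' value replaces the slice's start boundary.
print(merge_event_params(
    {"after": "2021-01-01T00:00:00Z", "before": "2021-01-31T00:00:00Z"},
    {"after": "2021-01-15T10:30:00Z"},
))
```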
"""class EventsCartesianProductStreamSlicer(Cursor, CartesianProductStreamSlicer):
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
"""Initialize cursor and parameters for nested state management."""
def get_stream_state(self) -> Mapping[str, Any]:
"""
Get current cursor state supporting nested project states.
Returns:
State dictionary with project-specific timestamps
"""
def set_initial_state(self, stream_state: StreamState) -> None:
"""Set initial cursor state from previous sync."""
def close_slice(self, stream_slice: StreamSlice, most_recent_record: Optional[Record]) -> None:
"""Update cursor with most recent record timestamp for the project."""
def stream_slices(self) -> Iterable[StreamSlice]:
"""
Generate datetime slices for each project with project-specific state handling.
Supports both old-style and new nested state formats.
Returns:
Iterable of stream slices with project_id and datetime range
"""
def should_be_synced(self, record: Record) -> bool:
"""Determine if record should be synced (always True for PostHog)."""
def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
"""Compare records by timestamp for cursor ordering."""The connector accepts the following configuration parameters:
The connector accepts the following configuration parameters:

```python
class PostHogConfig:
api_key: str # Required: PostHog API key for authentication
start_date: str # Required: Start date in ISO format (YYYY-MM-DDTHH:MM:SSZ)
base_url: str = "https://app.posthog.com" # Optional: PostHog instance URL
    events_time_step: int = 30  # Optional: Events stream slice size in days (1-91)
```
The connector provides seven data streams from the PostHog API:

```python
class PostHogStreams:
projects: Stream # Project information and metadata
cohorts: Stream # User cohort definitions (per project)
feature_flags: Stream # Feature flag configurations (per project)
persons: Stream # Person/user data (per project)
events: Stream # Event data with incremental sync (per project)
annotations: Stream # Event annotations (per project)
    insights: Stream       # Dashboard insights (per project)
```
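The configured streams can also be inspected programmatically through the standard CDK `streams()` method; a sketch assuming a config like the one shown earlier:

```python
from source_posthog import SourcePosthog

config = {
    "api_key": "your-posthog-api-key",
    "start_date": "2021-01-01T00:00:00Z",
}

source = SourcePosthog()
# Expected names per the list above: projects, cohorts, feature_flags,
# persons, events, annotations, insights
print([stream.name for stream in source.streams(config)])
```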
All streams except projects are partitioned by project ID and use the following pattern:

```python
class StreamConfig:
primary_key: str = "id" # Primary key for all streams
partition_field: str = "project_id" # Partitioning field for project-based streams
pagination_strategy: str = "OffsetIncrement" # Pagination method
    page_size: int = 100                       # Default page size (10000 for events)
```

The events stream supports incremental synchronization with project-specific cursor state:
```python
class IncrementalConfig:
cursor_field: str = "timestamp" # Cursor field for incremental sync
cursor_datetime_formats: list[str] = [
"%Y-%m-%dT%H:%M:%S.%f%z",
"%Y-%m-%dT%H:%M:%S+00:00"
]
cursor_granularity: str = "PT0.000001S" # Microsecond precision
    step: str = "P{events_time_step}D"  # Configurable time step in days
```
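To make the `step` setting concrete, the standalone illustration below (plain Python, not connector code) shows how a 30-day `events_time_step` would divide the window between `start_date` and a fixed end date into slices:

```python
from datetime import datetime, timedelta, timezone

# Illustration only: mimic a "P30D" step (events_time_step = 30).
start = datetime(2021, 1, 1, tzinfo=timezone.utc)
step = timedelta(days=30)
end_of_window = datetime(2021, 4, 1, tzinfo=timezone.utc)  # stand-in for "now"

while start < end_of_window:
    end = min(start + step, end_of_window)
    print(f"slice: {start.isoformat()} -> {end.isoformat()}")
    start = end
```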
Bearer token authentication uses the PostHog API key:

```python
class AuthConfig:
type: str = "BearerAuthenticator"
api_token: str # From config['api_key']
    header_format: str = "Bearer {api_token}"
```
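The `BearerAuthenticator` boils down to sending the API key in an `Authorization` header. The equivalent raw request would look roughly like this (illustrative; `requests` and the `/api/projects/` endpoint are assumptions, not requirements stated by this connector):

```python
import requests

config = {"api_key": "your-posthog-api-key", "base_url": "https://app.posthog.com"}

# Equivalent of the declarative BearerAuthenticator header format above
headers = {"Authorization": f"Bearer {config['api_key']}"}
response = requests.get(f"{config['base_url']}/api/projects/", headers=headers)
print(response.status_code)
```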
```python
class Project:
    id: int                              # Project ID
uuid: str # Project UUID
organization: str # Organization name
api_token: str # Project API token
name: str # Project name
completed_snippet_onboarding: bool # Onboarding status
ingested_event: bool # Event ingestion status
is_demo: bool # Demo project flag
timezone: str # Project timezone
access_control: bool # Access control enabled
    effective_membership_level: int      # User membership level
```

```python
class Event:
id: str # Event ID
distinct_id: str # User distinct ID
properties: dict # Event properties
event: str # Event name
timestamp: str # Event timestamp (ISO format)
person: Person # Associated person data
elements: list[Union[str, dict]] # UI elements
elements_chain: str # Element chain string
class Person:
is_identified: bool # Person identification status
distinct_ids: list[str] # List of distinct IDs
    properties: dict              # Person properties
```

For incremental synchronization, the connector maintains nested state per project:
```python
class StreamState:
# New format (per project)
project_states: dict[str, ProjectState]
# Legacy format (backward compatibility)
timestamp: Optional[str]
class ProjectState:
    timestamp: str  # Last synced timestamp for the project
```
The connector handles PostHog API-specific behaviors; for example, the `next` pagination URL contains datetime ranges.

```python
from source_posthog import SourcePosthog
from airbyte_cdk.entrypoint import launch
def main():
source = SourcePosthog()
launch(source, ["check", "--config", "config.json"])
if __name__ == "__main__":
    main()
```

```python
from source_posthog.components import EventsCartesianProductStreamSlicer
from airbyte_cdk.sources.declarative.types import StreamSlice
# Initialize custom slicer
slicer = EventsCartesianProductStreamSlicer()
# Set initial state
initial_state = {
"project_123": {"timestamp": "2021-01-01T00:00:00.000000Z"},
"project_456": {"timestamp": "2021-02-01T00:00:00.000000Z"}
}
slicer.set_initial_state(initial_state)
# Generate slices
for slice in slicer.stream_slices():
print(f"Project: {slice['project_id']}, Start: {slice['start_time']}, End: {slice['end_time']}")import json
from source_posthog import SourcePosthog
# Load configuration
with open("config.json") as f:
config = json.load(f)
# Validate configuration
source = SourcePosthog()
connection_status = source.check(logging.getLogger("airbyte"), config)
if connection_status.status.name == "SUCCEEDED":
print("Configuration is valid")
else:
print(f"Configuration error: {connection_status.message}")