tessl/pypi-airbyte-source-posthog

Airbyte source connector for extracting analytics data from PostHog platform with support for multiple data streams and incremental synchronization

Workspace: tessl
Visibility: Public
Describes: pypipkg:pypi/airbyte-source-posthog@1.1.x

To install, run

npx @tessl/cli install tessl/pypi-airbyte-source-posthog@1.1.0


Airbyte Source PostHog

An Airbyte connector for extracting analytics data from PostHog, an open-source product analytics platform. It syncs data from the PostHog API to Airbyte-supported destinations and exposes seven data streams, with incremental synchronization for the events stream.

Package Information

  • Package Name: source-posthog
  • Package Type: python (pypi)
  • Language: Python
  • Installation: pip install airbyte-source-posthog or poetry add airbyte-source-posthog
  • Version: 1.1.25

Core Imports

from source_posthog import SourcePosthog

For running the connector directly:

from source_posthog.run import run

For custom component implementations:

from source_posthog.components import EventsSimpleRetriever, EventsCartesianProductStreamSlicer

Basic Usage

Running as Airbyte Connector

import sys

from airbyte_cdk.entrypoint import launch
from source_posthog import SourcePosthog

# Initialize the source connector
source = SourcePosthog()

# Launch it with the Airbyte CDK, forwarding command-line arguments
launch(source, sys.argv[1:])

Command Line Usage

# Install the connector
pip install airbyte-source-posthog

# Run connector commands
source-posthog spec
source-posthog check --config config.json
source-posthog discover --config config.json
source-posthog read --config config.json --catalog catalog.json

Configuration Example

{
  "api_key": "your-posthog-api-key",
  "start_date": "2021-01-01T00:00:00Z",
  "base_url": "https://app.posthog.com",
  "events_time_step": 30
}

Architecture

The connector is built on Airbyte's declarative, YAML-based configuration system. Its main building blocks are:

  • Declarative Configuration: All stream definitions, authentication, and pagination logic are defined in manifest.yaml
  • Custom Components: Python classes for handling PostHog-specific API behaviors like nested state management and descending order pagination
  • Schema-Driven: JSON schema files define the structure for each data stream
  • Project-Based Partitioning: Data streams are partitioned by PostHog projects with individual cursor state management
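
To make project-based partitioning concrete, here is a minimal sketch of how project IDs and datetime windows could be combined into slices. It uses only the standard library and is not the connector's code; the function name project_slices is hypothetical, and the slice keys (project_id, start_time, end_time) follow the usage example later in this document.

```python
from itertools import product
from typing import Dict, Iterator, List, Tuple


def project_slices(
    project_ids: List[str],
    windows: List[Tuple[str, str]],
) -> Iterator[Dict[str, str]]:
    """Yield one slice per (project, datetime window) combination."""
    for project_id, (start_time, end_time) in product(project_ids, windows):
        yield {
            "project_id": project_id,
            "start_time": start_time,
            "end_time": end_time,
        }


# Two hypothetical projects and two hypothetical windows give four slices.
slices = list(
    project_slices(
        ["1", "2"],
        [
            ("2021-01-01T00:00:00Z", "2021-01-30T23:59:59Z"),
            ("2021-01-31T00:00:00Z", "2021-03-01T00:00:00Z"),
        ],
    )
)
```

Each slice can then carry its own cursor, which is what allows state to be tracked per project.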

Capabilities

Source Connector

The main connector class providing PostHog data extraction capabilities with declarative YAML configuration.

class SourcePosthog(YamlDeclarativeSource):
    def __init__(self):
        """Initialize PostHog source connector with manifest.yaml configuration."""

Entry Point Function

Main function for launching the connector via command line or programmatic execution.

def run() -> None:
    """
    Main entry point for running the PostHog source connector.
    Creates SourcePosthog instance and launches it using Airbyte CDK.
    """

Events Stream Components

Custom components for behaviors specific to the PostHog Events API, including descending-order pagination and nested state management.

class EventsSimpleRetriever(SimpleRetriever):
    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """Post-initialization setup for cursor handling."""

    def request_params(
        self,
        stream_state: StreamSlice,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        """
        Generate request parameters for PostHog Events API.
        Handles descending order pagination where next_page_token
        contains 'after'/'before' params that override stream_slice params.
        
        Returns:
        Request parameters dictionary with pagination handling
        """

class EventsCartesianProductStreamSlicer(Cursor, CartesianProductStreamSlicer):
    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """Initialize cursor and parameters for nested state management."""

    def get_stream_state(self) -> Mapping[str, Any]:
        """
        Get current cursor state supporting nested project states.
        
        Returns:
        State dictionary with project-specific timestamps
        """

    def set_initial_state(self, stream_state: StreamState) -> None:
        """Set initial cursor state from previous sync."""

    def close_slice(self, stream_slice: StreamSlice, most_recent_record: Optional[Record]) -> None:
        """Update cursor with most recent record timestamp for the project."""

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Generate datetime slices for each project with project-specific state handling.
        Supports both old-style and new nested state formats.
        
        Returns:
        Iterable of stream slices with project_id and datetime range
        """

    def should_be_synced(self, record: Record) -> bool:
        """Determine if record should be synced (always True for PostHog)."""

    def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
        """Compare records by timestamp for cursor ordering."""

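The override rule described in request_params (pagination values take precedence over slice values) can be sketched in plain Python. This is an illustration under assumptions, not the connector's implementation: merge_request_params is a hypothetical helper, and only the 'after' and 'before' keys named in the docstring are considered.

```python
from typing import Any, Mapping, MutableMapping, Optional


def merge_request_params(
    slice_params: Mapping[str, Any],
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
    """Return request params where pagination values override slice values."""
    params: MutableMapping[str, Any] = dict(slice_params)
    if next_page_token:
        # 'after'/'before' from the page token replace the slice's bounds.
        for key in ("after", "before"):
            if key in next_page_token:
                params[key] = next_page_token[key]
    return params


slice_bounds = {"after": "2021-01-01T00:00:00Z", "before": "2021-01-31T00:00:00Z"}

# First request: no page token yet, so the slice bounds are used unchanged.
first_page = merge_request_params(slice_bounds)

# Later request: the token narrows 'before' because records arrive newest first.
next_page = merge_request_params(slice_bounds, {"before": "2021-01-15T12:00:00Z"})
```

Because the Events API returns newest records first, each following page only needs an earlier 'before' bound while 'after' stays fixed.
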
Configuration Specification

The connector accepts the following configuration parameters:

class PostHogConfig:
    api_key: str  # Required: PostHog API key for authentication
    start_date: str  # Required: Start date in ISO format (YYYY-MM-DDTHH:MM:SSZ)
    base_url: str = "https://app.posthog.com"  # Optional: PostHog instance URL
    events_time_step: int = 30  # Optional: Events stream slice size in days (1-91)
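
The constraints listed above can be checked before handing a config to the connector. The following is a rough sketch using only the standard library; validate_config is a hypothetical helper, and the connector's own spec validation may differ in detail.

```python
from datetime import datetime
from typing import Any, List, Mapping


def validate_config(config: Mapping[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the config looks valid."""
    problems: List[str] = []
    for key in ("api_key", "start_date"):
        if not config.get(key):
            problems.append(f"missing required field: {key}")
    start_date = config.get("start_date")
    if start_date:
        try:
            datetime.strptime(start_date, "%Y-%m-%dT%H:%M:%SZ")
        except (TypeError, ValueError):
            problems.append("start_date must look like YYYY-MM-DDTHH:MM:SSZ")
    # events_time_step is optional; 30 is the documented default.
    step = config.get("events_time_step", 30)
    if isinstance(step, bool) or not isinstance(step, int) or not 1 <= step <= 91:
        problems.append("events_time_step must be an integer between 1 and 91")
    return problems


ok = validate_config({"api_key": "your-posthog-api-key", "start_date": "2021-01-01T00:00:00Z"})
bad_step = validate_config(
    {"api_key": "k", "start_date": "2021-01-01T00:00:00Z", "events_time_step": 120}
)
empty = validate_config({})
```

Returning a list of problems rather than raising on the first one makes it easier to report every issue in a single pass.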

Data Streams

The connector provides seven data streams from PostHog API:

class PostHogStreams:
    projects: Stream  # Project information and metadata
    cohorts: Stream  # User cohort definitions (per project)
    feature_flags: Stream  # Feature flag configurations (per project)  
    persons: Stream  # Person/user data (per project)
    events: Stream  # Event data with incremental sync (per project)
    annotations: Stream  # Event annotations (per project)
    insights: Stream  # Dashboard insights (per project)

Stream Characteristics

All streams except projects are partitioned by project ID and use the following pattern:

class StreamConfig:
    primary_key: str = "id"  # Primary key for all streams
    partition_field: str = "project_id"  # Partitioning field for project-based streams  
    pagination_strategy: str = "OffsetIncrement"  # Pagination method
    page_size: int = 100  # Default page size (10000 for events)
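
Offset-based pagination of this kind can be sketched as a simple loop. This is a generic illustration rather than the Airbyte CDK's OffsetIncrement implementation; fetch_page and fake_fetch are hypothetical stand-ins for an API call.

```python
from typing import Callable, Dict, Iterator, List

Record = Dict[str, object]


def paginate_by_offset(
    fetch_page: Callable[[int, int], List[Record]],
    page_size: int = 100,
) -> Iterator[Record]:
    """Yield records page by page, advancing the offset after each full page."""
    offset = 0
    while True:
        page = fetch_page(offset, page_size)
        yield from page
        # A short (or empty) page means there is nothing left to fetch.
        if len(page) < page_size:
            break
        offset += len(page)


# Hypothetical in-memory stand-in for an endpoint holding 250 records.
_DATA: List[Record] = [{"id": i} for i in range(250)]


def fake_fetch(offset: int, limit: int) -> List[Record]:
    return _DATA[offset : offset + limit]


records = list(paginate_by_offset(fake_fetch, page_size=100))
```

Stopping on the first short page avoids an extra request in most cases; when the total is an exact multiple of the page size, one additional empty page is fetched before the loop ends.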

Incremental Synchronization

The events stream supports incremental synchronization with project-specific cursor state:

class IncrementalConfig:
    cursor_field: str = "timestamp"  # Cursor field for incremental sync
    cursor_datetime_formats: list[str] = [
        "%Y-%m-%dT%H:%M:%S.%f%z",
        "%Y-%m-%dT%H:%M:%S+00:00"
    ]
    cursor_granularity: str = "PT0.000001S"  # Microsecond precision
    step: str = "P{events_time_step}D"  # Configurable time step in days
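
The interplay of step and cursor_granularity can be illustrated by generating the datetime windows by hand. The sketch below assumes that each window ends one granularity unit before the next one starts, so windows never overlap; datetime_windows is a hypothetical helper, not part of the connector or the CDK.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterator, Tuple


def datetime_windows(
    start: datetime,
    end: datetime,
    step_days: int = 30,
    granularity: timedelta = timedelta(microseconds=1),
) -> Iterator[Tuple[datetime, datetime]]:
    """Yield consecutive, non-overlapping (window_start, window_end) pairs."""
    step = timedelta(days=step_days)
    window_start = start
    while window_start <= end:
        # Clamp the last window to the overall end of the range.
        window_end = min(window_start + step - granularity, end)
        yield window_start, window_end
        window_start = window_end + granularity


range_start = datetime(2021, 1, 1, tzinfo=timezone.utc)
range_end = datetime(2021, 3, 1, tzinfo=timezone.utc)
windows = list(datetime_windows(range_start, range_end, step_days=30))
```

A smaller events_time_step produces more, shorter windows, which can help when a project has a very large number of events per day.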

Authentication

Bearer token authentication using the PostHog API key:

class AuthConfig:
    type: str = "BearerAuthenticator"
    api_token: str  # From config['api_key']
    header_format: str = "Bearer {api_token}"
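
In terms of the HTTP request, this amounts to a single Authorization header. A minimal sketch, assuming the header format shown above (build_auth_headers is a hypothetical helper name):

```python
from typing import Dict, Mapping


def build_auth_headers(config: Mapping[str, str]) -> Dict[str, str]:
    """Build the Authorization header from the config's api_key."""
    api_key = config["api_key"]
    return {"Authorization": f"Bearer {api_key}"}


headers = build_auth_headers({"api_key": "your-posthog-api-key"})
```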

Data Types

Project Data Structure

class Project:
    id: int  # Project ID
    uuid: str  # Project UUID
    organization: str  # Organization name
    api_token: str  # Project API token
    name: str  # Project name
    completed_snippet_onboarding: bool  # Onboarding status
    ingested_event: bool  # Event ingestion status
    is_demo: bool  # Demo project flag
    timezone: str  # Project timezone
    access_control: bool  # Access control enabled
    effective_membership_level: int  # User membership level

Event Data Structure

class Event:
    id: str  # Event ID
    distinct_id: str  # User distinct ID
    properties: dict  # Event properties
    event: str  # Event name
    timestamp: str  # Event timestamp (ISO format)
    person: Person  # Associated person data
    elements: list[Union[str, dict]]  # UI elements
    elements_chain: str  # Element chain string

class Person:
    is_identified: bool  # Person identification status
    distinct_ids: list[str]  # List of distinct IDs
    properties: dict  # Person properties

Stream State Format

For incremental synchronization, the connector maintains nested state per project:

class StreamState:
    # New format (per project)
    project_states: dict[str, ProjectState]
    
    # Legacy format (backward compatibility)
    timestamp: Optional[str]

class ProjectState:
    timestamp: str  # Last synced timestamp for the project
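
Handling both formats can be sketched as a normalization step followed by a per-project cursor update. This is an illustration only: it assumes nested state is a mapping keyed by project ID, and that a legacy flat timestamp applies to any project without its own entry. Both helper names are hypothetical, and the connector's actual fallback rule may differ.

```python
from typing import Dict, Iterable, Mapping, Optional

ProjectStates = Dict[str, Dict[str, str]]


def normalize_state(state: Mapping[str, object], project_ids: Iterable[str]) -> ProjectStates:
    """Convert a legacy flat state or a nested state into per-project entries."""
    legacy_timestamp = state.get("timestamp")
    nested: ProjectStates = {}
    for project_id in project_ids:
        entry = state.get(project_id)
        if isinstance(entry, dict) and "timestamp" in entry:
            nested[project_id] = {"timestamp": str(entry["timestamp"])}
        elif isinstance(legacy_timestamp, str):
            # Assumption: a legacy cursor covers projects without their own entry.
            nested[project_id] = {"timestamp": legacy_timestamp}
    return nested


def advance_cursor(states: ProjectStates, project_id: str, record_timestamp: Optional[str]) -> None:
    """Move one project's cursor forward; never move it backwards."""
    if record_timestamp is None:
        return
    current = states.get(project_id, {}).get("timestamp")
    # ISO 8601 strings in the same format and timezone compare correctly as text.
    if current is None or record_timestamp > current:
        states[project_id] = {"timestamp": record_timestamp}


from_legacy = normalize_state({"timestamp": "2021-01-01T00:00:00+00:00"}, ["1", "2"])
from_nested = normalize_state({"1": {"timestamp": "2021-02-01T00:00:00+00:00"}}, ["1", "2"])
advance_cursor(from_nested, "1", "2021-01-15T00:00:00+00:00")  # older: ignored
advance_cursor(from_nested, "2", "2021-03-01T00:00:00+00:00")  # new project: set
```

Keeping one timestamp per project means a slow or failing project does not hold back the cursor of the others.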

Error Handling

The connector handles PostHog API-specific behaviors:

  • Events API returns records in descending order (newest first)
  • Custom pagination where next URL contains datetime ranges
  • Support for both old-style and nested state formats
  • Project-specific error handling and retry logic
  • Time step configuration for handling large event datasets
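
As an illustration of the pagination point above, the datetime range can be pulled out of a "next" URL with the standard library. The URL shape and the after/before parameter names used here are assumptions for the sake of the example, and page_token_from_next_url is a hypothetical helper.

```python
from typing import Dict, Optional
from urllib.parse import parse_qs, urlparse


def page_token_from_next_url(next_url: Optional[str]) -> Optional[Dict[str, str]]:
    """Extract 'after'/'before' query parameters from a 'next' URL, if present."""
    if not next_url:
        return None
    query = parse_qs(urlparse(next_url).query)
    token = {key: query[key][0] for key in ("after", "before") if key in query}
    # An empty token means there is no datetime range to carry forward.
    return token or None


# Hypothetical 'next' URL; percent-encoded colons are decoded by parse_qs.
example_url = (
    "https://app.posthog.com/api/projects/1/events"
    "?after=2021-01-01T00%3A00%3A00.000000Z"
    "&before=2021-01-15T12%3A00%3A00.000000Z"
)
token = page_token_from_next_url(example_url)
```

The resulting token has the same keys as the slice's datetime bounds, so it can replace them on the following request.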

Usage Examples

Basic Connector Setup

from source_posthog import SourcePosthog
from airbyte_cdk.entrypoint import launch

def main():
    source = SourcePosthog()
    launch(source, ["check", "--config", "config.json"])

if __name__ == "__main__":
    main()

Custom Component Usage

from source_posthog.components import EventsCartesianProductStreamSlicer

# Illustrative only: during a sync the declarative framework builds the slicer
# from manifest.yaml. Constructing it by hand may require arguments (for example
# the underlying project and datetime slicers) that are omitted here.
slicer = EventsCartesianProductStreamSlicer()

# Set initial state: one cursor timestamp per project
initial_state = {
    "project_123": {"timestamp": "2021-01-01T00:00:00.000000Z"},
    "project_456": {"timestamp": "2021-02-01T00:00:00.000000Z"}
}
slicer.set_initial_state(initial_state)

# Generate slices (named stream_slice to avoid shadowing the built-in slice)
for stream_slice in slicer.stream_slices():
    print(f"Project: {stream_slice['project_id']}, Start: {stream_slice['start_time']}, End: {stream_slice['end_time']}")

Configuration Validation

import json
import logging

from source_posthog import SourcePosthog

# Load configuration
with open("config.json") as f:
    config = json.load(f)

# Validate configuration; check() expects a logger as its first argument
source = SourcePosthog()
connection_status = source.check(logging.getLogger("airbyte"), config)

if connection_status.status.name == "SUCCEEDED":
    print("Configuration is valid")
else:
    print(f"Configuration error: {connection_status.message}")