# Airbyte Source PostHog

An Airbyte connector for extracting analytics data from PostHog, an open-source product analytics platform. It syncs data from the PostHog API to any Airbyte destination through seven data streams, and the `events` stream supports incremental synchronization.

## Package Information

- **Package Name**: source-posthog
- **Package Type**: python (pypi)
- **Language**: Python
- **Installation**: `pip install source-posthog` or `poetry add source-posthog`
- **Version**: 1.1.25

## Core Imports

```python
from source_posthog import SourcePosthog
```

For running the connector directly:

```python
from source_posthog.run import run
```

For custom component implementations:

```python
from source_posthog.components import EventsSimpleRetriever, EventsCartesianProductStreamSlicer
```

## Basic Usage

### Running as an Airbyte Connector

```python
import sys

from airbyte_cdk.entrypoint import launch
from source_posthog import SourcePosthog

# Initialize the source connector
source = SourcePosthog()

# Pass the command-line arguments (spec, check, discover, read) to the Airbyte CDK launcher
launch(source, sys.argv[1:])
```

### Command Line Usage

```bash
# Install the connector
pip install source-posthog

# Run connector commands
source-posthog spec
source-posthog check --config config.json
source-posthog discover --config config.json
source-posthog read --config config.json --catalog catalog.json
```

### Configuration Example

```json
{
  "api_key": "your-posthog-api-key",
  "start_date": "2021-01-01T00:00:00Z",
  "base_url": "https://app.posthog.com",
  "events_time_step": 30
}
```

## Architecture

The connector is built on Airbyte's declarative, YAML-based configuration system:

- **Declarative Configuration**: All stream definitions, authentication, and pagination logic are defined in `manifest.yaml`
- **Custom Components**: Python classes handle PostHog-specific API behaviors such as nested per-project state and descending-order pagination
- **Schema-Driven**: JSON schema files define the structure of each data stream
- **Project-Based Partitioning**: Data streams are partitioned by PostHog project, and the incremental `events` stream tracks cursor state separately for each project

## Capabilities

### Source Connector

The main connector class, which extracts PostHog data using the declarative YAML configuration in `manifest.yaml`.

```python { .api }
class SourcePosthog(YamlDeclarativeSource):
    def __init__(self):
        """Initialize PostHog source connector with manifest.yaml configuration."""
```

### Entry Point Function

Main function for launching the connector via command line or programmatic execution.

```python { .api }
def run() -> None:
    """
    Main entry point for running the PostHog source connector.
    Creates SourcePosthog instance and launches it using Airbyte CDK.
    """
```

### Events Stream Components

Custom components that handle behaviors specific to the PostHog Events API: descending-order pagination and nested per-project state management.

```python { .api }
class EventsSimpleRetriever(SimpleRetriever):
    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """Post-initialization setup for cursor handling."""

    def request_params(
        self,
        stream_state: StreamSlice,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        """
        Generate request parameters for PostHog Events API.
        Handles descending order pagination where next_page_token
        contains 'after'/'before' params that override stream_slice params.

        Returns:
            Request parameters dictionary with pagination handling
        """
```

```python { .api }
class EventsCartesianProductStreamSlicer(Cursor, CartesianProductStreamSlicer):
    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """Initialize cursor and parameters for nested state management."""

    def get_stream_state(self) -> Mapping[str, Any]:
        """
        Get current cursor state supporting nested project states.

        Returns:
            State dictionary with project-specific timestamps
        """

    def set_initial_state(self, stream_state: StreamState) -> None:
        """Set initial cursor state from previous sync."""

    def close_slice(self, stream_slice: StreamSlice, most_recent_record: Optional[Record]) -> None:
        """Update cursor with most recent record timestamp for the project."""

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Generate datetime slices for each project with project-specific state handling.
        Supports both old-style and new nested state formats.

        Returns:
            Iterable of stream slices with project_id and datetime range
        """

    def should_be_synced(self, record: Record) -> bool:
        """Determine if record should be synced (always True for PostHog)."""

    def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
        """Compare records by timestamp for cursor ordering."""
```
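
The override rule in `request_params` can be illustrated in plain Python. This is a simplified sketch, not the connector's code: `next_page_token_from` and `merge_request_params` are hypothetical helpers, and the sample `next` URL is made up for illustration. The idea is that each Events API response's `next` URL already encodes the `after`/`before` window of the next, older page, so once a token exists it replaces the slice's own window.

```python
from typing import Any, Dict, Mapping, Optional
from urllib.parse import parse_qs, urlparse


def next_page_token_from(response_json: Mapping[str, Any]) -> Optional[Dict[str, str]]:
    """Hypothetical helper: turn the query string of the `next` URL into a page token."""
    next_url = response_json.get("next")
    if not next_url:
        return None  # no `next` link: nothing older is left in this slice
    query = parse_qs(urlparse(next_url).query)
    # parse_qs returns a list per parameter; keep the first value
    return {key: values[0] for key, values in query.items()}


def merge_request_params(
    slice_params: Mapping[str, str], token: Optional[Mapping[str, str]]
) -> Dict[str, str]:
    """Hypothetical helper: a page token's after/before window overrides the slice's."""
    if token:
        return dict(token)
    return dict(slice_params)


# Illustrative response; the URL is an example, not captured API output
response = {
    "next": "https://app.posthog.com/api/projects/2331/events"
    "?after=2021-01-01T00%3A00%3A00.000000Z&before=2021-05-29T16%3A44%3A43.175000%2B00%3A00",
    "results": [],
}
token = next_page_token_from(response)
slice_params = {"after": "2021-01-01T00:00:00Z", "before": "2021-06-01T00:00:00Z"}
print(merge_request_params(slice_params, token))
```

The sketch isolates the override rule only; it does not reproduce how the CDK combines a page token with the other request options.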

### Configuration Specification

The connector accepts the following configuration parameters:

```python { .api }
class PostHogConfig:
    api_key: str  # Required: PostHog API key for authentication
    start_date: str  # Required: Start date in ISO format (YYYY-MM-DDTHH:MM:SSZ)
    base_url: str = "https://app.posthog.com"  # Optional: PostHog instance URL
    events_time_step: int = 30  # Optional: Events stream slice size in days (1-91)
```
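
The constraints listed above can be checked before a config is handed to the connector. A minimal sketch, assuming a plain dict loaded from `config.json`; `parse_config` and `PostHogConfigSketch` are hypothetical names and are not part of `source_posthog`.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Mapping

DEFAULT_BASE_URL = "https://app.posthog.com"


@dataclass(frozen=True)
class PostHogConfigSketch:
    """Hypothetical typed view of the connector configuration."""

    api_key: str
    start_date: str
    base_url: str = DEFAULT_BASE_URL
    events_time_step: int = 30


def parse_config(raw: Mapping[str, Any]) -> PostHogConfigSketch:
    """Apply the documented defaults and ranges; raise ValueError on bad input."""
    for key in ("api_key", "start_date"):
        if not raw.get(key):
            raise ValueError(f"missing required field: {key}")
    # Enforce the documented YYYY-MM-DDTHH:MM:SSZ format (raises ValueError otherwise)
    datetime.strptime(raw["start_date"], "%Y-%m-%dT%H:%M:%SZ")
    step = int(raw.get("events_time_step", 30))
    if not 1 <= step <= 91:
        raise ValueError("events_time_step must be between 1 and 91 days")
    return PostHogConfigSketch(
        api_key=raw["api_key"],
        start_date=raw["start_date"],
        base_url=raw.get("base_url", DEFAULT_BASE_URL),
        events_time_step=step,
    )
```

Calling `parse_config(json.load(f))` on a config missing `base_url` and `events_time_step` fills in the two documented defaults.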

### Data Streams

The connector provides seven data streams from the PostHog API:

```python { .api }
class PostHogStreams:
    projects: Stream  # Project information and metadata
    cohorts: Stream  # User cohort definitions (per project)
    feature_flags: Stream  # Feature flag configurations (per project)
    persons: Stream  # Person/user data (per project)
    events: Stream  # Event data with incremental sync (per project)
    annotations: Stream  # Event annotations (per project)
    insights: Stream  # Dashboard insights (per project)
```

### Stream Characteristics

All streams except `projects` are partitioned by project ID and use the following pattern:

```python { .api }
class StreamConfig:
    primary_key: str = "id"  # Primary key for all streams
    partition_field: str = "project_id"  # Partitioning field for project-based streams
    pagination_strategy: str = "OffsetIncrement"  # Pagination method
    page_size: int = 100  # Default page size (10000 for events)
```
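
The per-project, offset-paginated pattern above can be sketched as a generator. This is a minimal illustration, assuming a hypothetical `fetch_page(project_id, limit, offset)` callable in place of real HTTP calls, and assuming a partition is finished once a page comes back shorter than `page_size`. It models only the generic pattern; the `events` stream's descending-order pagination is handled by the custom components.

```python
from typing import Any, Callable, Dict, Iterator, List

Record = Dict[str, Any]


def read_project_partitions(
    project_ids: List[int],
    fetch_page: Callable[[int, int, int], List[Record]],
    page_size: int = 100,
) -> Iterator[Record]:
    """Yield every record of a per-project stream, one partition at a time."""
    for project_id in project_ids:  # one partition per project_id
        offset = 0
        while True:
            page = fetch_page(project_id, page_size, offset)
            yield from page
            if len(page) < page_size:
                break  # a short (or empty) page ends this partition
            offset += page_size


# Hypothetical in-memory data standing in for the PostHog API
FAKE_DATA: Dict[int, List[Record]] = {1: [{"id": i} for i in range(250)], 2: []}


def fake_fetch(project_id: int, limit: int, offset: int) -> List[Record]:
    return FAKE_DATA[project_id][offset : offset + limit]


records = list(read_project_partitions([1, 2], fake_fetch))
print(len(records))
```

Project 1 is read in three requests (100, 100, then 50 records), and project 2 stops after a single empty page.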

### Incremental Synchronization

The events stream supports incremental synchronization with project-specific cursor state:

```python { .api }
class IncrementalConfig:
    cursor_field: str = "timestamp"  # Cursor field for incremental sync
    cursor_datetime_formats: list[str] = [
        "%Y-%m-%dT%H:%M:%S.%f%z",
        "%Y-%m-%dT%H:%M:%S+00:00",
    ]
    cursor_granularity: str = "PT0.000001S"  # Microsecond precision
    step: str = "P{events_time_step}D"  # Configurable time step in days
```
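
Together, `step` and `cursor_granularity` split the sync range into consecutive windows that do not overlap: each window ends one granularity unit (one microsecond here) before the next one begins. A minimal sketch of that slicing, assuming a fixed `end` instead of the current time; `datetime_windows` is a hypothetical helper, not the CDK's cursor implementation.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Tuple

GRANULARITY = timedelta(microseconds=1)  # cursor_granularity PT0.000001S


def datetime_windows(
    start: datetime, end: datetime, step_days: int
) -> List[Tuple[datetime, datetime]]:
    """Split [start, end] into windows of step_days days that do not overlap."""
    step = timedelta(days=step_days)
    windows = []
    window_start = start
    while window_start <= end:
        # End one granularity unit before the next window starts, capped at `end`
        window_end = min(window_start + step - GRANULARITY, end)
        windows.append((window_start, window_end))
        window_start = window_end + GRANULARITY
    return windows


utc = timezone.utc
for start_at, end_at in datetime_windows(
    datetime(2021, 1, 1, tzinfo=utc), datetime(2021, 3, 1, tzinfo=utc), 30
):
    print(start_at.isoformat(), "->", end_at.isoformat())
```

With `events_time_step` set to 30, two months of history become two windows; a smaller step produces more, shorter requests per project.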

### Authentication

Bearer token authentication using PostHog API key:

```python { .api }
class AuthConfig:
    type: str = "BearerAuthenticator"
    api_token: str  # From config['api_key']
    header_format: str = "Bearer {api_token}"
```
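
A minimal sketch of the resulting header, using only the standard library. It assumes the projects list lives at `{base_url}/api/projects/`; that path and the `build_projects_request` helper are illustrative assumptions, and the sketch builds the request without sending it.

```python
import urllib.request


def build_projects_request(base_url: str, api_key: str) -> urllib.request.Request:
    """Build (but do not send) an authenticated GET request for the projects list."""
    url = base_url.rstrip("/") + "/api/projects/"  # assumed endpoint path
    # Same shape as header_format: "Bearer {api_token}"
    return urllib.request.Request(url, headers={"Authorization": f"Bearer {api_key}"})


request = build_projects_request("https://app.posthog.com", "your-posthog-api-key")
print(request.full_url, request.get_header("Authorization"))
```

Sending it with `urllib.request.urlopen(request)` requires a valid API key for the chosen `base_url`.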

## Data Types

### Project Data Structure

```python { .api }
class Project:
    id: int  # Project ID
    uuid: str  # Project UUID
    organization: str  # Organization ID
    api_token: str  # Project API token
    name: str  # Project name
    completed_snippet_onboarding: bool  # Onboarding status
    ingested_event: bool  # Event ingestion status
    is_demo: bool  # Demo project flag
    timezone: str  # Project timezone
    access_control: bool  # Access control enabled
    effective_membership_level: int  # User membership level
```

### Event Data Structure

```python { .api }
class Event:
    id: str  # Event ID
    distinct_id: str  # User distinct ID
    properties: dict  # Event properties
    event: str  # Event name
    timestamp: str  # Event timestamp (ISO format)
    person: Person  # Associated person data
    elements: list[Union[str, dict]]  # UI elements
    elements_chain: str  # Element chain string


class Person:
    is_identified: bool  # Person identification status
    distinct_ids: list[str]  # List of distinct IDs
    properties: dict  # Person properties
```
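
Event `timestamp` values arrive as ISO strings, and the incremental cursor accepts the two formats listed in `cursor_datetime_formats`. A sketch of parsing them and picking the newest record (the value a cursor would advance to); `parse_event_timestamp` is a hypothetical helper, and the sample events are made up for illustration.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List

# Formats copied from cursor_datetime_formats in the incremental configuration
CURSOR_FORMATS = ("%Y-%m-%dT%H:%M:%S.%f%z", "%Y-%m-%dT%H:%M:%S+00:00")


def parse_event_timestamp(value: str) -> datetime:
    """Parse an event timestamp with the first matching cursor format."""
    for fmt in CURSOR_FORMATS:
        try:
            parsed = datetime.strptime(value, fmt)
        except ValueError:
            continue
        # The second format matches "+00:00" literally, so attach UTC explicitly
        return parsed if parsed.tzinfo else parsed.replace(tzinfo=timezone.utc)
    raise ValueError(f"unrecognized timestamp: {value!r}")


events: List[Dict[str, Any]] = [
    {"id": "a", "event": "$pageview", "timestamp": "2021-05-29T16:44:43.175000+00:00"},
    {"id": "b", "event": "$pageview", "timestamp": "2021-05-30T08:00:00+00:00"},
]
newest = max(events, key=lambda e: parse_event_timestamp(e["timestamp"]))
print(newest["id"])
```

Parsing before comparing avoids ordering mistakes between the two string formats, which differ in whether fractional seconds are present.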

### Stream State Format

For incremental synchronization, the connector keeps a separate state entry for each project, keyed by project ID. States written in the older format, with a single top-level `timestamp`, are still accepted:

```python { .api }
class StreamState:
    # New format: one entry per project, keyed by project ID
    project_states: dict[str, ProjectState]

    # Legacy format (backward compatibility)
    timestamp: Optional[str]


class ProjectState:
    timestamp: str  # Last synced timestamp for the project
```
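
The two state formats can be resolved into a per-project starting point roughly as follows. A simplified sketch, assuming the nested state is keyed by the project ID as a string and that a legacy top-level `timestamp` applies to every project; `resume_timestamp` is a hypothetical helper, not the connector's `stream_slices` implementation.

```python
from typing import Any, Mapping


def resume_timestamp(state: Mapping[str, Any], project_id: str, start_date: str) -> str:
    """Return the timestamp a project's events sync should resume from."""
    # Legacy state has a single top-level "timestamp" shared by all projects
    legacy_state = state if "timestamp" in state else {}
    project_state = state.get(project_id) or legacy_state
    # No saved state for this project: fall back to the configured start_date
    return project_state.get("timestamp") or start_date


nested = {"123": {"timestamp": "2021-02-01T00:00:00.000000Z"}}
legacy = {"timestamp": "2021-03-01T00:00:00.000000Z"}
print(resume_timestamp(nested, "123", "2021-01-01T00:00:00Z"))
print(resume_timestamp(nested, "456", "2021-01-01T00:00:00Z"))
print(resume_timestamp(legacy, "456", "2021-01-01T00:00:00Z"))
```

A project added after the last sync has no entry in the nested state, so it starts from `start_date` while the other projects resume from their own timestamps.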

## Error Handling

The connector handles these PostHog API-specific behaviors:

- The Events API returns records in descending order (newest first)
- Custom pagination, where the `next` URL carries the datetime range of the next page
- Support for both old-style and nested state formats
- Project-specific error handling and retry logic
- A configurable time step (`events_time_step`) for slicing large event datasets

## Usage Examples

### Basic Connector Setup

```python
from airbyte_cdk.entrypoint import launch
from source_posthog import SourcePosthog


def main():
    source = SourcePosthog()
    # Run the connection check against the settings in config.json
    launch(source, ["check", "--config", "config.json"])


if __name__ == "__main__":
    main()
```

### Custom Component Usage

```python
from source_posthog.components import EventsCartesianProductStreamSlicer

# The slicer is normally built by the declarative framework from manifest.yaml.
# `project_partition_router` and `datetime_cursor` are placeholders for the project
# partition router and datetime cursor configured there; the keyword arguments are
# assumed from the CartesianProductStreamSlicer base class.
slicer = EventsCartesianProductStreamSlicer(
    stream_slicers=[project_partition_router, datetime_cursor],
    parameters={},
)

# Restore state from a previous sync: one entry per project ID
initial_state = {
    "123": {"timestamp": "2021-01-01T00:00:00.000000Z"},
    "456": {"timestamp": "2021-02-01T00:00:00.000000Z"},
}
slicer.set_initial_state(initial_state)

# Generate datetime slices for each project
for stream_slice in slicer.stream_slices():
    print(f"Project: {stream_slice['project_id']}, Start: {stream_slice['start_time']}, End: {stream_slice['end_time']}")
```

### Configuration Validation

```python
import json
import logging

from airbyte_cdk.models import Status
from source_posthog import SourcePosthog

# Load configuration
with open("config.json") as f:
    config = json.load(f)

# Validate the configuration by running the connector's connection check
source = SourcePosthog()
connection_status = source.check(logging.getLogger("airbyte"), config)

if connection_status.status == Status.SUCCEEDED:
    print("Configuration is valid")
else:
    print(f"Configuration error: {connection_status.message}")
```