# Mixpanel Python Async

Python library for using Mixpanel asynchronously with thread-based batching and flushing. This library provides an asynchronous wrapper around the standard Mixpanel Python client, enabling non-blocking event tracking through buffered queues that are flushed in separate threads.

## Package Information

- **Package Name**: mixpanel-py-async
- **Package Type**: pypi
- **Language**: Python
- **Installation**: `pip install mixpanel-py-async`

## Core Imports

```python
from mixpanel_async import AsyncBufferedConsumer
```

For testing and debugging:

```python
import threading  # For accessing Thread types and utilities
```

## Basic Usage

```python
from mixpanel import Mixpanel
from mixpanel_async import AsyncBufferedConsumer

# Create an async consumer with default settings
consumer = AsyncBufferedConsumer()

# Initialize Mixpanel client with the async consumer
mp = Mixpanel('YOUR_PROJECT_TOKEN', consumer=consumer)

# Track events - these will be batched and sent asynchronously
mp.track('user_123', 'page_view', {'page': 'homepage', 'source': 'organic'})
mp.track('user_456', 'signup', {'plan': 'premium'})

# Update user profiles - also batched and sent asynchronously
mp.people_set('user_123', {'$first_name': 'John', '$email': 'john@example.com'})

# Ensure all events are sent before application termination
consumer.flush(async_=False)
```

## Architecture

The async consumer extends Mixpanel's standard BufferedConsumer with asynchronous flushing capabilities:

- **AsyncBufferedConsumer**: Main class that manages event queues and asynchronous flushing
- **FlushThread**: Helper thread class that performs the actual flushing operations
- **Buffer Management**: Dual buffer system (async buffers for incoming events, sync buffers for flushing)
- **Automatic Flushing**: Events are automatically flushed based on queue size or time intervals
- **Thread Safety**: Thread locks ensure only one flush operation runs at a time
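
The dual-buffer arrangement listed above can be illustrated with a small standard-library sketch. This is not the library's implementation: `DualBufferSketch`, `delivered`, and the method bodies are hypothetical and chosen only for illustration, and delivery is simulated by appending to a list instead of calling Mixpanel.

```python
import threading


class DualBufferSketch:
    """Illustrative only: callers fill one buffer while a thread drains another."""

    def __init__(self):
        self._async_buffer = []                 # receives new items from callers
        self._sync_buffer = []                  # drained by the flush thread
        self._buffer_lock = threading.Lock()    # protects the incoming buffer
        self._flush_lock = threading.Lock()     # allows one flush at a time
        self._flushing_thread = None
        self.delivered = []                     # stands in for the network call

    def send(self, item):
        with self._buffer_lock:
            self._async_buffer.append(item)

    def transfer_buffers(self):
        # Move everything collected so far into the buffer used for flushing.
        with self._buffer_lock:
            pending, self._async_buffer = self._async_buffer, []
        self._sync_buffer.extend(pending)

    def _sync_flush(self):
        batch, self._sync_buffer = self._sync_buffer, []
        self.delivered.extend(batch)

    def flush(self):
        """Start a background flush; return False if one is already running."""
        with self._flush_lock:
            if self._flushing_thread is not None and self._flushing_thread.is_alive():
                return False
            self.transfer_buffers()
            self._flushing_thread = threading.Thread(target=self._sync_flush)
            self._flushing_thread.start()
            return True


sketch = DualBufferSketch()
sketch.send("a")
sketch.send("b")
started = sketch.flush()
sketch._flushing_thread.join()  # wait for the background flush to finish
```

Because callers only ever touch the incoming buffer, a slow flush does not delay `send`; the lock around `flush` is what keeps two flushes from overlapping.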

## Capabilities

### Asynchronous Event Consumer

The main AsyncBufferedConsumer class provides thread-based batching and flushing of Mixpanel events to prevent blocking the main application thread.

```python { .api }
class AsyncBufferedConsumer:
    def __init__(
        self,
        flush_after=timedelta(0, 10),
        flush_first=True,
        max_size=20,
        events_url=None,
        people_url=None,
        import_url=None,
        request_timeout=None,
        groups_url=None,
        api_host="api.mixpanel.com",
        retry_limit=4,
        retry_backoff_factor=0.25,
        verify_cert=True,
    ):
        """
        Create a new AsyncBufferedConsumer instance.

        Parameters:
        - flush_after (datetime.timedelta): Time period after which events are flushed automatically (default: 10 seconds)
        - flush_first (bool): Whether to flush the first event immediately (default: True)
        - max_size (int): Queue size that triggers automatic flush (default: 20)
        - events_url (str): Custom Mixpanel events API URL (optional)
        - people_url (str): Custom Mixpanel people API URL (optional)
        - import_url (str): Custom Mixpanel import API URL (optional)
        - request_timeout (int): Connection timeout in seconds (optional)
        - groups_url (str): Custom Mixpanel groups API URL (optional)
        - api_host (str): Mixpanel API domain (default: "api.mixpanel.com")
        - retry_limit (int): Number of retry attempts for failed requests (default: 4)
        - retry_backoff_factor (float): Exponential backoff factor for retries (default: 0.25)
        - verify_cert (bool): Whether to verify SSL certificates (default: True)
        """
```
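
The parameters `flush_after`, `flush_first`, and `max_size` together determine when an automatic flush is due. The function below is a hypothetical sketch of such a decision rule, derived only from the parameter descriptions above; `should_flush` and its arguments are illustrative names, and the library's actual logic may differ in its details.

```python
from datetime import datetime, timedelta


def should_flush(queue_size, last_flushed, now,
                 flush_after=timedelta(seconds=10), flush_first=True, max_size=20):
    """Return True if an automatic flush looks due under the documented settings."""
    if queue_size >= max_size:
        return True                           # size threshold reached
    if last_flushed is None:
        return flush_first                    # nothing has been flushed yet
    return now - last_flushed >= flush_after  # time threshold reached


now = datetime(2024, 1, 1, 12, 0, 0)
first_event = should_flush(1, None, now)                         # relies on flush_first
too_soon = should_flush(3, now - timedelta(seconds=2), now)      # small queue, recent flush
overdue = should_flush(3, now - timedelta(seconds=15), now)      # flush_after exceeded
queue_full = should_flush(20, now - timedelta(seconds=1), now)   # max_size reached
```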

### Public Constants

AsyncBufferedConsumer provides constants used internally for flush logic that may be useful for understanding behavior.

```python { .api }
class AsyncBufferedConsumer:
    ALL = "ALL"  # Constant indicating all endpoints should be flushed
    ENDPOINT = "ENDPOINT"  # Constant indicating specific endpoint should be flushed
```

### Event Sending

Send events or profile updates to Mixpanel. Events are stored in memory and automatically flushed based on queue size or time thresholds.

```python { .api }
def send(self, endpoint, json_message, api_key=None):
    """
    Record an event or profile update.

    Parameters:
    - endpoint (str): Mixpanel endpoint - valid values depend on BufferedConsumer configuration,
      typically 'events', 'people', 'import', 'groups'
    - json_message (str): JSON-formatted message for the endpoint
    - api_key (str): Mixpanel project API key (optional)

    Raises:
    - MixpanelException: For invalid endpoints or API errors
    """
```
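
`send` takes an already serialized JSON string, so code that bypasses the `Mixpanel` client has to build the message itself. The sketch below assumes the usual Mixpanel event shape (an `event` name plus a `properties` object carrying `token`, `distinct_id`, and `time`); `build_event_message` is a hypothetical helper, not part of either library. In normal use the `Mixpanel` client builds and serializes these messages for you.

```python
import json
import time


def build_event_message(token, distinct_id, event_name, properties=None, timestamp=None):
    """Serialize one event into a JSON string usable as json_message."""
    props = {
        "token": token,
        "distinct_id": distinct_id,
        "time": int(timestamp if timestamp is not None else time.time()),
    }
    props.update(properties or {})  # caller-supplied properties are merged in
    return json.dumps({"event": event_name, "properties": props})


message = build_event_message(
    "YOUR_PROJECT_TOKEN", "user_123", "page_view",
    {"page": "homepage"}, timestamp=1700000000,
)
# With a consumer instance, the message would then be queued via:
#   consumer.send("events", message)
```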

### Manual Flushing

Manually trigger the sending of all queued events to Mixpanel, either synchronously or asynchronously.

```python { .api }
def flush(self, endpoint=None, async_=True):
    """
    Send all remaining messages to Mixpanel.

    Parameters:
    - endpoint (str): Specific endpoint to flush (optional, flushes all if None)
    - async_ (bool): Whether to flush in separate thread (default: True)

    Returns:
    - bool: Whether flush was executed (False if another flush is already running)

    Raises:
    - MixpanelException: For communication errors with Mixpanel servers
    """
```

### Buffer Management

Transfer events between internal buffer systems for thread-safe processing.

```python { .api }
def transfer_buffers(self, endpoint=None):
    """
    Transfer events from async buffers to sync buffers for flushing.

    Parameters:
    - endpoint (str): Specific endpoint to transfer (optional, transfers all if None)
    """
```

### Public Attributes

AsyncBufferedConsumer provides access to several public attributes for runtime configuration and monitoring.

```python { .api }
class AsyncBufferedConsumer:
    flush_after: timedelta  # Time period after which events are automatically flushed
    flush_first: bool  # Whether to flush the first event immediately
    last_flushed: datetime  # Timestamp of the last flush operation (None if never flushed)
    flushing_thread: Thread  # Reference to current flush thread (None if no flush active)
```
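
These attributes can be read for lightweight monitoring. The helper below is a sketch that assumes only the `last_flushed` attribute described above (a naive `datetime`, or `None` before the first flush); `seconds_since_last_flush` is a hypothetical name, and a `SimpleNamespace` stands in for a real consumer.

```python
from datetime import datetime, timedelta
from types import SimpleNamespace


def seconds_since_last_flush(consumer, now=None):
    """Return seconds elapsed since the last flush, or None if nothing was flushed yet."""
    if consumer.last_flushed is None:
        return None
    now = now or datetime.now()
    return (now - consumer.last_flushed).total_seconds()


reference = datetime(2024, 1, 1, 12, 0, 0)
never_flushed = SimpleNamespace(last_flushed=None)
flushed_recently = SimpleNamespace(last_flushed=reference - timedelta(seconds=4))

age_none = seconds_since_last_flush(never_flushed, now=reference)
age_recent = seconds_since_last_flush(flushed_recently, now=reference)
```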

### Testing and Debugging

Methods useful for testing and monitoring the async consumer's internal state.

```python { .api }
def _flush_thread_is_free(self):
    """
    Check whether a flush thread is currently active.

    Returns:
    - bool: True if no flush thread is running, False otherwise
    """

def _sync_flush(self, endpoint=None):
    """
    Perform synchronous flush operation (used internally by flush threads).

    Parameters:
    - endpoint (str): Specific endpoint to flush (optional, flushes all if None)
    """
```
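
In tests it is often useful to wait until an asynchronous flush has finished before asserting on results. The sketch below relies only on the `flushing_thread` attribute and the standard `Thread.join` and `Thread.is_alive` methods; `wait_for_flush` is a hypothetical helper, and the stand-in object replaces a real consumer.

```python
import threading
import time
from types import SimpleNamespace


def wait_for_flush(consumer, timeout=5.0):
    """Block until the consumer's current flush thread ends; return True if idle."""
    thread = consumer.flushing_thread
    if thread is None:
        return True              # no flush in progress
    thread.join(timeout)
    return not thread.is_alive()


# Stand-in consumer whose "flush" just sleeps briefly in a background thread.
worker = threading.Thread(target=time.sleep, args=(0.05,))
fake_consumer = SimpleNamespace(flushing_thread=worker)
worker.start()
finished = wait_for_flush(fake_consumer, timeout=2.0)
```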

### Helper Classes

#### FlushThread

Internal thread class used for asynchronous flushing operations. This class extends threading.Thread and is created automatically by AsyncBufferedConsumer when performing async flushes.

```python { .api }
class FlushThread(threading.Thread):
    def __init__(self, consumer, endpoint=None):
        """
        Create a flush thread for asynchronous event sending.

        Parameters:
        - consumer (AsyncBufferedConsumer): Consumer instance to flush
        - endpoint (str): Specific endpoint to flush (optional, flushes all if None)
        """

    def run(self):
        """Execute the flush operation in the thread by calling consumer._sync_flush()."""
```

## Configuration Examples

### Custom Flush Timing

```python
from datetime import timedelta
from mixpanel_async import AsyncBufferedConsumer

# Flush every 30 seconds or when queue reaches 50 events
consumer = AsyncBufferedConsumer(
    flush_after=timedelta(seconds=30),
    max_size=50,
    flush_first=False  # Don't flush the first event immediately
)
```

### Custom API Endpoints and Retry Logic

```python
from mixpanel_async import AsyncBufferedConsumer

consumer = AsyncBufferedConsumer(
    api_host="api-eu.mixpanel.com",  # Use EU data center
    retry_limit=6,  # Retry up to 6 times
    retry_backoff_factor=0.5,  # Longer backoff between retries
    request_timeout=30  # 30 second timeout
)
```

### Production Usage Pattern

```python
import atexit
from datetime import timedelta
from mixpanel import Mixpanel
from mixpanel_async import AsyncBufferedConsumer

# Create consumer with production settings
consumer = AsyncBufferedConsumer(
    flush_after=timedelta(seconds=5),  # Flush frequently
    max_size=100,  # Larger batch size
    retry_limit=8,  # More retries for reliability
    verify_cert=True  # Always verify SSL
)

mp = Mixpanel('YOUR_TOKEN', consumer=consumer)

# Ensure final flush on application exit
atexit.register(lambda: consumer.flush(async_=False))

# Your application code here
mp.track('user_id', 'app_start')
```
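
For scripts and short-lived jobs, a context manager is one alternative to `atexit` for guaranteeing the final synchronous flush. A minimal sketch, assuming only the documented `flush(async_=False)` call; `flushed_on_exit` and `_RecordingConsumer` are hypothetical names, with the latter standing in for `AsyncBufferedConsumer` so the snippet runs on its own.

```python
from contextlib import contextmanager


@contextmanager
def flushed_on_exit(consumer):
    """Yield the consumer and always run a blocking flush when the block ends."""
    try:
        yield consumer
    finally:
        consumer.flush(async_=False)


class _RecordingConsumer:
    """Stand-in that records how flush was called."""

    def __init__(self):
        self.flush_calls = []

    def flush(self, endpoint=None, async_=True):
        self.flush_calls.append(async_)
        return True


recorder = _RecordingConsumer()
with flushed_on_exit(recorder) as consumer:
    # In real code: mp = Mixpanel('YOUR_TOKEN', consumer=consumer); mp.track(...)
    pass
```

The `finally` clause means the flush also runs when the body raises, which `atexit` handlers only cover at interpreter shutdown.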

## Error Handling

All errors are raised as `MixpanelException` from the base mixpanel library:

```python
from mixpanel import MixpanelException
from mixpanel_async import AsyncBufferedConsumer

consumer = AsyncBufferedConsumer()

try:
    consumer.send('invalid_endpoint', '{"event": "test"}')
except MixpanelException as e:
    print(f"Error: {e}")
    # Handle the error appropriately
```

## Types

```python { .api }
# From datetime module
class datetime:
    @classmethod
    def now(cls, tz=None):
        """Get current datetime, used for tracking flush timing."""

class timedelta:
    def __init__(self, days=0, seconds=0, microseconds=0, milliseconds=0, minutes=0, hours=0, weeks=0):
        """Time duration object for flush_after parameter."""

# From threading module
class Thread:
    """Base thread class extended by FlushThread."""
    def is_alive(self):
        """Check if thread is currently running."""
    def join(self, timeout=None):
        """Wait for thread to complete."""

# From mixpanel module
class MixpanelException(Exception):
    """Exception raised for Mixpanel API errors and invalid usage."""
    message: str  # Error message
    endpoint: str  # Endpoint that caused the error (when applicable)
```