# Mixpanel Python Async

Python library for using Mixpanel asynchronously with thread-based batching and flushing. It provides an asynchronous consumer for the standard Mixpanel Python client: events are buffered in memory and flushed to Mixpanel in a separate thread, so event tracking does not block the calling thread.

## Package Information

- **Package Name**: mixpanel-py-async
- **Package Type**: pypi
- **Language**: Python
- **Installation**: `pip install mixpanel-py-async`

## Core Imports

```python
from mixpanel_async import AsyncBufferedConsumer
```

For testing and debugging, for example when inspecting the consumer's `flushing_thread` (a `threading.Thread`):

```python
import threading  # For accessing Thread types and utilities
```

## Basic Usage

```python
from mixpanel import Mixpanel
from mixpanel_async import AsyncBufferedConsumer

# Create an async consumer with default settings
consumer = AsyncBufferedConsumer()

# Initialize Mixpanel client with the async consumer
mp = Mixpanel('YOUR_PROJECT_TOKEN', consumer=consumer)

# Track events - these will be batched and sent asynchronously
mp.track('user_123', 'page_view', {'page': 'homepage', 'source': 'organic'})
mp.track('user_456', 'signup', {'plan': 'premium'})

# Update user profiles - also batched and sent asynchronously
mp.people_set('user_123', {'$first_name': 'John', '$email': 'john@example.com'})

# Ensure all events are sent before application termination
consumer.flush(async_=False)
```

## Architecture

The async consumer subclasses Mixpanel's standard `BufferedConsumer` and adds asynchronous flushing:

- **AsyncBufferedConsumer**: Main class that manages event queues and asynchronous flushing
- **FlushThread**: Helper thread class that performs the actual flushing operations
- **Buffer Management**: Dual buffer system (async buffers receive incoming events, sync buffers hold the batch being flushed)
- **Automatic Flushing**: Events are flushed automatically when a queue reaches `max_size` or when `flush_after` has elapsed since the last flush
- **Thread Safety**: A lock ensures that only one flush operation runs at a time

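To make the division of labor concrete, here is a deliberately simplified model of the design described above. It is a hypothetical sketch, not the library's source: `ToyAsyncConsumer` and its `sender` callable are illustrative names, only the `max_size` trigger is modeled (no `flush_after` timing), and a single buffer stands in for Mixpanel's several endpoints.

```python
import threading


class ToyAsyncConsumer:
    """Hypothetical model of the dual-buffer design; not mixpanel_async source."""

    def __init__(self, sender, max_size=3):
        self._sender = sender                 # callable that receives one batch (a list)
        self._max_size = max_size
        self._async_buffer = []               # collects newly sent messages
        self._sync_buffer = []                # holds the batch currently being sent
        self._buffer_lock = threading.Lock()  # guards the async buffer
        self._flush_lock = threading.Lock()   # serializes flush-thread creation
        self.flushing_thread = None

    def send(self, message):
        with self._buffer_lock:
            self._async_buffer.append(message)
            full = len(self._async_buffer) >= self._max_size
        if full:
            self.flush()

    def flush(self):
        # Start a background flush unless one is already running
        with self._flush_lock:
            if self.flushing_thread is not None and self.flushing_thread.is_alive():
                return False
            self.flushing_thread = threading.Thread(target=self._sync_flush)
            self.flushing_thread.start()
            return True

    def _sync_flush(self):
        # Swap queued messages into the sync buffer, then send outside the lock
        with self._buffer_lock:
            self._sync_buffer.extend(self._async_buffer)
            self._async_buffer = []
        if self._sync_buffer:
            self._sender(list(self._sync_buffer))
            self._sync_buffer = []


if __name__ == "__main__":
    batches = []
    consumer = ToyAsyncConsumer(batches.append, max_size=3)
    for n in range(3):
        consumer.send({"event": "page_view", "n": n})
    consumer.flushing_thread.join()
    print(len(batches), len(batches[0]))  # 1 3
```

In this toy, a message that arrives while a flush is running simply stays in the async buffer until the next trigger; the real consumer's handling of that case is not modeled here.
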
## Capabilities

### Asynchronous Event Consumer

The main `AsyncBufferedConsumer` class batches Mixpanel events and flushes them in a separate thread so that tracking calls do not block the main application thread.

```python { .api }
from datetime import timedelta

class AsyncBufferedConsumer:
    def __init__(
        self,
        flush_after=timedelta(0, 10),
        flush_first=True,
        max_size=20,
        events_url=None,
        people_url=None,
        import_url=None,
        request_timeout=None,
        groups_url=None,
        api_host="api.mixpanel.com",
        retry_limit=4,
        retry_backoff_factor=0.25,
        verify_cert=True,
    ):
        """
        Create a new AsyncBufferedConsumer instance.

        Parameters:
        - flush_after (datetime.timedelta): Time period after which events are flushed automatically (default: 10 seconds)
        - flush_first (bool): Whether to flush the first event immediately (default: True)
        - max_size (int): Queue size that triggers automatic flush (default: 20)
        - events_url (str): Custom Mixpanel events API URL (optional)
        - people_url (str): Custom Mixpanel people API URL (optional)
        - import_url (str): Custom Mixpanel import API URL (optional)
        - request_timeout (int): Connection timeout in seconds (optional)
        - groups_url (str): Custom Mixpanel groups API URL (optional)
        - api_host (str): Mixpanel API domain (default: "api.mixpanel.com")
        - retry_limit (int): Number of retry attempts for failed requests (default: 4)
        - retry_backoff_factor (float): Exponential backoff factor for retries (default: 0.25)
        - verify_cert (bool): Whether to verify SSL certificates (default: True)
        """
```

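The default `flush_after=timedelta(0, 10)` passes positional arguments, which `datetime.timedelta` reads as `days` and then `seconds`. This standard-library check confirms the 10-second default and shows the keyword form, which is easier to read when passing a custom value:

```python
from datetime import timedelta

# Positional arguments to timedelta are (days, seconds, ...), so this is 10 seconds
default_flush_after = timedelta(0, 10)
assert default_flush_after == timedelta(seconds=10)
print(default_flush_after.total_seconds())  # 10.0

# Keyword arguments make custom intervals easier to read
print(timedelta(minutes=1, seconds=30).total_seconds())  # 90.0
```
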
### Public Constants

`AsyncBufferedConsumer` defines two constants that its flush logic uses internally to indicate the scope of an automatic flush. They are not needed for normal use, but they help explain the consumer's behavior.

```python { .api }
class AsyncBufferedConsumer:
    ALL = "ALL"  # Constant indicating all endpoints should be flushed
    ENDPOINT = "ENDPOINT"  # Constant indicating specific endpoint should be flushed
```

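One way such constants could drive a flush decision is sketched below. `decide_flush` is a hypothetical function, not taken from the library; it assumes that a consumer that has never flushed, or whose `flush_after` interval has elapsed, flushes every endpoint, while a full queue flushes only its own endpoint.

```python
from datetime import datetime, timedelta

ALL = "ALL"            # flush every endpoint
ENDPOINT = "ENDPOINT"  # flush only the endpoint whose queue is full


def decide_flush(queue_len, max_size, last_flushed, flush_after, now):
    """Hypothetical flush decision returning ALL, ENDPOINT, or False."""
    # Assumption: never having flushed counts as stale (cf. flush_first=True)
    stale = last_flushed is None or now - last_flushed > flush_after
    if stale:
        return ALL
    if queue_len >= max_size:
        return ENDPOINT
    return False


now = datetime(2024, 1, 1, 12, 0, 0)
ten_s = timedelta(seconds=10)
print(decide_flush(1, 20, None, ten_s, now))                         # ALL
print(decide_flush(20, 20, now - timedelta(seconds=2), ten_s, now))  # ENDPOINT
print(decide_flush(5, 20, now - timedelta(seconds=2), ten_s, now))   # False
print(decide_flush(5, 20, now - timedelta(seconds=30), ten_s, now))  # ALL
```
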
### Event Sending

Queue an event or profile update for delivery to Mixpanel. Messages are held in memory and flushed automatically when a queue reaches `max_size` or when `flush_after` has elapsed.

```python { .api }
def send(self, endpoint, json_message, api_key=None):
    """
    Record an event or profile update.

    Parameters:
    - endpoint (str): Mixpanel endpoint supported by the underlying BufferedConsumer:
      'events', 'people', 'groups', or 'imports'
    - json_message (str): JSON-formatted message for the endpoint
    - api_key (str): Mixpanel project API key (optional)

    Raises:
    - MixpanelException: For invalid endpoints or API errors
    """
```

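Because `send` takes only an endpoint name and a JSON string, unit tests can swap in a simple object that records messages instead of sending them. `RecordingConsumer` below is a hypothetical test double with a compatible `send` signature; it is not part of mixpanel-py-async, and it raises `ValueError` (rather than `MixpanelException`) so that it needs no third-party imports.

```python
import json

# Endpoint names assumed to match those accepted by mixpanel-python consumers
VALID_ENDPOINTS = {"events", "people", "groups", "imports"}


class RecordingConsumer:
    """Hypothetical test double: records messages instead of sending them."""

    def __init__(self):
        self.messages = []  # (endpoint, decoded message) pairs, in send order

    def send(self, endpoint, json_message, api_key=None, **kwargs):
        # Extra keyword arguments are accepted and ignored
        if endpoint not in VALID_ENDPOINTS:
            raise ValueError("No such endpoint: {!r}".format(endpoint))
        self.messages.append((endpoint, json.loads(json_message)))

    def flush(self, *args, **kwargs):
        return True  # nothing is buffered, so there is nothing to flush


recorder = RecordingConsumer()
recorder.send("events", json.dumps({"event": "signup", "properties": {"plan": "premium"}}))
endpoint, message = recorder.messages[0]
print(endpoint, message["event"])  # events signup
```

In a test you could pass `consumer=RecordingConsumer()` to `Mixpanel(...)` and assert on `recorder.messages`, assuming your client version calls the consumer only through `send` and `flush`.
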
### Manual Flushing

Manually trigger sending all queued events to Mixpanel, either in a background thread (the default) or synchronously in the calling thread.

```python { .api }
def flush(self, endpoint=None, async_=True):
    """
    Send all remaining messages to Mixpanel.

    Parameters:
    - endpoint (str): Specific endpoint to flush (optional, flushes all if None)
    - async_ (bool): Whether to flush in a separate thread (default: True)

    Returns:
    - bool: Whether the flush was executed (False if another flush is already running)

    Raises:
    - MixpanelException: For communication errors with Mixpanel servers
      (raised in the caller only when async_=False)
    """
```

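The `False` return value describes a try-once gate: if a flush is already in progress, a new asynchronous flush is skipped rather than queued. The sketch below shows that pattern with the standard library's `Lock.acquire(blocking=False)`; `FlushGate` and its methods are hypothetical names, not the library's implementation.

```python
import threading


class FlushGate:
    """Hypothetical sketch: allow at most one background flush at a time."""

    def __init__(self, do_flush):
        self._do_flush = do_flush          # callable that performs the actual send
        self._running = threading.Lock()   # held while a flush is in progress
        self.thread = None

    def flush_async(self):
        # Non-blocking acquire returns False at once if a flush holds the lock
        if not self._running.acquire(blocking=False):
            return False
        self.thread = threading.Thread(target=self._run)
        self.thread.start()
        return True

    def _run(self):
        try:
            self._do_flush()
        finally:
            self._running.release()  # a threading.Lock may be released by another thread


started, release = threading.Event(), threading.Event()


def slow_flush():
    started.set()
    release.wait()  # simulate a request that is still in flight


gate = FlushGate(slow_flush)
first = gate.flush_async()
started.wait()
second = gate.flush_async()  # skipped: the first flush still holds the lock
release.set()
gate.thread.join()
third = gate.flush_async()
gate.thread.join()
print(first, second, third)  # True False True
```

Skipping instead of waiting keeps `flush()` non-blocking for the caller; events that arrive in the meantime stay buffered for a later flush.
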
### Buffer Management

Move queued events from the async buffers, which receive incoming events, to the sync buffers, which hold the batch being flushed, so that events arriving during a flush do not interfere with the batch being sent.

```python { .api }
def transfer_buffers(self, endpoint=None):
    """
    Transfer events from async buffers to sync buffers for flushing.

    Parameters:
    - endpoint (str): Specific endpoint to transfer (optional, transfers all if None)
    """
```

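Conceptually, a transfer moves everything queued for an endpoint from the incoming buffer to the outgoing buffer and leaves the incoming buffer empty. The helper below is a hypothetical standard-library model using plain dicts of lists; the real method operates on the consumer's internal buffers, whose exact layout is an assumption here.

```python
import threading

ENDPOINTS = ("events", "people", "groups", "imports")


def transfer(async_buffers, sync_buffers, lock, endpoint=None):
    """Hypothetical model: move queued messages from async to sync buffers."""
    targets = ENDPOINTS if endpoint is None else (endpoint,)
    with lock:  # keep producers out while the swap happens
        for name in targets:
            sync_buffers[name].extend(async_buffers[name])
            async_buffers[name] = []  # new events go into a fresh list


lock = threading.Lock()
async_buffers = {name: [] for name in ENDPOINTS}
sync_buffers = {name: [] for name in ENDPOINTS}
async_buffers["events"] += ['{"event": "a"}', '{"event": "b"}']
async_buffers["people"].append('{"$set": {}}')

transfer(async_buffers, sync_buffers, lock, endpoint="events")
print(len(sync_buffers["events"]), len(async_buffers["people"]))  # 2 1
transfer(async_buffers, sync_buffers, lock)
print(len(sync_buffers["people"]), sum(map(len, async_buffers.values())))  # 1 0
```
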
### Public Attributes

`AsyncBufferedConsumer` exposes the following public attributes for runtime configuration and monitoring.

```python { .api }
from datetime import datetime, timedelta
from threading import Thread
from typing import Optional

class AsyncBufferedConsumer:
    flush_after: timedelta  # Time period after which events are automatically flushed
    flush_first: bool  # Whether to flush the first event immediately
    last_flushed: Optional[datetime]  # Timestamp of the last flush operation (None if never flushed)
    flushing_thread: Optional[Thread]  # Flush thread (None if no async flush has started); check is_alive() to see if it is running
```

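These attributes are enough for a lightweight health check. `flush_status` is a hypothetical helper that reads only the attributes listed above, so it works with any object exposing them; the `SimpleNamespace` stub stands in for a real consumer.

```python
from datetime import datetime, timedelta
from types import SimpleNamespace


def flush_status(consumer, now=None):
    """Hypothetical helper: summarize flush state from the public attributes."""
    if now is None:
        now = datetime.now()
    thread = consumer.flushing_thread
    last = consumer.last_flushed
    elapsed = None if last is None else now - last
    return {
        "flush_in_progress": thread is not None and thread.is_alive(),
        "seconds_since_flush": None if elapsed is None else elapsed.total_seconds(),
        "flush_after_elapsed": elapsed is not None and elapsed > consumer.flush_after,
    }


# A stub with the same attribute names stands in for a real consumer here
stub = SimpleNamespace(
    flush_after=timedelta(seconds=10),
    flush_first=True,
    last_flushed=datetime(2024, 1, 1, 12, 0, 0),
    flushing_thread=None,
)
print(flush_status(stub, now=datetime(2024, 1, 1, 12, 0, 30)))
# {'flush_in_progress': False, 'seconds_since_flush': 30.0, 'flush_after_elapsed': True}
```
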
### Testing and Debugging

The following underscore-prefixed methods are internal, but they are useful for testing and for monitoring the consumer's internal state.

```python { .api }
def _flush_thread_is_free(self):
    """
    Check whether the consumer is free to start a new flush thread.

    Returns:
    - bool: True if no flush thread is running, False otherwise
    """

def _sync_flush(self, endpoint=None):
    """
    Perform a synchronous flush (used internally by flush threads).

    Parameters:
    - endpoint (str): Specific endpoint to flush (optional, flushes all if None)
    """
```

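Tests often need to wait for a background flush before asserting on what was sent. `wait_for_flush` is a hypothetical helper that joins the public `flushing_thread` with a timeout and then asks `_flush_thread_is_free()` whether the consumer is idle; `StubConsumer` is an illustrative stand-in, not the real class.

```python
import threading


def wait_for_flush(consumer, timeout=5.0):
    """Hypothetical test helper: block until any in-flight flush finishes.

    Returns True if no flush thread is still running afterwards.
    """
    thread = consumer.flushing_thread
    if thread is not None:
        thread.join(timeout)
    return consumer._flush_thread_is_free()


class StubConsumer:
    """Stand-in exposing the two members wait_for_flush relies on."""

    def __init__(self):
        self.sent = []
        self.flushing_thread = threading.Thread(target=self.sent.append, args=("batch",))
        self.flushing_thread.start()

    def _flush_thread_is_free(self):
        return self.flushing_thread is None or not self.flushing_thread.is_alive()


stub = StubConsumer()
print(wait_for_flush(stub), stub.sent)  # True ['batch']
```
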
### Helper Classes

#### FlushThread

Internal thread class used for asynchronous flushing. It extends `threading.Thread`, and `AsyncBufferedConsumer` creates it automatically when performing an asynchronous flush.

```python { .api }
import threading

class FlushThread(threading.Thread):
    def __init__(self, consumer, endpoint=None):
        """
        Create a flush thread for asynchronous event sending.

        Parameters:
        - consumer (AsyncBufferedConsumer): Consumer instance to flush
        - endpoint (str): Specific endpoint to flush (optional, flushes all if None)
        """

    def run(self):
        """Execute the flush operation in the thread by calling consumer._sync_flush()."""
```

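The delegation described above, a `threading.Thread` subclass whose `run()` calls back into the consumer, is a standard pattern. This self-contained sketch uses the illustrative names `DelegatingFlushThread` and `FakeConsumer` (both hypothetical) so it runs without mixpanel installed, and checks that the flush really executes on a different thread.

```python
import threading


class DelegatingFlushThread(threading.Thread):
    """Sketch of the FlushThread pattern: run() delegates to the consumer."""

    def __init__(self, consumer, endpoint=None):
        super().__init__()
        self._consumer = consumer
        self._endpoint = endpoint

    def run(self):
        # Executed in the new thread after start()
        self._consumer._sync_flush(endpoint=self._endpoint)


class FakeConsumer:
    """Hypothetical stand-in recording each flush and the thread it ran on."""

    def __init__(self):
        self.calls = []

    def _sync_flush(self, endpoint=None):
        self.calls.append((endpoint, threading.get_ident()))


consumer = FakeConsumer()
flush_thread = DelegatingFlushThread(consumer, endpoint="events")
flush_thread.start()
flush_thread.join()
endpoint, thread_id = consumer.calls[0]
print(endpoint, thread_id != threading.get_ident())  # events True
```
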
## Configuration Examples

### Custom Flush Timing

```python
from datetime import timedelta
from mixpanel_async import AsyncBufferedConsumer

# Flush every 30 seconds or when queue reaches 50 events
consumer = AsyncBufferedConsumer(
    flush_after=timedelta(seconds=30),
    max_size=50,
    flush_first=False  # Don't flush the first event immediately
)
```

### Custom API Endpoints and Retry Logic

```python
from mixpanel_async import AsyncBufferedConsumer

consumer = AsyncBufferedConsumer(
    api_host="api-eu.mixpanel.com",  # Use EU data residency servers
    retry_limit=6,  # Retry up to 6 times
    retry_backoff_factor=0.5,  # Longer backoff between retries
    request_timeout=30  # 30 second timeout
)
```

### Production Usage Pattern

```python
import atexit
from datetime import timedelta

from mixpanel import Mixpanel
from mixpanel_async import AsyncBufferedConsumer

# Create consumer with production settings
consumer = AsyncBufferedConsumer(
    flush_after=timedelta(seconds=5),  # Flush frequently
    max_size=50,  # Larger batch size (mixpanel-python's BufferedConsumer caps max_size at 50)
    retry_limit=8,  # More retries for reliability
    verify_cert=True  # Always verify SSL
)

mp = Mixpanel('YOUR_TOKEN', consumer=consumer)

# Ensure final flush on application exit
atexit.register(lambda: consumer.flush(async_=False))

# Your application code here
mp.track('user_id', 'app_start')
```

## Error Handling

Errors are raised as `MixpanelException` from the base mixpanel library. `send` rejects invalid endpoints immediately, and a synchronous flush (`async_=False`) raises communication errors in the caller. An exception raised inside a background flush thread, however, does not propagate to the code that started the flush.

```python
from mixpanel import MixpanelException
from mixpanel_async import AsyncBufferedConsumer

consumer = AsyncBufferedConsumer()

try:
    consumer.send('invalid_endpoint', '{"event": "test"}')
except MixpanelException as e:
    print(f"Error: {e}")
    # Handle the error appropriately
```

## Types

```python { .api }
# From datetime module
class datetime:
    @classmethod
    def now(cls, tz=None):
        """Get current datetime, used for tracking flush timing."""

class timedelta:
    def __init__(self, days=0, seconds=0, microseconds=0, milliseconds=0, minutes=0, hours=0, weeks=0):
        """Time duration object for flush_after parameter."""

# From threading module
class Thread:
    """Base thread class extended by FlushThread."""
    def is_alive(self):
        """Check if thread is currently running."""
    def join(self, timeout=None):
        """Wait for thread to complete."""

# From mixpanel module
class MixpanelException(Exception):
    """Exception raised for Mixpanel API errors and invalid usage."""
    message: str  # Error message
    endpoint: str  # Endpoint that caused the error (when applicable)
```