Python library for using Mixpanel asynchronously with thread-based batching and flushing
npx @tessl/cli install tessl/pypi-mixpanel-py-async@0.3.0
Python library for using Mixpanel asynchronously with thread-based batching and flushing. This library provides an asynchronous wrapper around the standard Mixpanel Python client, enabling non-blocking event tracking through buffered queues that are flushed in separate threads.
pip install mixpanel-py-async
from mixpanel_async import AsyncBufferedConsumer
For testing and debugging:
import threading  # For accessing Thread types and utilities
from mixpanel import Mixpanel
from mixpanel_async import AsyncBufferedConsumer
# Create an async consumer with default settings
consumer = AsyncBufferedConsumer()
# Initialize Mixpanel client with the async consumer
mp = Mixpanel('YOUR_PROJECT_TOKEN', consumer=consumer)
# Track events - these will be batched and sent asynchronously
mp.track('user_123', 'page_view', {'page': 'homepage', 'source': 'organic'})
mp.track('user_456', 'signup', {'plan': 'premium'})
# Update user profiles - also batched and sent asynchronously
mp.people_set('user_123', {'$first_name': 'John', '$email': 'john@example.com'})
# Ensure all events are sent before application termination
consumer.flush(async_=False)
The async consumer extends Mixpanel's standard BufferedConsumer with asynchronous flushing capabilities:
The main AsyncBufferedConsumer class provides thread-based batching and flushing of Mixpanel events to prevent blocking the main application thread.
class AsyncBufferedConsumer:
def __init__(
self,
flush_after=timedelta(0, 10),
flush_first=True,
max_size=20,
events_url=None,
people_url=None,
import_url=None,
request_timeout=None,
groups_url=None,
api_host="api.mixpanel.com",
retry_limit=4,
retry_backoff_factor=0.25,
verify_cert=True,
):
"""
Create a new AsyncBufferedConsumer instance.
Parameters:
- flush_after (datetime.timedelta): Time period after which events are flushed automatically (default: 10 seconds)
- flush_first (bool): Whether to flush the first event immediately (default: True)
- max_size (int): Queue size that triggers automatic flush (default: 20)
- events_url (str): Custom Mixpanel events API URL (optional)
- people_url (str): Custom Mixpanel people API URL (optional)
- import_url (str): Custom Mixpanel import API URL (optional)
- request_timeout (int): Connection timeout in seconds (optional)
- groups_url (str): Custom Mixpanel groups API URL (optional)
- api_host (str): Mixpanel API domain (default: "api.mixpanel.com")
- retry_limit (int): Number of retry attempts for failed requests (default: 4)
- retry_backoff_factor (float): Exponential backoff factor for retries (default: 0.25)
- verify_cert (bool): Whether to verify SSL certificates (default: True)
"""
AsyncBufferedConsumer provides constants used internally for flush logic that may be useful for understanding behavior.
class AsyncBufferedConsumer:
ALL = "ALL" # Constant indicating all endpoints should be flushed
ENDPOINT = "ENDPOINT" # Constant indicating specific endpoint should be flushed
Send events or profile updates to Mixpanel. Events are stored in memory and automatically flushed based on queue size or time thresholds.
def send(self, endpoint, json_message, api_key=None):
"""
Record an event or profile update.
Parameters:
- endpoint (str): Mixpanel endpoint - valid values depend on BufferedConsumer configuration,
typically 'events', 'people', 'import', 'groups'
- json_message (str): JSON-formatted message for the endpoint
- api_key (str): Mixpanel project API key (optional)
Raises:
- MixpanelException: For invalid endpoints or API errors
"""
Manually trigger the sending of all queued events to Mixpanel, either synchronously or asynchronously.
def flush(self, endpoint=None, async_=True):
"""
Send all remaining messages to Mixpanel.
Parameters:
- endpoint (str): Specific endpoint to flush (optional, flushes all if None)
- async_ (bool): Whether to flush in separate thread (default: True)
Returns:
- bool: Whether flush was executed (False if another flush is already running)
Raises:
- MixpanelException: For communication errors with Mixpanel servers
"""
Transfer events between internal buffer systems for thread-safe processing.
def transfer_buffers(self, endpoint=None):
"""
Transfer events from async buffers to sync buffers for flushing.
Parameters:
- endpoint (str): Specific endpoint to transfer (optional, transfers all if None)
"""
AsyncBufferedConsumer provides access to several public attributes for runtime configuration and monitoring.
class AsyncBufferedConsumer:
flush_after: timedelta # Time period after which events are automatically flushed
flush_first: bool # Whether to flush the first event immediately
last_flushed: datetime # Timestamp of the last flush operation (None if never flushed)
flushing_thread: Thread # Reference to current flush thread (None if no flush active)
Methods useful for testing and monitoring the async consumer's internal state.
def _flush_thread_is_free(self):
"""
Check whether a flush thread is currently active.
Returns:
- bool: True if no flush thread is running, False otherwise
"""
def _sync_flush(self, endpoint=None):
"""
Perform synchronous flush operation (used internally by flush threads).
Parameters:
- endpoint (str): Specific endpoint to flush (optional, flushes all if None)
"""
Internal thread class used for asynchronous flushing operations. This class extends threading.Thread and is created automatically by AsyncBufferedConsumer when performing async flushes.
class FlushThread(threading.Thread):
def __init__(self, consumer, endpoint=None):
"""
Create a flush thread for asynchronous event sending.
Parameters:
- consumer (AsyncBufferedConsumer): Consumer instance to flush
- endpoint (str): Specific endpoint to flush (optional, flushes all if None)
"""
def run(self):
"""Execute the flush operation in the thread by calling consumer._sync_flush()."""
from datetime import timedelta
from mixpanel_async import AsyncBufferedConsumer
# Flush every 30 seconds or when queue reaches 50 events
consumer = AsyncBufferedConsumer(
flush_after=timedelta(seconds=30),
max_size=50,
flush_first=False # Don't flush the first event immediately
)
consumer = AsyncBufferedConsumer(
api_host="api-eu.mixpanel.com", # Use EU data center
retry_limit=6, # Retry up to 6 times
retry_backoff_factor=0.5, # Longer backoff between retries
request_timeout=30 # 30 second timeout
)
import atexit
from datetime import timedelta
from mixpanel import Mixpanel
from mixpanel_async import AsyncBufferedConsumer
# Create consumer with production settings
consumer = AsyncBufferedConsumer(
flush_after=timedelta(seconds=5), # Flush frequently
max_size=100, # Larger batch size
retry_limit=8, # More retries for reliability
verify_cert=True # Always verify SSL
)
mp = Mixpanel('YOUR_TOKEN', consumer=consumer)
# Ensure final flush on application exit
atexit.register(lambda: consumer.flush(async_=False))
# Your application code here
mp.track('user_id', 'app_start')
All errors are raised as MixpanelException from the base mixpanel library:
from mixpanel import MixpanelException
from mixpanel_async import AsyncBufferedConsumer
consumer = AsyncBufferedConsumer()
try:
consumer.send('invalid_endpoint', '{"event": "test"}')
except MixpanelException as e:
print(f"Error: {e}")
# Handle the error appropriately
# From datetime module
class datetime:
@staticmethod
def now():
"""Get current datetime, used for tracking flush timing."""
class timedelta:
def __init__(self, days=0, seconds=0, microseconds=0, milliseconds=0, minutes=0, hours=0, weeks=0):
"""Time duration object for flush_after parameter."""
# From threading module
class Thread:
"""Base thread class extended by FlushThread."""
def is_alive(self):
"""Check if thread is currently running."""
def join(self):
"""Wait for thread to complete."""
# From mixpanel module
class MixpanelException(Exception):
"""Exception raised for Mixpanel API errors and invalid usage."""
message = str # Error message
endpoint = str # Endpoint that caused the error (when applicable)