tessl/pypi-mixpanel-py-async

Describes: pkg:pypi/mixpanel-py-async@0.3.x

To install, run

npx @tessl/cli install tessl/pypi-mixpanel-py-async@0.3.0


Mixpanel Python Async

Python library for using Mixpanel asynchronously with thread-based batching and flushing. This library provides an asynchronous wrapper around the standard Mixpanel Python client, enabling non-blocking event tracking through buffered queues that are flushed in separate threads.

Package Information

  • Package Name: mixpanel-py-async
  • Package Type: pypi
  • Language: Python
  • Installation: pip install mixpanel-py-async

Core Imports

from mixpanel_async import AsyncBufferedConsumer

For testing and debugging, the standard threading module is useful for inspecting the consumer's flush thread:

import threading  # flushing_thread holds a threading.Thread (or None)

Basic Usage

from mixpanel import Mixpanel
from mixpanel_async import AsyncBufferedConsumer

# Create an async consumer with default settings
consumer = AsyncBufferedConsumer()

# Initialize Mixpanel client with the async consumer
mp = Mixpanel('YOUR_PROJECT_TOKEN', consumer=consumer)

# Track events - these will be batched and sent asynchronously
mp.track('user_123', 'page_view', {'page': 'homepage', 'source': 'organic'})
mp.track('user_456', 'signup', {'plan': 'premium'})

# Update user profiles - also batched and sent asynchronously
mp.people_set('user_123', {'$first_name': 'John', '$email': 'john@example.com'})

# Ensure all events are sent before application termination
consumer.flush(async_=False)

Architecture

The async consumer extends Mixpanel's standard BufferedConsumer with asynchronous flushing capabilities:

  • AsyncBufferedConsumer: Main class that manages event queues and asynchronous flushing
  • FlushThread: Helper thread class that performs the actual flushing operations
  • Buffer Management: Dual buffer system (async buffers for incoming events, sync buffers for flushing)
  • Automatic Flushing: When send() is called, events are flushed if an endpoint's queue has reached max_size or if more than flush_after has elapsed since the last flush
  • Thread Safety: Thread locks ensure only one flush operation runs at a time
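The dual-buffer design can be illustrated with a small self-contained model. This is a hypothetical sketch, not the library's code: `TinyAsyncBuffer` and its `deliver` callback are invented names, the callback stands in for the HTTP request to Mixpanel, and the lock around the buffer hand-off is a design choice of this sketch.

```python
import threading


class TinyAsyncBuffer:
    """Hypothetical model of the dual-buffer pattern; not the mixpanel_async implementation."""

    def __init__(self, deliver):
        self._deliver = deliver            # stand-in for the HTTP request to Mixpanel
        self._lock = threading.Lock()      # guards the hand-off between the two buffers
        self._async_buffer = []            # receives new messages from callers
        self._sync_buffer = []             # drained by the flush thread
        self.flushing_thread = None

    def send(self, message):
        with self._lock:
            self._async_buffer.append(message)

    def _thread_is_free(self):
        return self.flushing_thread is None or not self.flushing_thread.is_alive()

    def flush(self):
        if not self._thread_is_free():
            return False                   # only one flush thread at a time
        with self._lock:
            # Move queued messages to the sync buffer; callers keep writing to a fresh list.
            self._sync_buffer.extend(self._async_buffer)
            self._async_buffer = []
        self.flushing_thread = threading.Thread(target=self._drain)
        self.flushing_thread.start()
        return True

    def _drain(self):
        batch, self._sync_buffer = self._sync_buffer, []
        if batch:
            self._deliver(batch)


delivered = []
buffer = TinyAsyncBuffer(delivered.append)
buffer.send({"event": "page_view"})
buffer.send({"event": "signup"})
buffer.flush()
buffer.flushing_thread.join()
print(delivered)  # [[{'event': 'page_view'}, {'event': 'signup'}]]
```

Callers only ever touch the async buffer, so a slow delivery in the worker thread never blocks `send()`.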

Capabilities

Asynchronous Event Consumer

The main AsyncBufferedConsumer class provides thread-based batching and flushing of Mixpanel events to prevent blocking the main application thread.

class AsyncBufferedConsumer:
    def __init__(
        self,
        flush_after=timedelta(0, 10),
        flush_first=True,
        max_size=20,
        events_url=None,
        people_url=None,
        import_url=None,
        request_timeout=None,
        groups_url=None,
        api_host="api.mixpanel.com",
        retry_limit=4,
        retry_backoff_factor=0.25,
        verify_cert=True,
    ):
        """
        Create a new AsyncBufferedConsumer instance.

        Parameters:
        - flush_after (datetime.timedelta): Interval after which the next send() flushes all queued events (default: 10 seconds)
        - flush_first (bool): Whether to flush the first event immediately (default: True)
        - max_size (int): Per-endpoint queue size that triggers an automatic flush (default: 20)
        - events_url (str): Custom Mixpanel events API URL (optional)
        - people_url (str): Custom Mixpanel people API URL (optional)
        - import_url (str): Custom Mixpanel import API URL (optional)
        - request_timeout (int): Connection timeout in seconds (optional)
        - groups_url (str): Custom Mixpanel groups API URL (optional)
        - api_host (str): Mixpanel API domain (default: "api.mixpanel.com")
        - retry_limit (int): Number of retry attempts for failed requests (default: 4)
        - retry_backoff_factor (float): Exponential backoff factor for retries (default: 0.25)
        - verify_cert (bool): Whether to verify SSL certificates (default: True)
        """

Public Constants

AsyncBufferedConsumer defines two class constants that its internal flush logic uses to decide what to flush; they can help when reasoning about flush behavior.

class AsyncBufferedConsumer:
    ALL = "ALL"          # Constant indicating all endpoints should be flushed
    ENDPOINT = "ENDPOINT" # Constant indicating specific endpoint should be flushed
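The two constants distinguish a flush of every endpoint from a flush of a single endpoint. The function below is a hypothetical reconstruction of how such a decision could be made, assuming a time-based trigger (including the never-flushed case under `flush_first=True`) flushes everything and a full queue flushes only its own endpoint. `decide_flush` is an illustrative name, not part of the package.

```python
from datetime import datetime, timedelta

ALL = "ALL"
ENDPOINT = "ENDPOINT"


def decide_flush(queue_len, max_size, last_flushed, flush_after, now):
    """Return ALL, ENDPOINT or False (hypothetical model of the flush trigger)."""
    # Never flushed yet (the flush_first=True case) counts as stale.
    stale = last_flushed is None
    if not stale and flush_after:
        stale = now - last_flushed > flush_after
    if stale:
        return ALL               # time-based trigger: flush every endpoint
    if queue_len >= max_size:
        return ENDPOINT          # size-based trigger: flush only this endpoint
    return False


t0 = datetime(2024, 1, 1, 12, 0, 0)
print(decide_flush(20, 20, t0, timedelta(seconds=10), t0 + timedelta(seconds=3)))  # ENDPOINT
```

Checking staleness first means a time-based flush also empties any endpoint that happens to be full at the same moment.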

Event Sending

Send events or profile updates to Mixpanel. Messages are queued in memory, and each call may trigger an automatic flush once the queue-size or time threshold has been reached.

def send(self, endpoint, json_message, api_key=None):
    """
    Record an event or profile update.

    Parameters:
    - endpoint (str): Mixpanel endpoint; one of 'events', 'people', 'groups' or 'imports'
    - json_message (str): JSON-formatted message for the endpoint
    - api_key (str): Mixpanel project API key (optional)

    Raises:
    - MixpanelException: For invalid endpoints or API errors
    """

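The `send` contract is small enough to mirror in a test double. Because the `Mixpanel` client hands each message to its consumer's `send(endpoint, json_message, api_key=None)`, an object with that method could stand in for `AsyncBufferedConsumer` in offline tests. `RecordingConsumer` is hypothetical, its endpoint list is an assumption, and it raises `ValueError` rather than `MixpanelException` so the sketch has no dependencies.

```python
import json


class RecordingConsumer:
    """Hypothetical test double with the same send() signature as AsyncBufferedConsumer."""

    VALID_ENDPOINTS = ("events", "people", "groups", "imports")  # assumed endpoint names

    def __init__(self):
        self.messages = []

    def send(self, endpoint, json_message, api_key=None):
        if endpoint not in self.VALID_ENDPOINTS:
            # The real consumer raises MixpanelException; ValueError keeps this sketch dependency-free.
            raise ValueError(f"No such endpoint {endpoint!r}")
        self.messages.append((endpoint, json.loads(json_message)))

    def flush(self, endpoint=None, async_=True):
        return True  # nothing is buffered for delivery, so flushing is a no-op


consumer = RecordingConsumer()
consumer.send("events", '{"event": "signup", "properties": {"plan": "premium"}}')
print(consumer.messages)  # [('events', {'event': 'signup', 'properties': {'plan': 'premium'}})]
```

Passing `consumer=RecordingConsumer()` to `Mixpanel(...)` should then capture the JSON the client produces without network traffic; verify this against the mixpanel version in use.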
Manual Flushing

Manually trigger the sending of all queued events to Mixpanel, either synchronously or asynchronously.

def flush(self, endpoint=None, async_=True):
    """
    Send all remaining messages to Mixpanel.

    Parameters:
    - endpoint (str): Specific endpoint to flush (optional, flushes all if None)
    - async_ (bool): Whether to flush in separate thread (default: True)

    Returns:
    - bool: Whether flush was executed (False if another flush is already running)

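    Raises:
    - MixpanelException: For communication errors with Mixpanel servers when async_=False
      (with async_=True the flush runs in a separate thread, so its errors are not raised to the caller)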
    """

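The boolean return value of `flush` matters when a flush is already in progress. The sketch below models that rule with a deliberately slow flush; `SlowFlusher` is hypothetical and uses a `threading.Event` to hold the first flush open so the second call can observe the busy state.

```python
import threading


class SlowFlusher:
    """Hypothetical model of flush() refusing to start while another flush is running."""

    def __init__(self):
        self.release = threading.Event()   # set() lets the simulated flush finish
        self.flushing_thread = None

    def flush(self):
        if self.flushing_thread is not None and self.flushing_thread.is_alive():
            return False                   # a flush is already in progress
        self.flushing_thread = threading.Thread(target=self.release.wait)
        self.flushing_thread.start()
        return True


flusher = SlowFlusher()
first = flusher.flush()    # starts a background "flush" that blocks on release
second = flusher.flush()   # refused: the first thread is still alive
flusher.release.set()
flusher.flushing_thread.join()
third = flusher.flush()    # accepted once the previous thread has finished
flusher.flushing_thread.join()
print(first, second, third)  # True False True
```

A caller that needs a guaranteed final flush can therefore join the running flush thread before calling `flush(async_=False)`.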
Buffer Management

Transfer events between internal buffer systems for thread-safe processing.

def transfer_buffers(self, endpoint=None):
    """
    Transfer events from async buffers to sync buffers for flushing.

    Parameters:
    - endpoint (str): Specific endpoint to transfer (optional, transfers all if None)
    """

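A transfer of this kind amounts to moving the queued messages for one endpoint (or all endpoints) from one dict of lists to another. The function below is a hypothetical stand-alone version operating on plain dicts; the real method works on the consumer's internal buffers, whose names are not documented here.

```python
def transfer(async_buffers, sync_buffers, endpoint=None):
    """Move queued messages from async_buffers to sync_buffers (hypothetical sketch)."""
    endpoints = [endpoint] if endpoint is not None else list(async_buffers)
    for name in endpoints:
        sync_buffers.setdefault(name, []).extend(async_buffers[name])
        async_buffers[name] = []   # give callers a fresh, empty queue


async_buffers = {"events": ["e1", "e2"], "people": ["p1"]}
sync_buffers = {"events": [], "people": []}
transfer(async_buffers, sync_buffers, "events")
print(sync_buffers)   # {'events': ['e1', 'e2'], 'people': []}
print(async_buffers)  # {'events': [], 'people': ['p1']}
```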
Public Attributes

AsyncBufferedConsumer provides access to several public attributes for runtime configuration and monitoring.

class AsyncBufferedConsumer:
    flush_after: timedelta     # Time period after which events are automatically flushed
    flush_first: bool          # Whether to flush the first event immediately  
    last_flushed: datetime     # Time of the last flush; set at creation when flush_first=False, otherwise None until the first flush
    flushing_thread: Thread    # Most recently started flush thread (None before the first async flush; it may have finished)
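The `flush_after` and `last_flushed` attributes are enough to estimate when the next time-based flush becomes due, for example `seconds_until_stale(consumer.last_flushed, consumer.flush_after)`. `seconds_until_stale` is a hypothetical monitoring helper; it assumes a `None` value of `last_flushed` means a flush is already due, and that the time check happens when a message is sent rather than on a timer.

```python
from datetime import datetime, timedelta


def seconds_until_stale(last_flushed, flush_after, now=None):
    """Seconds until a time-based flush is due; 0.0 if already due (hypothetical helper)."""
    if last_flushed is None:
        return 0.0
    if now is None:
        now = datetime.now()
    remaining = (last_flushed + flush_after) - now
    return max(remaining.total_seconds(), 0.0)


t0 = datetime(2024, 1, 1)
print(seconds_until_stale(t0, timedelta(seconds=10), now=t0 + timedelta(seconds=4)))  # 6.0
```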

Testing and Debugging

The following underscore-prefixed (private) methods are useful for testing and monitoring the async consumer's internal state; as private API they may change between releases.

def _flush_thread_is_free(self):
    """
    Check whether the consumer is free to start a new flush.
    
    Returns:
    - bool: True if no flush thread is running, False otherwise
    """

def _sync_flush(self, endpoint=None):
    """
    Perform synchronous flush operation (used internally by flush threads).
    
    Parameters:
    - endpoint (str): Specific endpoint to flush (optional, flushes all if None)
    """

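In tests, the `flushing_thread` attribute can be joined so that assertions run only after a background flush has finished. `wait_for_flush` is a hypothetical helper that accepts any object exposing `flushing_thread`; `FakeConsumer` exists only so the sketch runs without network access.

```python
import threading
import time


def wait_for_flush(consumer, timeout=5.0):
    """Block until the consumer's current flush thread (if any) finishes; return True if idle."""
    thread = consumer.flushing_thread
    if thread is not None:
        thread.join(timeout)
    return thread is None or not thread.is_alive()


class FakeConsumer:
    """Stand-in exposing only what wait_for_flush needs (hypothetical)."""

    def __init__(self):
        self.sent = []
        self.flushing_thread = None

    def flush(self):
        def work():
            time.sleep(0.05)           # simulate network latency
            self.sent.append("batch")
        self.flushing_thread = threading.Thread(target=work)
        self.flushing_thread.start()
        return True


fake = FakeConsumer()
fake.flush()
print(wait_for_flush(fake), fake.sent)  # True ['batch']
```

With the real consumer, the same helper can be called right after `flush()` so a test never races the worker thread.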
Helper Classes

FlushThread

Internal thread class used for asynchronous flushing operations. This class extends threading.Thread and is created automatically by AsyncBufferedConsumer when performing async flushes.

class FlushThread(threading.Thread):
    def __init__(self, consumer, endpoint=None):
        """
        Create a flush thread for asynchronous event sending.

        Parameters:
        - consumer (AsyncBufferedConsumer): Consumer instance to flush
        - endpoint (str): Specific endpoint to flush (optional, flushes all if None)
        """

    def run(self):
        """Execute the flush operation in the thread by calling consumer._sync_flush()."""

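Because `FlushThread.run()` executes in a worker thread, an exception raised by the flush does not propagate back to the code that called `flush()`. One way to make such failures observable is a subclass that records the exception. `RecordingFlushThread` is a hypothetical variant of the same pattern, not part of the package; it accepts any object with a `_sync_flush(endpoint=None)` method, and `FailingConsumer` simulates a failed send.

```python
import threading


class RecordingFlushThread(threading.Thread):
    """Hypothetical FlushThread variant that keeps the exception instead of losing it."""

    def __init__(self, consumer, endpoint=None):
        super().__init__()
        self.consumer = consumer
        self.endpoint = endpoint
        self.error = None

    def run(self):
        try:
            self.consumer._sync_flush(endpoint=self.endpoint)
        except Exception as exc:  # record the failure so the caller can inspect it after join()
            self.error = exc


class FailingConsumer:
    def _sync_flush(self, endpoint=None):
        raise RuntimeError(f"could not reach server for {endpoint or 'all endpoints'}")


thread = RecordingFlushThread(FailingConsumer(), endpoint="events")
thread.start()
thread.join()
print(repr(thread.error))  # RuntimeError('could not reach server for events')
```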
Configuration Examples

Custom Flush Timing

from datetime import timedelta
from mixpanel_async import AsyncBufferedConsumer

# Flush once 30 seconds have passed since the last flush, or when a queue reaches 50 events
consumer = AsyncBufferedConsumer(
    flush_after=timedelta(seconds=30),
    max_size=50,
    flush_first=False  # Don't flush the first event immediately
)
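To see how `flush_after` and `max_size` interact, the simulation below replays send times for a single endpoint and reports which sends would trigger a flush. It is a hypothetical model, not library code: it assumes the first send flushes only when `flush_first=True`, a send flushes once more than `flush_after` has passed since the last flush, and otherwise a send flushes when the queue reaches `max_size`.

```python
from datetime import datetime, timedelta


def simulate_flushes(send_offsets, flush_after, max_size, flush_first=True):
    """Return (send_index, reason) for each send that would trigger a flush.

    Hypothetical single-endpoint model: the time check takes priority over the
    size check, and every flush resets the queue and the last-flush time.
    """
    start = datetime(2024, 1, 1)
    last_flushed = None if flush_first else start
    queued = 0
    flushes = []
    for index, offset in enumerate(send_offsets):
        now = start + timedelta(seconds=offset)
        queued += 1
        stale = last_flushed is None or now - last_flushed > flush_after
        if stale or queued >= max_size:
            flushes.append((index, "time" if stale else "size"))
            queued = 0
            last_flushed = now
    return flushes


# Sends at t = 0, 1, 2, 3 and 35 seconds; a small max_size makes the size trigger visible.
print(simulate_flushes([0, 1, 2, 3, 35], timedelta(seconds=30), max_size=3, flush_first=False))
# [(2, 'size'), (4, 'time')]
```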

Custom API Endpoints and Retry Logic

from mixpanel_async import AsyncBufferedConsumer

consumer = AsyncBufferedConsumer(
    api_host="api-eu.mixpanel.com",  # EU data residency API host
    retry_limit=6,  # Retry up to 6 times
    retry_backoff_factor=0.5,  # Longer backoff between retries
    request_timeout=30  # 30 second timeout
)
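The retry settings are handed to the underlying mixpanel client. Assuming it delegates retries to urllib3's `Retry` with `backoff_factor` set to `retry_backoff_factor` (worth checking against the installed mixpanel version), the sleep before each retry follows urllib3's schedule: no delay before the first retry, then `backoff_factor * 2 ** (n - 1)` seconds before retry n, capped at 120 seconds. `backoff_schedule` is a hypothetical helper that reproduces that arithmetic.

```python
def backoff_schedule(backoff_factor, retry_limit, backoff_max=120.0):
    """Seconds slept before each retry, modelled on urllib3's Retry.get_backoff_time()."""
    delays = []
    for retry_number in range(1, retry_limit + 1):
        if retry_number <= 1:
            delay = 0.0  # urllib3 does not sleep before the first retry
        else:
            delay = backoff_factor * (2 ** (retry_number - 1))
        delays.append(min(delay, backoff_max))
    return delays


print(backoff_schedule(0.25, 4))  # defaults: [0.0, 0.5, 1.0, 2.0]
print(backoff_schedule(0.5, 6))   # example above: [0.0, 1.0, 2.0, 4.0, 8.0, 16.0]
```

Under that assumption, the example settings add up to 31 seconds of sleeping across six retries, plus the time spent in each attempt; with `async_=True` this happens in the flush thread rather than in the caller.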

Production Usage Pattern

import atexit
from datetime import timedelta
from mixpanel import Mixpanel
from mixpanel_async import AsyncBufferedConsumer

# Create consumer with production settings
consumer = AsyncBufferedConsumer(
    flush_after=timedelta(seconds=5),  # Flush frequently
    max_size=100,  # Larger batch size
    retry_limit=8,  # More retries for reliability
    verify_cert=True  # Always verify SSL
)

mp = Mixpanel('YOUR_TOKEN', consumer=consumer)

# Ensure final flush on application exit
atexit.register(lambda: consumer.flush(async_=False))

# Your application code here
mp.track('user_id', 'app_start')
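An alternative to `atexit` is to scope the consumer's lifetime explicitly, so the final synchronous flush also runs when a block exits with an exception. `final_flush` is a hypothetical helper; it relies only on the documented `flush(async_=False)` method and `flushing_thread` attribute, and it first joins any in-flight background flush because `flush` returns `False` while another flush is running.

```python
import contextlib


@contextlib.contextmanager
def final_flush(consumer, join_timeout=10.0):
    """Yield the consumer, then drain it synchronously on exit (hypothetical helper)."""
    try:
        yield consumer
    finally:
        thread = getattr(consumer, "flushing_thread", None)
        if thread is not None:
            thread.join(join_timeout)   # let an in-flight async flush finish first
        consumer.flush(async_=False)


# Intended use with the real classes (not executed here):
# with final_flush(AsyncBufferedConsumer()) as consumer:
#     mp = Mixpanel("YOUR_TOKEN", consumer=consumer)
#     mp.track("user_id", "app_start")
```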

Error Handling

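Errors are raised as MixpanelException from the base mixpanel library. A try/except in the calling thread catches errors raised there, such as an invalid endpoint passed to send():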

from mixpanel import MixpanelException
from mixpanel_async import AsyncBufferedConsumer

consumer = AsyncBufferedConsumer()

try:
    consumer.send('invalid_endpoint', '{"event": "test"}')
except MixpanelException as e:
    print(f"Error: {e}")
    # Handle the error appropriately

Types

# From datetime module
class datetime:
    @classmethod
    def now(cls, tz=None):
        """Get the current datetime; used for tracking flush timing."""

class timedelta:
    def __init__(self, days=0, seconds=0, microseconds=0, milliseconds=0, minutes=0, hours=0, weeks=0):
        """Time duration object for the flush_after parameter."""

# From threading module
class Thread:
    """Base thread class extended by FlushThread."""
    def is_alive(self):
        """Check whether the thread is currently running."""
    def join(self, timeout=None):
        """Wait for the thread to complete, optionally for at most timeout seconds."""

# From mixpanel module
class MixpanelException(Exception):
    """Exception raised for Mixpanel API errors and invalid usage."""
    message: str   # Error details (for failed batch sends, the JSON batch that could not be delivered)
    endpoint: str  # Endpoint that caused the error (when applicable)