
tessl/pypi-mixpanel

Official Mixpanel library for Python providing server-side analytics tracking


Consumer Configuration

Consumers control how the library transmits data to Mixpanel. The direct HTTP consumer sends each message immediately, while the buffered consumer queues messages per endpoint and sends them in batches. Choose between them based on your latency tolerance, event volume, and network conditions.

Capabilities

Direct HTTP Consumer

The default consumer. It sends one HTTP request to the Mixpanel service per call, so data is transmitted immediately, and it retries on connection errors and HTTP 5xx responses according to retry_limit and retry_backoff_factor.

class Consumer:
    def __init__(self, events_url: str = None, people_url: str = None, import_url: str = None, request_timeout: int = None, groups_url: str = None, api_host: str = "api.mixpanel.com", retry_limit: int = 4, retry_backoff_factor: float = 0.25, verify_cert: bool = True):
        """
        A consumer that sends an HTTP request directly to the Mixpanel service, one per call.

        Parameters:
        - events_url (str, optional): Override the default events API endpoint
        - people_url (str, optional): Override the default people API endpoint
        - import_url (str, optional): Override the default import API endpoint
        - request_timeout (int, optional): Connection timeout in seconds
        - groups_url (str, optional): Override the default groups API endpoint
        - api_host (str): The Mixpanel API domain where all requests should be issued (default: "api.mixpanel.com")
        - retry_limit (int): Number of times to retry each request in case of connection or HTTP 5xx error (default: 4)
        - retry_backoff_factor (float): Controls sleep time between retries (default: 0.25)
        - verify_cert (bool): Whether to verify the server certificate (default: True)
        """

    def send(self, endpoint: str, json_message: str, api_key: str = None, api_secret: str = None):
        """
        Immediately record an event or a profile update.

        Parameters:
        - endpoint (str): The Mixpanel API endpoint ("events", "people", "groups", "imports")
        - json_message (str): A JSON message formatted for the endpoint
        - api_key (str, optional): Your Mixpanel project's API key
        - api_secret (str, optional): Your Mixpanel project's API secret

        Returns:
        bool: True if successful

        Raises:
        MixpanelException: If the endpoint doesn't exist, server is unreachable, or message cannot be processed
        """

Usage Example:

from mixpanel import Mixpanel, Consumer

# Create consumer with custom configuration
consumer = Consumer(
    api_host="api-eu.mixpanel.com",  # Use the EU data residency host
    request_timeout=30,          # 30 second timeout
    retry_limit=2,               # Fewer retries for faster failures
    verify_cert=True             # Verify SSL certificates
)

# Use consumer with Mixpanel instance
mp = Mixpanel("YOUR_PROJECT_TOKEN", consumer=consumer)

# All tracking will use the custom consumer
mp.track("user_123", "event_name", {"property": "value"})
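
To get a feel for how retry_limit and retry_backoff_factor interact, the sketch below computes the sleep before each retry. It assumes the consumer delegates retries to urllib3's Retry helper and follows the backoff formula given in urllib3's documentation (no sleep before the first retry, then factor * 2**(n - 1), capped at 120 seconds). The function name backoff_schedule is hypothetical and not part of the mixpanel package, and actual timing may differ with the installed urllib3 version.

```python
def backoff_schedule(retry_limit, retry_backoff_factor, backoff_max=120.0):
    """Return the sleep (in seconds) that precedes each retry attempt.

    Assumption: mirrors the formula documented for urllib3's Retry class.
    """
    sleeps = []
    for attempt in range(1, retry_limit + 1):
        if attempt == 1:
            # The first retry is issued immediately.
            sleeps.append(0.0)
        else:
            sleeps.append(min(backoff_max, retry_backoff_factor * 2 ** (attempt - 1)))
    return sleeps


# Library defaults: retry_limit=4, retry_backoff_factor=0.25
defaults = backoff_schedule(retry_limit=4, retry_backoff_factor=0.25)
print(defaults)       # sleeps before retries 1..4
print(sum(defaults))  # total time spent sleeping if every attempt fails
```

Under these assumptions, lowering retry_limit shortens the worst-case delay far more than lowering the backoff factor, because the sleeps double with each retry.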

Buffered Consumer

A consumer that maintains per-endpoint buffers of messages and sends them in batches for improved performance and reduced network overhead.

class BufferedConsumer:
    def __init__(self, max_size: int = 50, events_url: str = None, people_url: str = None, import_url: str = None, request_timeout: int = None, groups_url: str = None, api_host: str = "api.mixpanel.com", retry_limit: int = 4, retry_backoff_factor: float = 0.25, verify_cert: bool = True):
        """
        A consumer that maintains per-endpoint buffers and sends messages in batches.

        Parameters:
        - max_size (int): Number of send() calls for a given endpoint to buffer before flushing automatically (default: 50, max: 50)
        - events_url (str, optional): Override the default events API endpoint
        - people_url (str, optional): Override the default people API endpoint
        - import_url (str, optional): Override the default import API endpoint
        - request_timeout (int, optional): Connection timeout in seconds
        - groups_url (str, optional): Override the default groups API endpoint
        - api_host (str): The Mixpanel API domain where all requests should be issued (default: "api.mixpanel.com")
        - retry_limit (int): Number of times to retry each request in case of connection or HTTP 5xx error (default: 4)
        - retry_backoff_factor (float): Controls sleep time between retries (default: 0.25)
        - verify_cert (bool): Whether to verify the server certificate (default: True)

        Note: BufferedConsumer holds events, so you must call flush() when done sending to ensure all events are transmitted.
        """

    def send(self, endpoint: str, json_message: str, api_key: str = None, api_secret: str = None):
        """
        Record an event or profile update in buffer, flushing if buffer reaches max_size.

        Parameters:
        - endpoint (str): The Mixpanel API endpoint ("events", "people", "groups", "imports")
        - json_message (str): A JSON message formatted for the endpoint
        - api_key (str, optional): Your Mixpanel project's API key
        - api_secret (str, optional): Your Mixpanel project's API secret

        Raises:
        MixpanelException: If the endpoint doesn't exist, server is unreachable, or any buffered message cannot be processed

        Note: Exceptions may be caused by messages buffered from earlier send() calls.
        """

    def flush(self):
        """
        Immediately send all buffered messages to Mixpanel.

        Raises:
        MixpanelException: If the server is unreachable or any buffered message cannot be processed
        """

Usage Example:

from mixpanel import Mixpanel, BufferedConsumer

# Create buffered consumer for high-throughput scenarios
buffered_consumer = BufferedConsumer(
    max_size=25,                    # Smaller batches for more frequent sending
    api_host="api.mixpanel.com",    # Default API host
    request_timeout=60,             # Longer timeout for batch requests
    retry_limit=3                   # Moderate retry attempts
)

# Use buffered consumer with Mixpanel instance
mp = Mixpanel("YOUR_PROJECT_TOKEN", consumer=buffered_consumer)

# Track multiple events (they'll be buffered)
for i in range(100):
    mp.track(f"user_{i}", "bulk_event", {"batch_id": "batch_001", "index": i})

# Explicitly flush remaining events (important!)
buffered_consumer.flush()
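
The reason the final flush() matters can be seen in a toy model of the buffering behavior described above. This is a simplified sketch, not the library's implementation: BufferModel is a hypothetical class that records batches in a list instead of sending HTTP requests.

```python
from collections import defaultdict


class BufferModel:
    """Toy model of per-endpoint buffering; not the library's implementation."""

    def __init__(self, max_size=50):
        self.max_size = min(50, max_size)  # the documented maximum is 50
        self.buffers = defaultdict(list)   # endpoint -> pending messages
        self.batches = []                  # (endpoint, batch) pairs "sent" so far

    def send(self, endpoint, json_message):
        buf = self.buffers[endpoint]
        buf.append(json_message)
        # Flush automatically once the endpoint's buffer reaches max_size.
        if len(buf) >= self.max_size:
            self._flush_endpoint(endpoint)

    def _flush_endpoint(self, endpoint):
        buf = self.buffers[endpoint]
        while buf:
            self.batches.append((endpoint, buf[:self.max_size]))
            buf = buf[self.max_size:]
        self.buffers[endpoint] = buf

    def flush(self):
        for endpoint in list(self.buffers):
            self._flush_endpoint(endpoint)


model = BufferModel(max_size=25)
for i in range(110):
    model.send("events", '{"index": %d}' % i)

# 110 is not a multiple of 25, so some messages are still waiting here.
pending_before_flush = len(model.buffers["events"])
model.flush()
batch_sizes = [len(batch) for _, batch in model.batches]
print(pending_before_flush, batch_sizes)
```

In this model, any messages left over after the last full batch stay in memory until flush() is called, which is the situation the explicit flush in the usage example guards against.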

Consumer Configuration Patterns

High-Performance Batch Processing

For applications with high event volumes that can tolerate some latency.

from mixpanel import Mixpanel, BufferedConsumer

# Configure for maximum throughput
high_throughput_consumer = BufferedConsumer(
    max_size=50,                    # Maximum batch size
    request_timeout=120,            # Extended timeout for large batches  
    retry_limit=5,                  # More retries for reliability
    retry_backoff_factor=0.5        # Longer backoff between retries
)

mp = Mixpanel("YOUR_PROJECT_TOKEN", consumer=high_throughput_consumer)

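# Process events in bulk
# get_events_from_database() is a placeholder for your own data source;
# it should return an iterable of dicts with the keys used below
events_data = get_events_from_database()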
for event in events_data:
    mp.track(event["user_id"], event["event_name"], event["properties"])

# Always flush at the end of processing
high_throughput_consumer.flush()

# Or use context manager pattern for automatic flushing
class AutoFlushMixpanel:
    def __init__(self, token, consumer):
        self.mp = Mixpanel(token, consumer=consumer)
        self.consumer = consumer
    
    def __enter__(self):
        return self.mp
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        if hasattr(self.consumer, 'flush'):
            self.consumer.flush()

# Usage with automatic flushing
with AutoFlushMixpanel("YOUR_PROJECT_TOKEN", high_throughput_consumer) as mp:
    for event in events_data:
        mp.track(event["user_id"], event["event_name"], event["properties"])
# flush() is called automatically when exiting the context

Low-Latency Real-time Tracking

For applications requiring immediate data transmission with minimal delay.

import time

from mixpanel import Mixpanel, Consumer

# Configure for minimal latency
real_time_consumer = Consumer(
    request_timeout=5,              # Quick timeout for fast failures
    retry_limit=1,                  # Minimal retries for speed
    retry_backoff_factor=0.1        # Short backoff
)

mp = Mixpanel("YOUR_PROJECT_TOKEN", consumer=real_time_consumer)

# Events are sent immediately
mp.track("user_123", "critical_action", {"timestamp": time.time()})

Regional API Endpoints

Configure consumers for different regional Mixpanel endpoints.

from mixpanel import Mixpanel, Consumer

# European data residency
eu_consumer = Consumer(api_host="api-eu.mixpanel.com")
eu_mp = Mixpanel("EU_PROJECT_TOKEN", consumer=eu_consumer)

# US endpoint (default)
us_consumer = Consumer(api_host="api.mixpanel.com")
us_mp = Mixpanel("US_PROJECT_TOKEN", consumer=us_consumer)

# Custom endpoint for enterprise customers
enterprise_consumer = Consumer(
    events_url="https://custom.mixpanel-endpoint.com/track",
    people_url="https://custom.mixpanel-endpoint.com/engage"
)
enterprise_mp = Mixpanel("ENTERPRISE_TOKEN", consumer=enterprise_consumer)
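
The interplay between api_host and the per-endpoint URL overrides can be sketched as follows. This is an illustration under assumptions: the function resolve_endpoints is hypothetical, and the default paths (/track, /engage, /groups, /import) are taken from Mixpanel's public ingestion API rather than from this page.

```python
def resolve_endpoints(api_host="api.mixpanel.com", events_url=None,
                      people_url=None, groups_url=None, import_url=None):
    """Map endpoint names to URLs; an explicit *_url wins over api_host."""
    return {
        "events": events_url or "https://{}/track".format(api_host),
        "people": people_url or "https://{}/engage".format(api_host),
        "groups": groups_url or "https://{}/groups".format(api_host),
        "imports": import_url or "https://{}/import".format(api_host),
    }


# Changing api_host moves every endpoint at once.
eu = resolve_endpoints(api_host="api-eu.mixpanel.com")

# Overriding individual URLs leaves the remaining endpoints on api_host.
custom = resolve_endpoints(
    events_url="https://custom.mixpanel-endpoint.com/track",
    people_url="https://custom.mixpanel-endpoint.com/engage",
)
print(eu["events"])
print(custom["groups"])
```

If this model holds, a configuration that overrides only events_url and people_url still sends group and import traffic to the default host, so set groups_url and import_url (or api_host) as well when all traffic must go to a custom domain.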

Development and Testing Configuration

Configure consumers for development environments with different retry and timeout behaviors.

from mixpanel import Mixpanel, Consumer

# Development consumer with aggressive timeouts and minimal retries
dev_consumer = Consumer(
    request_timeout=2,              # Fast timeout for development
    retry_limit=0,                  # No retries for immediate feedback
    verify_cert=False               # Allow self-signed certificates (development only)
)

# Test consumer that doesn't actually send data
class TestConsumer:
    def __init__(self):
        self.sent_messages = []
    
    def send(self, endpoint, json_message, api_key=None, api_secret=None):
        import json
        self.sent_messages.append({
            "endpoint": endpoint,
            "message": json.loads(json_message),
            "api_key": api_key,
            "api_secret": api_secret
        })
        return True

# Use test consumer for unit tests
test_consumer = TestConsumer()
test_mp = Mixpanel("TEST_TOKEN", consumer=test_consumer)

test_mp.track("test_user", "test_event", {"test": True})
assert len(test_consumer.sent_messages) == 1
assert test_consumer.sent_messages[0]["endpoint"] == "events"

Best Practices

Consumer Selection

  • Use Consumer (default) for real-time applications requiring immediate data transmission
  • Use BufferedConsumer for batch processing, high-volume applications, or when network efficiency is important
  • Consider application requirements: latency tolerance, throughput needs, and error handling preferences

Configuration Guidelines

  • Set appropriate timeouts based on network conditions and performance requirements
  • Configure retry limits and backoff factors based on reliability needs versus performance
  • Use regional endpoints (api_host) to comply with data residency requirements

Buffer Management

  • Always call flush() on BufferedConsumer instances before application shutdown
  • Consider implementing automatic flushing patterns or context managers
  • Monitor buffer sizes and flush frequency to balance performance and data freshness
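
One way to make the shutdown flush hard to forget is to register it with Python's standard atexit module. The sketch below is a minimal illustration: flush_at_exit and FakeBufferedConsumer are hypothetical names, and the fake consumer stands in for BufferedConsumer so the example runs without network access.

```python
import atexit


def flush_at_exit(consumer):
    """Register consumer.flush to run at interpreter shutdown; return the handler."""
    def _flush():
        try:
            consumer.flush()
        except Exception as exc:
            # At shutdown there is little left to do but report the failure.
            print("final flush failed: %r" % (exc,))
    atexit.register(_flush)
    return _flush


class FakeBufferedConsumer:
    """Stand-in for BufferedConsumer that only tracks what was flushed."""

    def __init__(self):
        self.pending = ["m1", "m2"]
        self.flushed = []

    def flush(self):
        self.flushed.extend(self.pending)
        self.pending = []


consumer = FakeBufferedConsumer()
handler = flush_at_exit(consumer)
handler()  # invoked manually here; atexit calls it again at normal shutdown
```

Note that atexit handlers run only on normal interpreter exit. They are skipped when the process is killed by an unhandled signal or exits through os._exit(), so long-running workers may still want periodic explicit flushes.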

Error Handling

  • Catch MixpanelException around tracking calls; with BufferedConsumer, also catch it around flush()
  • With BufferedConsumer, remember that an exception may be caused by a message buffered during an earlier send() call, not by the call that raised it
  • Log and monitor consumer failures for operational awareness
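
These points can be combined in a small wrapper that logs failures instead of letting analytics errors interrupt application code. The following is a sketch, not part of the mixpanel package: LoggingConsumer and AlwaysFailing are hypothetical classes, and the set of exceptions to swallow is passed in so the example runs without the library installed.

```python
import logging

logger = logging.getLogger("analytics")


class LoggingConsumer:
    """Wrap any consumer; log failures instead of propagating them."""

    def __init__(self, inner, error_types=(Exception,)):
        self.inner = inner
        self.error_types = error_types
        self.failures = 0

    def send(self, endpoint, json_message, api_key=None, api_secret=None):
        try:
            self.inner.send(endpoint, json_message, api_key, api_secret)
            return True
        except self.error_types as exc:
            self.failures += 1
            logger.warning("send to %r failed: %s", endpoint, exc)
            return False

    def flush(self):
        # Only buffered consumers expose flush(); direct ones have nothing to flush.
        if hasattr(self.inner, "flush"):
            try:
                self.inner.flush()
            except self.error_types as exc:
                self.failures += 1
                logger.warning("flush failed: %s", exc)


class AlwaysFailing:
    """Stand-in consumer that simulates an unreachable server."""

    def send(self, endpoint, json_message, api_key=None, api_secret=None):
        raise RuntimeError("server unreachable")


safe = LoggingConsumer(AlwaysFailing(), error_types=(RuntimeError,))
ok = safe.send("events", "{}")
print(ok, safe.failures)
```

With the real library, the wrapper would presumably be constructed as LoggingConsumer(BufferedConsumer(), error_types=(MixpanelException,)) and passed to Mixpanel(..., consumer=...). Swallowing errors trades data completeness for application stability, so the failures counter should feed whatever monitoring you already have.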

Security Considerations

  • Keep verify_cert=True (the default) in production so the server's TLS certificate is verified; disable it only against local or test endpoints
  • Use appropriate API endpoints and credentials for different environments
  • Implement proper credential management and rotation practices
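
As one possible way to keep credentials and environment-specific settings out of source code, the sketch below reads them from environment variables. Everything here is an assumption made for illustration: the function consumer_settings and the variable names APP_ENV, MIXPANEL_TOKEN, MIXPANEL_API_HOST, and MIXPANEL_VERIFY_CERT are hypothetical and not defined by the library.

```python
import os


def consumer_settings(env=None):
    """Build the project token and Consumer keyword arguments from the environment."""
    env = os.environ if env is None else env
    is_production = env.get("APP_ENV", "production") == "production"
    # Certificate verification can be switched off only outside production.
    verify = True if is_production else env.get("MIXPANEL_VERIFY_CERT", "1") != "0"
    return {
        "token": env["MIXPANEL_TOKEN"],  # fail loudly if the token is missing
        "consumer_kwargs": {
            "api_host": env.get("MIXPANEL_API_HOST", "api.mixpanel.com"),
            "verify_cert": verify,
        },
    }


prod = consumer_settings({"MIXPANEL_TOKEN": "prod-token",
                          "MIXPANEL_VERIFY_CERT": "0"})
dev = consumer_settings({"MIXPANEL_TOKEN": "dev-token",
                         "APP_ENV": "development",
                         "MIXPANEL_VERIFY_CERT": "0"})
print(prod["consumer_kwargs"], dev["consumer_kwargs"])
```

The resulting dictionary could then be applied as Consumer(**settings["consumer_kwargs"]) and Mixpanel(settings["token"], consumer=...). Defaulting to production when APP_ENV is unset means a misconfigured deployment errs on the side of verifying certificates.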
