# Consumer Configuration

Consumer configuration controls how data is sent to Mixpanel: a direct HTTP consumer sends each message immediately, while a buffered consumer collects messages and sends them in batches. This lets you match transmission behavior to your performance requirements and network conditions.

## Capabilities

### Direct HTTP Consumer

The default consumer. It sends one HTTP request to the Mixpanel service per call, so data is transmitted immediately, and it retries requests that fail with a connection error or an HTTP 5xx response.

```python { .api }
class Consumer:
    def __init__(self, events_url: str = None, people_url: str = None, import_url: str = None, request_timeout: int = None, groups_url: str = None, api_host: str = "api.mixpanel.com", retry_limit: int = 4, retry_backoff_factor: float = 0.25, verify_cert: bool = True):
        """
        A consumer that sends an HTTP request directly to the Mixpanel service, one per call.

        Parameters:
        - events_url (str, optional): Override the default events API endpoint
        - people_url (str, optional): Override the default people API endpoint
        - import_url (str, optional): Override the default import API endpoint
        - request_timeout (int, optional): Connection timeout in seconds
        - groups_url (str, optional): Override the default groups API endpoint
        - api_host (str): The Mixpanel API domain where all requests should be issued (default: "api.mixpanel.com")
        - retry_limit (int): Number of times to retry each request in case of connection or HTTP 5xx error (default: 4)
        - retry_backoff_factor (float): Controls sleep time between retries (default: 0.25)
        - verify_cert (bool): Whether to verify the server certificate (default: True)
        """

    def send(self, endpoint: str, json_message: str, api_key: str = None, api_secret: str = None):
        """
        Immediately record an event or a profile update.

        Parameters:
        - endpoint (str): The Mixpanel API endpoint ("events", "people", "groups", "imports")
        - json_message (str): A JSON message formatted for the endpoint
        - api_key (str, optional): Your Mixpanel project's API key
        - api_secret (str, optional): Your Mixpanel project's API secret

        Returns:
            bool: True if successful

        Raises:
            MixpanelException: If the endpoint doesn't exist, server is unreachable, or message cannot be processed
        """
```

**Usage Example:**

```python
from mixpanel import Mixpanel, Consumer

# Create consumer with custom configuration
consumer = Consumer(
    api_host="api-eu.mixpanel.com",  # Use the EU data residency API host
    request_timeout=30,              # 30 second timeout
    retry_limit=2,                   # Fewer retries for faster failures
    verify_cert=True                 # Verify SSL certificates
)

# Use consumer with Mixpanel instance
mp = Mixpanel("YOUR_PROJECT_TOKEN", consumer=consumer)

# All tracking will use the custom consumer
mp.track("user_123", "event_name", {"property": "value"})
```

### Buffered Consumer

A consumer that maintains per-endpoint buffers of messages and sends them in batches for improved performance and reduced network overhead.

```python { .api }
class BufferedConsumer:
    def __init__(self, max_size: int = 50, events_url: str = None, people_url: str = None, import_url: str = None, request_timeout: int = None, groups_url: str = None, api_host: str = "api.mixpanel.com", retry_limit: int = 4, retry_backoff_factor: float = 0.25, verify_cert: bool = True):
        """
        A consumer that maintains per-endpoint buffers and sends messages in batches.

        Parameters:
        - max_size (int): Number of send() calls for a given endpoint to buffer before flushing automatically (default: 50, max: 50)
        - events_url (str, optional): Override the default events API endpoint
        - people_url (str, optional): Override the default people API endpoint
        - import_url (str, optional): Override the default import API endpoint
        - request_timeout (int, optional): Connection timeout in seconds
        - groups_url (str, optional): Override the default groups API endpoint
        - api_host (str): The Mixpanel API domain where all requests should be issued (default: "api.mixpanel.com")
        - retry_limit (int): Number of times to retry each request in case of connection or HTTP 5xx error (default: 4)
        - retry_backoff_factor (float): Controls sleep time between retries (default: 0.25)
        - verify_cert (bool): Whether to verify the server certificate (default: True)

        Note: BufferedConsumer holds events, so you must call flush() when done sending to ensure all events are transmitted.
        """

    def send(self, endpoint: str, json_message: str, api_key: str = None, api_secret: str = None):
        """
        Record an event or profile update in buffer, flushing if buffer reaches max_size.

        Parameters:
        - endpoint (str): The Mixpanel API endpoint ("events", "people", "groups", "imports")
        - json_message (str): A JSON message formatted for the endpoint
        - api_key (str, optional): Your Mixpanel project's API key
        - api_secret (str, optional): Your Mixpanel project's API secret

        Raises:
            MixpanelException: If the endpoint doesn't exist, server is unreachable, or any buffered message cannot be processed

        Note: Exceptions may be caused by messages buffered from earlier send() calls.
        """

    def flush(self):
        """
        Immediately send all buffered messages to Mixpanel.

        Raises:
            MixpanelException: If the server is unreachable or any buffered message cannot be processed
        """
```

**Usage Example:**

```python
from mixpanel import Mixpanel, BufferedConsumer

# Create buffered consumer for high-throughput scenarios
buffered_consumer = BufferedConsumer(
    max_size=25,                  # Smaller batches for more frequent sending
    api_host="api.mixpanel.com",  # Default API host
    request_timeout=60,           # Longer timeout for batch requests
    retry_limit=3                 # Moderate retry attempts
)

# Use buffered consumer with Mixpanel instance
mp = Mixpanel("YOUR_PROJECT_TOKEN", consumer=buffered_consumer)

# Track multiple events (they'll be buffered)
for i in range(100):
    mp.track(f"user_{i}", "bulk_event", {"batch_id": "batch_001", "index": i})

# Explicitly flush remaining events (important!)
buffered_consumer.flush()
```

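
To make the batching behavior concrete, here is a minimal sketch of per-endpoint buffering. `ToyBufferedConsumer` and its `batches_sent` list are hypothetical stand-ins written for illustration; this is not the library's implementation, and nothing is sent over the network.

```python
from collections import defaultdict


class ToyBufferedConsumer:
    """Illustrative model of per-endpoint buffering (not the real BufferedConsumer)."""

    def __init__(self, max_size=50):
        self.max_size = max_size
        self.buffers = defaultdict(list)  # one buffer per endpoint
        self.batches_sent = []            # records (endpoint, batch_length) instead of doing HTTP

    def send(self, endpoint, json_message, api_key=None, api_secret=None):
        buf = self.buffers[endpoint]
        buf.append(json_message)
        # Flush only this endpoint's buffer once it reaches max_size
        if len(buf) >= self.max_size:
            self._flush_endpoint(endpoint)

    def flush(self):
        # Send whatever is left in every endpoint's buffer
        for endpoint in list(self.buffers):
            self._flush_endpoint(endpoint)

    def _flush_endpoint(self, endpoint):
        buf = self.buffers[endpoint]
        if buf:
            self.batches_sent.append((endpoint, len(buf)))
            buf.clear()


toy = ToyBufferedConsumer(max_size=25)
for i in range(60):
    toy.send("events", '{"event": "bulk_event"}')

automatic = list(toy.batches_sent)  # batches that went out without an explicit flush
toy.flush()                         # the leftover messages go out here
```

With 60 messages and `max_size=25`, two full batches go out automatically and the last 10 messages stay in the buffer until `flush()` is called, which is why the explicit flush matters.
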
## Consumer Configuration Patterns

### High-Performance Batch Processing

For applications with high event volumes that can tolerate some latency.

```python
from mixpanel import Mixpanel, BufferedConsumer

# Configure for maximum throughput
high_throughput_consumer = BufferedConsumer(
    max_size=50,              # Maximum batch size
    request_timeout=120,      # Extended timeout for large batches
    retry_limit=5,            # More retries for reliability
    retry_backoff_factor=0.5  # Longer backoff between retries
)

mp = Mixpanel("YOUR_PROJECT_TOKEN", consumer=high_throughput_consumer)

# Process events in bulk
# get_events_from_database() is a placeholder for your own data source
events_data = get_events_from_database()
for event in events_data:
    mp.track(event["user_id"], event["event_name"], event["properties"])

# Always flush at the end of processing
high_throughput_consumer.flush()

# Alternatively, use a context manager for automatic flushing
class AutoFlushMixpanel:
    def __init__(self, token, consumer):
        self.mp = Mixpanel(token, consumer=consumer)
        self.consumer = consumer

    def __enter__(self):
        return self.mp

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Flush even if the block raised, so buffered events are not lost
        if hasattr(self.consumer, 'flush'):
            self.consumer.flush()

# Usage with automatic flushing (instead of the manual loop above)
with AutoFlushMixpanel("YOUR_PROJECT_TOKEN", high_throughput_consumer) as mp:
    for event in events_data:
        mp.track(event["user_id"], event["event_name"], event["properties"])
# flush() is called automatically when exiting the context
```

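
If events arrive slowly, a partially filled buffer can sit unsent for a long time. One possible mitigation is a thin wrapper that also flushes once a time limit has passed since the last flush. The sketch below is an illustration under assumptions: `TimedFlushConsumer`, `max_age_seconds`, and the injectable `clock` are hypothetical names, and the wrapper relies only on the `send()`/`flush()` interface described in this document. It checks the clock on each `send()` call, so it does not flush while the application is idle.

```python
import time


class TimedFlushConsumer:
    """Hypothetical wrapper: forwards to a buffered consumer and flushes when the buffer is too old."""

    def __init__(self, inner, max_age_seconds=10.0, clock=time.monotonic):
        self.inner = inner
        self.max_age_seconds = max_age_seconds
        self._clock = clock              # injectable so the behavior can be tested
        self._last_flush = clock()

    def send(self, endpoint, json_message, api_key=None, api_secret=None):
        # Let the wrapped consumer buffer (or auto-flush) as usual
        self.inner.send(endpoint, json_message, api_key, api_secret)
        # Then force a flush if too much time has passed since the last one
        if self._clock() - self._last_flush >= self.max_age_seconds:
            self.flush()

    def flush(self):
        self.inner.flush()
        self._last_flush = self._clock()


# Intended use (requires the mixpanel package):
#   consumer = TimedFlushConsumer(BufferedConsumer(max_size=50), max_age_seconds=5)
#   mp = Mixpanel("YOUR_PROJECT_TOKEN", consumer=consumer)
```

A final `flush()` at shutdown is still needed, because the time check only runs when a new message arrives.
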
### Low-Latency Real-time Tracking

For applications requiring immediate data transmission with minimal delay.

```python
import time

from mixpanel import Mixpanel, Consumer

# Configure for minimal latency
real_time_consumer = Consumer(
    request_timeout=5,        # Quick timeout for fast failures
    retry_limit=1,            # Minimal retries for speed
    retry_backoff_factor=0.1  # Short backoff
)

mp = Mixpanel("YOUR_PROJECT_TOKEN", consumer=real_time_consumer)

# Events are sent immediately
mp.track("user_123", "critical_action", {"timestamp": time.time()})
```

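
Because the direct consumer sends during the call, the timeout and retry settings bound how long a tracking call can block. The back-of-the-envelope sketch below estimates that bound. It assumes an exponential backoff of `retry_backoff_factor * 2**(n - 1)` seconds before retry `n`, and that every attempt fails after exactly `request_timeout` seconds; the schedule the underlying HTTP library actually applies may differ, so treat the numbers as rough. `worst_case_seconds` is a hypothetical helper, not part of the Mixpanel library.

```python
def worst_case_seconds(request_timeout, retry_limit, retry_backoff_factor):
    """Rough upper bound on blocking time for one call, under the stated assumptions."""
    attempts = retry_limit + 1                     # the first try plus each retry
    time_in_requests = attempts * request_timeout  # assume every attempt times out
    # Assumed sleep before retry n (n = 1..retry_limit): factor * 2**(n - 1)
    time_sleeping = sum(
        retry_backoff_factor * 2 ** (n - 1) for n in range(1, retry_limit + 1)
    )
    return time_in_requests + time_sleeping


low_latency = worst_case_seconds(request_timeout=5, retry_limit=1, retry_backoff_factor=0.1)
high_throughput = worst_case_seconds(request_timeout=120, retry_limit=5, retry_backoff_factor=0.5)

print(f"low-latency settings:     about {low_latency:.1f} s")
print(f"high-throughput settings: about {high_throughput:.1f} s")
```

Under these assumptions the low-latency settings block for roughly 10 seconds at worst, while a timeout of 120 seconds with 5 retries can block for about 12 minutes, which is only acceptable in a background batch job.
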
### Regional API Endpoints

Configure consumers for different regional Mixpanel endpoints.

```python
from mixpanel import Mixpanel, Consumer

# European data residency
eu_consumer = Consumer(api_host="api-eu.mixpanel.com")
eu_mp = Mixpanel("EU_PROJECT_TOKEN", consumer=eu_consumer)

# US endpoint (default)
us_consumer = Consumer(api_host="api.mixpanel.com")
us_mp = Mixpanel("US_PROJECT_TOKEN", consumer=us_consumer)

# Custom endpoint for enterprise customers (the URLs below are placeholders)
enterprise_consumer = Consumer(
    events_url="https://custom.mixpanel-endpoint.com/track",
    people_url="https://custom.mixpanel-endpoint.com/engage"
)
enterprise_mp = Mixpanel("ENTERPRISE_TOKEN", consumer=enterprise_consumer)
```

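
When the region comes from deployment configuration, a small lookup keeps the host names in one place. The sketch below is illustrative: `API_HOSTS` and `api_host_for` are hypothetical names, and the EU host shown is an assumption that should be checked against Mixpanel's current data residency documentation.

```python
# Hypothetical region-to-host table; confirm host names against Mixpanel's documentation
API_HOSTS = {
    "us": "api.mixpanel.com",     # default host used by the consumers
    "eu": "api-eu.mixpanel.com",  # assumed EU data residency host
}


def api_host_for(region):
    """Return the API host for a region code, failing loudly on unknown regions."""
    try:
        return API_HOSTS[region.lower()]
    except KeyError:
        raise ValueError(f"Unknown Mixpanel region: {region!r}") from None


consumer_kwargs = {"api_host": api_host_for("EU")}
# Consumer(**consumer_kwargs) would then send to the selected region
```

Raising on an unknown region, rather than silently falling back to the default host, avoids sending data to the wrong region because of a typo in configuration.
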
### Development and Testing Configuration

Configure consumers for development environments with different retry and timeout behaviors.

```python
import json

from mixpanel import Mixpanel, Consumer

# Development consumer with aggressive timeouts and minimal retries
dev_consumer = Consumer(
    request_timeout=2,  # Fast timeout for development
    retry_limit=0,      # No retries for immediate feedback
    verify_cert=False   # Allow self-signed certificates
)

# Test consumer that doesn't actually send data
class TestConsumer:
    def __init__(self):
        self.sent_messages = []

    def send(self, endpoint, json_message, api_key=None, api_secret=None):
        # Record the decoded message instead of sending it
        self.sent_messages.append({
            "endpoint": endpoint,
            "message": json.loads(json_message),
            "api_key": api_key,
            "api_secret": api_secret
        })
        return True

# Use test consumer for unit tests
test_consumer = TestConsumer()
test_mp = Mixpanel("TEST_TOKEN", consumer=test_consumer)

test_mp.track("test_user", "test_event", {"test": True})
assert len(test_consumer.sent_messages) == 1
assert test_consumer.sent_messages[0]["endpoint"] == "events"
```

## Best Practices

### Consumer Selection

- **Use Consumer (default)** for real-time applications that need each message sent immediately
- **Use BufferedConsumer** for batch processing, high-volume applications, or when network efficiency matters
- Weigh latency tolerance, throughput needs, and error handling preferences when choosing between them

### Configuration Guidelines

- Set `request_timeout` according to network conditions and how long a call is allowed to block
- Tune `retry_limit` and `retry_backoff_factor` to trade reliability against time spent retrying
- Use regional endpoints (`api_host`) to comply with data residency requirements

### Buffer Management

- Always call `flush()` on BufferedConsumer instances before application shutdown
- Consider automatic flushing patterns such as context managers
- Choose `max_size` and flush frequency to balance performance against data freshness

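
One way to make the final flush hard to forget is to register it with the standard library's `atexit` module, which runs callbacks at normal interpreter shutdown. This is a sketch under assumptions: `flush_on_exit` is a hypothetical helper, `DemoConsumer` stands in for a real BufferedConsumer, and `atexit` callbacks do not run if the process is killed by an unhandled signal or crashes hard.

```python
import atexit
import logging

logger = logging.getLogger(__name__)


def flush_on_exit(consumer):
    """Register a best-effort flush of `consumer` at interpreter shutdown; returns the callback."""

    def _flush():
        try:
            consumer.flush()
        except Exception:
            # At shutdown nothing else can handle the error, so log it
            logger.exception("Final flush failed; buffered messages may be lost")

    atexit.register(_flush)
    return _flush


class DemoConsumer:
    """Stand-in exposing the same flush() method as a buffered consumer."""

    def __init__(self):
        self.flushed = False

    def flush(self):
        self.flushed = True


demo = DemoConsumer()
final_flush = flush_on_exit(demo)  # pass your BufferedConsumer instance here instead
```

Long-running services usually combine this with an explicit flush in their own shutdown path, since `atexit` only covers a clean interpreter exit.
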
### Error Handling

- Catch `MixpanelException` around `send()` for both consumers, and around `flush()` for BufferedConsumer
- With BufferedConsumer, an exception may come from a message buffered by an earlier `send()` call, so handle failures per batch rather than per event
- Log and monitor consumer failures for operational awareness

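
These points can be combined in a small helper that flushes a buffered consumer, logs any failure, and reports whether the batch went out. The sketch is hedged: `safe_flush` and `FailingConsumer` are hypothetical, and the helper takes the exception type as a parameter so the example runs without the library installed. In real code you would pass `MixpanelException` imported from `mixpanel`.

```python
import logging

logger = logging.getLogger("analytics")


def safe_flush(consumer, error_type=Exception):
    """Flush `consumer`; log and swallow `error_type` failures. Returns True on success."""
    try:
        consumer.flush()
        return True
    except error_type as exc:
        # The failing message may have been buffered by a much earlier send() call
        logger.warning("Flush failed, batch not delivered: %s", exc)
        return False


class FailingConsumer:
    """Stand-in that always fails, to show the error path."""

    def flush(self):
        raise RuntimeError("server unreachable")


delivered = safe_flush(FailingConsumer(), error_type=RuntimeError)
```

Returning a boolean lets the caller decide whether to retry later, drop the batch, or alert, instead of letting an analytics failure interrupt the main code path.
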
### Security Considerations

- Keep `verify_cert=True` in production so the server certificate is verified
- Use the API endpoints and credentials that belong to each environment
- Store credentials securely and rotate them regularly
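
One way to keep credentials out of source code, and certificate verification on in production, is to build the consumer settings from the environment. The sketch below is an assumption: the variable names `MIXPANEL_TOKEN`, `APP_ENV`, and `MIXPANEL_VERIFY_CERT` and the helper `load_settings` are hypothetical, not something the library reads on its own.

```python
import os


def load_settings(environ=os.environ):
    """Build (token, consumer_kwargs) from environment variables (names are hypothetical)."""
    token = environ.get("MIXPANEL_TOKEN")
    if not token:
        raise RuntimeError("MIXPANEL_TOKEN is not set")

    env = environ.get("APP_ENV", "production")
    wants_verification = environ.get("MIXPANEL_VERIFY_CERT", "1") != "0"
    consumer_kwargs = {
        # Verification can only be switched off outside production
        "verify_cert": env == "production" or wants_verification,
    }
    return token, consumer_kwargs


# Example with an explicit mapping instead of the real process environment
token, consumer_kwargs = load_settings(
    {"MIXPANEL_TOKEN": "example-token", "APP_ENV": "development", "MIXPANEL_VERIFY_CERT": "0"}
)
# Mixpanel(token, consumer=Consumer(**consumer_kwargs)) would then use these settings
```

Defaulting `APP_ENV` to production means a missing variable fails safe: verification stays on unless the environment is explicitly marked as non-production.
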