# Asynchronous Communication

Non-blocking asynchronous versions of the core sender and logging handler interfaces. They use a background thread and queue-based buffering so that log transmission never blocks the application, keeping performance high even when Fluentd servers are slow or unreachable.

## Capabilities

### AsyncFluentSender Class

Asynchronous version of FluentSender that uses a background thread for non-blocking event transmission with queue-based buffering.

```python { .api }
class FluentSender(fluent.sender.FluentSender):
    def __init__(
        self,
        tag: str,
        host: str = "localhost",
        port: int = 24224,
        bufmax: int = 1048576,
        timeout: float = 3.0,
        verbose: bool = False,
        buffer_overflow_handler=None,
        nanosecond_precision: bool = False,
        msgpack_kwargs=None,
        queue_maxsize: int = 100,
        queue_circular: bool = False,
        queue_overflow_handler=None,
        **kwargs
    ):
        """
        Initialize AsyncFluentSender.

        Parameters:
        - tag (str): Tag prefix for events
        - host (str): Fluentd host
        - port (int): Fluentd port
        - bufmax (int): Maximum buffer size in bytes
        - timeout (float): Connection timeout
        - verbose (bool): Verbose logging
        - buffer_overflow_handler (callable): Buffer overflow handler
        - nanosecond_precision (bool): Use nanosecond timestamps
        - msgpack_kwargs (dict): msgpack options
        - queue_maxsize (int): Maximum queue size (default 100)
        - queue_circular (bool): Use circular queue mode (default False)
        - queue_overflow_handler (callable): Queue overflow handler
        - **kwargs: Additional sender options
        """

    def close(self, flush: bool = True) -> None:
        """
        Close the async sender and its background thread.

        Parameters:
        - flush (bool): Whether to flush pending events before closing
        """

    @property
    def queue_maxsize(self) -> int:
        """
        Get the maximum queue size.

        Returns:
        int: Maximum queue size
        """

    @property
    def queue_blocking(self) -> bool:
        """
        Check if the queue is in blocking mode.

        Returns:
        bool: True if the queue blocks when full, False if circular
        """

    @property
    def queue_circular(self) -> bool:
        """
        Check if the queue is in circular mode.

        Returns:
        bool: True if the queue discards the oldest events when full
        """
```
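
As a quick illustration (a minimal sketch; the tag `'app'` and the printed values are illustrative), the queue-related properties can be inspected on a freshly constructed sender:

```python
from fluent import asyncsender as sender

# Create a sender with the default (blocking) queue mode
logger = sender.FluentSender('app', queue_maxsize=100)

# Inspect the queue configuration via the read-only properties
print(logger.queue_maxsize)   # 100
print(logger.queue_blocking)  # True, because queue_circular was left False
print(logger.queue_circular)  # False

logger.close()
```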

### AsyncFluentHandler Class

Asynchronous logging handler that inherits from FluentHandler but uses AsyncFluentSender for non-blocking log transmission.

```python { .api }
class FluentHandler(fluent.handler.FluentHandler):
    def getSenderClass(self):
        """
        Get the async sender class.

        Returns:
        class: AsyncFluentSender class
        """
```
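
Beyond direct instantiation (see the Async Logging Handler example below), the handler can also be wired up through the standard `logging.config.dictConfig` mechanism. The sketch below assumes the handler is importable as `fluent.asynchandler.FluentHandler`; the tag, host, and queue size are illustrative values:

```python
import logging
import logging.config

# Minimal dictConfig sketch; constructor kwargs are forwarded to the handler
logging.config.dictConfig({
    'version': 1,
    'handlers': {
        'fluent_async': {
            '()': 'fluent.asynchandler.FluentHandler',  # factory-style handler entry
            'tag': 'app.logs',
            'host': 'localhost',
            'port': 24224,
            'queue_maxsize': 200,
        },
    },
    'root': {'level': 'INFO', 'handlers': ['fluent_async']},
})

logging.getLogger(__name__).info('configured via dictConfig')
```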

### Global Async Functions

Module-level functions for managing the global async sender instance.

```python { .api }
def setup(tag: str, **kwargs) -> None:
    """
    Initialize the global AsyncFluentSender instance.

    Parameters:
    - tag (str): Tag prefix for events
    - **kwargs: AsyncFluentSender constructor arguments
    """

def get_global_sender():
    """
    Get the global AsyncFluentSender instance.

    Returns:
    AsyncFluentSender or None: Global async sender instance
    """

def close() -> None:
    """Close the global AsyncFluentSender instance."""

def _set_global_sender(sender):
    """
    [For testing] Set the global async sender directly.

    Parameters:
    - sender (AsyncFluentSender): Async sender instance to use as the global sender
    """
```
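
Since `_set_global_sender` is intended for tests, here is a minimal sketch of how a test might inject a stand-in sender before exercising code that logs through the global sender. The `RecordingSender` class is hypothetical and not part of the library:

```python
from fluent import asyncsender as sender

class RecordingSender:
    """Hypothetical stand-in that records events instead of sending them."""
    def __init__(self):
        self.events = []

    def emit(self, label, data):
        self.events.append((label, data))
        return True

    def close(self):
        pass

# Inject the stand-in, run the code under test, then inspect what was logged
fake = RecordingSender()
sender._set_global_sender(fake)

sender.get_global_sender().emit('unit.test', {'ok': True})
assert fake.events == [('unit.test', {'ok': True})]
```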

### Constants

```python { .api }
DEFAULT_QUEUE_MAXSIZE = 100
DEFAULT_QUEUE_CIRCULAR = False
```

### Exported Classes

The async module exports classes via `__all__`:

```python { .api }
__all__ = ["EventTime", "FluentSender"]
```
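
`EventTime` is re-exported alongside the sender for high-resolution timestamps. A minimal sketch of attaching an explicit `EventTime` to an event, assuming the `emit_with_time` method inherited from the base FluentSender (the tag and record are illustrative):

```python
import time
from fluent import asyncsender as sender

logger = sender.FluentSender('app', nanosecond_precision=True)

# Wrap the current time in an EventTime for nanosecond-precision timestamps
event_time = sender.EventTime(time.time())
logger.emit_with_time('access', event_time, {'path': '/health', 'status': 200})

logger.close()
```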

## Usage Examples

### Basic Async Event Logging

```python
from fluent import asyncsender as sender

# Create async sender - automatically starts background thread
logger = sender.FluentSender('app')

# These calls return immediately without blocking
logger.emit('user.login', {'user_id': 123, 'method': 'password'})
logger.emit('user.action', {'user_id': 123, 'action': 'view_dashboard'})
logger.emit('user.logout', {'user_id': 123, 'session_duration': 1800})

# IMPORTANT: Always close to ensure thread cleanup
logger.close()
```

### High-Performance Async Logging

```python
from fluent import asyncsender as sender
import time

# Configure for high throughput
logger = sender.FluentSender(
    'metrics',
    host='high-performance-fluentd.example.com',
    queue_maxsize=1000,  # Large queue for bursts
    timeout=1.0          # Fast timeout
)

# Send burst of events without blocking
start_time = time.time()

for i in range(10000):
    logger.emit('metric.point', {
        'timestamp': time.time(),
        'metric_name': 'cpu_usage',
        'value': 50 + (i % 50),
        'host': f'server-{i % 10}'
    })

elapsed = time.time() - start_time
print(f"Sent 10000 events in {elapsed:.2f} seconds")

# Cleanup - waits for background thread to finish
logger.close()
```

### Circular Queue Mode

```python
from fluent import asyncsender as sender

def queue_overflow_handler(discarded_bytes):
    """Handle events discarded in circular mode"""
    print(f"Discarded {len(discarded_bytes)} bytes due to queue overflow")

# Enable circular queue so the application never blocks
logger = sender.FluentSender(
    'app',
    host='slow-fluentd.example.com',
    queue_maxsize=50,
    queue_circular=True,  # Never block, discard oldest
    queue_overflow_handler=queue_overflow_handler
)

# Application never blocks, even if Fluentd is slow
for i in range(1000):
    logger.emit('event', {'index': i, 'data': 'important_data'})
    # This always returns immediately

logger.close()
```

### Async Global Sender Pattern

```python
import time

from fluent import asyncsender as sender

# Setup global async sender at application start
sender.setup('webapp', host='logs.company.com', queue_maxsize=500)

def handle_web_request(request):
    """Handle web request with non-blocking logging"""
    start_time = time.time()

    # Process request
    response = process_request(request)

    # Log without blocking the response
    global_sender = sender.get_global_sender()
    global_sender.emit('request.completed', {
        'path': request.path,
        'method': request.method,
        'status_code': response.status_code,
        'duration_ms': int((time.time() - start_time) * 1000),
        'user_id': request.user_id
    })

    return response

# Application shutdown
def shutdown():
    sender.close()  # Wait for background threads to finish
```

### Async Logging Handler

```python
import logging
from fluent import asynchandler as handler

# Setup async logging handler
logger = logging.getLogger('async_app')
logger.setLevel(logging.INFO)

# Non-blocking log handler
async_handler = handler.FluentHandler(
    'app.logs',
    host='logs.example.com',
    queue_maxsize=200,
    queue_circular=False  # Block if queue fills up
)

logger.addHandler(async_handler)

# Logging calls return immediately
logger.info('Application started')
logger.info('Processing batch job')
logger.info('Batch job completed')

# IMPORTANT: Close handler before exit
async_handler.close()
```

### Queue Management and Monitoring

```python
from fluent import asyncsender as sender
import threading
import time

# Create async sender with monitoring
logger = sender.FluentSender(
    'monitored_app',
    queue_maxsize=100,
    verbose=True  # Enable packet logging
)

def monitor_queue():
    """Monitor queue status.

    Note: _closed and _queue are private internals of the sender and may
    change between releases; only queue_maxsize is part of the public API.
    """
    while not logger._closed:
        queue_size = logger._queue.qsize()
        print(f"Queue size: {queue_size}/{logger.queue_maxsize}")
        time.sleep(1)

# Start monitoring thread
monitor_thread = threading.Thread(target=monitor_queue)
monitor_thread.daemon = True
monitor_thread.start()

# Send events
for i in range(50):
    logger.emit('test', {'index': i})
    time.sleep(0.1)  # Slow sending to observe queue behavior

logger.close()
```

### Error Handling with Async Sender

```python
from fluent import asyncsender as sender
import time

def connection_error_handler(pendings):
    """Handle connection failures"""
    print(f"Connection failed, {len(pendings)} bytes pending")

    # Save to a local file as backup
    with open('/tmp/failed_events.backup', 'ab') as f:
        f.write(pendings)

# Setup with error handling
logger = sender.FluentSender(
    'app',
    host='unreliable-server.example.com',
    buffer_overflow_handler=connection_error_handler,
    timeout=2.0
)

# Send events - connection failures are handled in the background
for i in range(100):
    logger.emit('event', {'index': i, 'timestamp': time.time()})

# Check for errors (errors occur in the background thread)
time.sleep(5)  # Wait for background processing

# Note: last_error is set from the background thread context
if logger.last_error:
    print(f"Background error: {logger.last_error}")

logger.close()
```

### Graceful Shutdown

```python
from fluent import asyncsender as sender
import signal
import sys
import time

# Global sender for cleanup
global_sender = None

def signal_handler(signum, frame):
    """Handle shutdown signals gracefully"""
    print("Shutting down gracefully...")

    if global_sender:
        print("Closing async sender...")
        global_sender.close(flush=True)  # Wait for pending events
        print("Async sender closed")

    sys.exit(0)

# Setup signal handlers
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

# Create global sender
global_sender = sender.FluentSender('app', queue_maxsize=1000)

# Application main loop
try:
    while True:
        # Simulate application work
        global_sender.emit('heartbeat', {
            'timestamp': time.time(),
            'status': 'running'
        })
        time.sleep(1)

except KeyboardInterrupt:
    signal_handler(signal.SIGINT, None)
```

### Performance Comparison

```python
from fluent import sender, asyncsender
import time

def benchmark_sync_sender():
    """Benchmark the synchronous sender"""
    logger = sender.FluentSender('sync_test')

    start_time = time.time()

    for i in range(1000):
        logger.emit('test', {'index': i})

    logger.close()
    return time.time() - start_time

def benchmark_async_sender():
    """Benchmark the asynchronous sender"""
    logger = asyncsender.FluentSender('async_test', queue_maxsize=1500)

    start_time = time.time()

    for i in range(1000):
        logger.emit('test', {'index': i})

    # Time to queue all events (not to send them)
    queue_time = time.time() - start_time

    # Close and wait for actual sending
    logger.close()
    total_time = time.time() - start_time

    return queue_time, total_time

# Run benchmarks
sync_time = benchmark_sync_sender()
async_queue_time, async_total_time = benchmark_async_sender()

print(f"Sync sender: {sync_time:.2f}s")
print(f"Async sender (queue): {async_queue_time:.2f}s")
print(f"Async sender (total): {async_total_time:.2f}s")
print(f"Speedup: {sync_time / async_queue_time:.1f}x")
```
```