A Python logging handler for Fluentd event collector
Non-blocking, asynchronous versions of both the core sender and logging handler interfaces. Each async sender runs a background worker thread that drains an in-memory queue, so emit() returns immediately instead of blocking on log transmission, even when the Fluentd server is slow or unreachable.
Asynchronous version of FluentSender that uses a background thread for non-blocking event transmission with queue-based buffering.
class FluentSender(fluent.sender.FluentSender):
def __init__(
self,
tag: str,
host: str = "localhost",
port: int = 24224,
bufmax: int = 1048576,
timeout: float = 3.0,
verbose: bool = False,
buffer_overflow_handler = None,
nanosecond_precision: bool = False,
msgpack_kwargs = None,
queue_maxsize: int = 100,
queue_circular: bool = False,
queue_overflow_handler = None,
**kwargs
):
"""
Initialize AsyncFluentSender.
Parameters:
- tag (str): Tag prefix for events
- host (str): Fluentd host
- port (int): Fluentd port
- bufmax (int): Maximum buffer size in bytes
- timeout (float): Connection timeout in seconds
- verbose (bool): Enable verbose logging
- buffer_overflow_handler (callable): Called with the pending msgpack bytes when the send buffer exceeds bufmax
- nanosecond_precision (bool): Use nanosecond timestamps
- msgpack_kwargs (dict): msgpack options
- queue_maxsize (int): Maximum queue size (default 100)
- queue_circular (bool): Use circular queue mode (default False)
- queue_overflow_handler (callable): Called with the discarded msgpack bytes when the circular queue overflows
- **kwargs: Additional sender options
"""
def close(self, flush: bool = True) -> None:
"""
Close the async sender and shut down its background thread.
Parameters:
- flush (bool): Whether to flush pending events before closing
"""
@property
def queue_maxsize(self) -> int:
"""
Get queue maximum size.
Returns:
int: Maximum queue size
"""
@property
def queue_blocking(self) -> bool:
"""
Check if queue is in blocking mode.
Returns:
bool: True if queue blocks when full, False if circular
"""
@property
def queue_circular(self) -> bool:
"""
Check if queue is in circular mode.
Returns:
bool: True if queue discards oldest events when full
"""Asynchronous logging handler that inherits from FluentHandler but uses AsyncFluentSender for non-blocking log transmission.
Asynchronous logging handler that inherits from FluentHandler but uses AsyncFluentSender for non-blocking log transmission.

class FluentHandler(fluent.handler.FluentHandler):
def getSenderClass(self):
"""
Get the async sender class.
Returns:
class: AsyncFluentSender class
"""Module-level functions for managing global async sender instances.
Module-level functions for managing global async sender instances.

def setup(tag: str, **kwargs) -> None:
"""
Initialize global AsyncFluentSender instance.
Parameters:
- tag (str): Tag prefix for events
- **kwargs: AsyncFluentSender constructor arguments
"""
def get_global_sender():
"""
Get the global AsyncFluentSender instance.
Returns:
AsyncFluentSender or None: Global async sender instance
"""
def close() -> None:
"""Close the global AsyncFluentSender instance."""
def _set_global_sender(sender):
"""
[For testing] Set global async sender directly.
Parameters:
- sender (AsyncFluentSender): Async sender instance to use as global sender
"""DEFAULT_QUEUE_MAXSIZE = 100
DEFAULT_QUEUE_MAXSIZE = 100
DEFAULT_QUEUE_CIRCULAR = False

The async module exports classes via __all__:
__all__ = ["EventTime", "FluentSender"]from fluent import asyncsender as sender
# Create async sender - automatically starts background thread
logger = sender.FluentSender('app')
# These calls return immediately without blocking
logger.emit('user.login', {'user_id': 123, 'method': 'password'})
logger.emit('user.action', {'user_id': 123, 'action': 'view_dashboard'})
logger.emit('user.logout', {'user_id': 123, 'session_duration': 1800})
# IMPORTANT: Always close to ensure thread cleanup
logger.close()
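The exported EventTime type carries sub-second timestamps as a msgpack extension; a minimal sketch pairing it with emit_with_time(), which the async sender inherits from the base FluentSender:

import time
from fluent import asyncsender as sender

logger = sender.FluentSender('app', nanosecond_precision=True)
# EventTime wraps a float timestamp with nanosecond resolution
logger.emit_with_time('access', sender.EventTime(time.time()), {'status': 200})
logger.close()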
from fluent import asyncsender as sender
import time
# Configure for high throughput
logger = sender.FluentSender(
'metrics',
host='high-performance-fluentd.example.com',
queue_maxsize=1000, # Large queue for bursts
timeout=1.0 # Fast timeout
)
# Send burst of events without blocking
start_time = time.time()
for i in range(10000):
logger.emit('metric.point', {
'timestamp': time.time(),
'metric_name': 'cpu_usage',
'value': 50 + (i % 50),
'host': f'server-{i % 10}'
})
elapsed = time.time() - start_time
print(f"Sent 10000 events in {elapsed:.2f} seconds")
# Cleanup - waits for background thread to finish
logger.close()
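By contrast with the circular mode shown next, the default blocking mode never discards events: once the queue is full, emit() waits until the background thread frees a slot. A minimal sketch, assuming a reachable Fluentd so the queue drains:

from fluent import asyncsender as sender

logger = sender.FluentSender('app', queue_maxsize=10)  # deliberately small queue
for i in range(1000):
    # Blocks briefly whenever the queue is full; no events are lost
    logger.emit('event', {'index': i})
logger.close()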
from fluent import asyncsender as sender

def queue_overflow_handler(discarded_bytes):
"""Handle discarded events in circular mode"""
print(f"Discarded {len(discarded_bytes)} bytes due to queue overflow")
# Enable circular queue to never block the application
logger = sender.FluentSender(
'app',
host='slow-fluentd.example.com',
queue_maxsize=50,
queue_circular=True, # Never block, discard oldest
queue_overflow_handler=queue_overflow_handler
)
# Application never blocks, even if Fluentd is slow
for i in range(1000):
logger.emit('event', {'index': i, 'data': 'important_data'})
# This always returns immediately
logger.close()
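The overflow handler receives each discarded entry as msgpack-encoded bytes, so events can be recovered rather than just counted. A sketch using msgpack directly (assuming default second-precision timestamps, so each entry unpacks to a [tag, time, record] triple):

from io import BytesIO
import msgpack

def queue_overflow_handler(discarded_bytes):
    """Unpack and log events discarded by the circular queue."""
    for tag, timestamp, record in msgpack.Unpacker(BytesIO(discarded_bytes)):
        print(f"Discarded: {tag} {timestamp} {record}")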
from fluent import asyncsender as sender
import time

# Setup global async sender at application start
sender.setup('webapp', host='logs.company.com', queue_maxsize=500)
def handle_web_request(request):
"""Handle web request with non-blocking logging"""
start_time = time.time()
# Process the request (process_request is an application-specific helper)
response = process_request(request)
# Log without blocking response
global_sender = sender.get_global_sender()
global_sender.emit('request.completed', {
'path': request.path,
'method': request.method,
'status_code': response.status_code,
'duration_ms': int((time.time() - start_time) * 1000),
'user_id': request.user_id
})
return response
# Application shutdown
def shutdown():
sender.close()  # Wait for the background thread to finish

import logging
from fluent import asynchandler as handler
# Setup async logging handler
logger = logging.getLogger('async_app')
logger.setLevel(logging.INFO)
# Non-blocking log handler
async_handler = handler.FluentHandler(
'app.logs',
host='logs.example.com',
queue_maxsize=200,
queue_circular=False # Block if queue fills up
)
logger.addHandler(async_handler)
# Logging calls return immediately
logger.info('Application started')
logger.info('Processing batch job')
logger.info('Batch job completed')
# IMPORTANT: Close handler before exit
async_handler.close()
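The async handler also works with FluentRecordFormatter from fluent.handler, which turns log records into structured events; a minimal sketch:

import logging
from fluent import asynchandler as handler
from fluent.handler import FluentRecordFormatter

structured_handler = handler.FluentHandler('app.logs', host='localhost', port=24224)
structured_handler.setFormatter(FluentRecordFormatter({
    'host': '%(hostname)s',
    'where': '%(module)s.%(funcName)s',
    'level': '%(levelname)s',
}))

log = logging.getLogger('structured')
log.addHandler(structured_handler)
log.error('something failed')  # sent as a structured record
structured_handler.close()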
from fluent import asyncsender as sender
import threading
import time
# Create async sender with monitoring
logger = sender.FluentSender(
'monitored_app',
queue_maxsize=100,
verbose=True # Enable packet logging
)
def monitor_queue():
"""Monitor queue status"""
while not logger._closed:
queue_size = logger._queue.qsize()
print(f"Queue size: {queue_size}/{logger.queue_maxsize}")
time.sleep(1)
# Start monitoring thread
monitor_thread = threading.Thread(target=monitor_queue)
monitor_thread.daemon = True
monitor_thread.start()
# Send events
for i in range(50):
logger.emit('test', {'index': i})
time.sleep(0.1) # Slow sending to see queue behavior
logger.close()

from fluent import asyncsender as sender
import time
def connection_error_handler(pendings):
"""Handle connection failures"""
print(f"Connection failed, {len(pendings)} bytes pending")
# Save to local file as backup
with open('/tmp/failed_events.backup', 'ab') as f:
f.write(pendings)
# Setup with error handling
logger = sender.FluentSender(
'app',
host='unreliable-server.example.com',
buffer_overflow_handler=connection_error_handler,
timeout=2.0
)
# Send events - connection failures handled in background
for i in range(100):
logger.emit('event', {'index': i, 'timestamp': time.time()})
# Check for errors (errors occur in background thread)
time.sleep(5) # Wait for background processing
# Note: last_error is from background thread context
if logger.last_error:
print(f"Background error: {logger.last_error}")
logger.close()
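While the sender is still open, the base class also exposes clear_last_error() so a handled error can be reset before checking for new ones; a brief sketch (report is a hypothetical application hook):

if logger.last_error:
    report(logger.last_error)  # hypothetical application hook
    logger.clear_last_error()  # reset so later failures are distinguishable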
from fluent import asyncsender as sender
import signal
import sys
import time
# Global sender for cleanup
global_sender = None
def signal_handler(signum, frame):
"""Handle shutdown signals gracefully"""
print("Shutting down gracefully...")
if global_sender:
print("Closing async sender...")
global_sender.close(flush=True) # Wait for pending events
print("Async sender closed")
sys.exit(0)
# Setup signal handlers
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Create global sender
global_sender = sender.FluentSender('app', queue_maxsize=1000)
# Application main loop
try:
while True:
# Simulate application work
global_sender.emit('heartbeat', {
'timestamp': time.time(),
'status': 'running'
})
time.sleep(1)
except KeyboardInterrupt:
signal_handler(signal.SIGINT, None)
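As an alternative to explicit signal handlers, the standard-library atexit hook can close the global sender on normal interpreter shutdown; a minimal sketch:

import atexit
from fluent import asyncsender as sender

sender.setup('app', host='logs.company.com')
# sender.close() flushes the queue and joins the background thread
atexit.register(sender.close)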
from fluent import sender, asyncsender
import time
def benchmark_sync_sender():
"""Benchmark synchronous sender"""
logger = sender.FluentSender('sync_test')
start_time = time.time()
for i in range(1000):
logger.emit('test', {'index': i})
logger.close()
return time.time() - start_time
def benchmark_async_sender():
"""Benchmark asynchronous sender"""
logger = asyncsender.FluentSender('async_test', queue_maxsize=1500)
start_time = time.time()
for i in range(1000):
logger.emit('test', {'index': i})
# Time to queue all events (not send)
queue_time = time.time() - start_time
# Close and wait for actual sending
logger.close()
total_time = time.time() - start_time
return queue_time, total_time
# Run benchmarks
sync_time = benchmark_sync_sender()
async_queue_time, async_total_time = benchmark_async_sender()
print(f"Sync sender: {sync_time:.2f}s")
print(f"Async sender (queue): {async_queue_time:.2f}s")
print(f"Async sender (total): {async_total_time:.2f}s")
print(f"Speedup: {sync_time / async_queue_time:.1f}x")Install with Tessl CLI
npx tessl i tessl/pypi-fluent-logger