CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-fluent-logger

A Python logging handler for Fluentd event collector

Pending
Overview
Eval results
Files

async-communication.mddocs/

Asynchronous Communication

Non-blocking asynchronous versions of both the core sender and logging handler interfaces, using background threads and queues to prevent application blocking during log transmission. This ensures high performance even when Fluentd servers are slow or unreachable.

Capabilities

AsyncFluentSender Class

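Asynchronous version of FluentSender that uses a background thread for non-blocking event transmission with queue-based buffering. The class itself is named FluentSender and is imported from the fluent.asyncsender module; this document calls it AsyncFluentSender to distinguish it from the synchronous fluent.sender.FluentSender.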

class FluentSender(fluent.sender.FluentSender):
    """
    Queue-backed sender, referred to in this document as AsyncFluentSender.

    The usage examples import it as ``fluent.asyncsender.FluentSender``.
    """
    def __init__(
        self,
        tag: str,
        host: str = "localhost",
        port: int = 24224,
        bufmax: int = 1048576,
        timeout: float = 3.0,
        verbose: bool = False,
        buffer_overflow_handler = None,
        nanosecond_precision: bool = False,
        msgpack_kwargs = None,
        queue_maxsize: int = 100,
        queue_circular: bool = False,
        queue_overflow_handler = None,
        **kwargs
    ):
        """
        Initialize AsyncFluentSender.

        Parameters:
        - tag (str): Tag prefix for events
        - host (str): Fluentd host (default "localhost")
        - port (int): Fluentd port (default 24224)
        - bufmax (int): Maximum buffer size in bytes (default 1048576)
        - timeout (float): Connection timeout (default 3.0)
        - verbose (bool): Verbose logging (default False)
        - buffer_overflow_handler (callable): Buffer overflow handler
          (default None)
        - nanosecond_precision (bool): Use nanosecond timestamps
          (default False)
        - msgpack_kwargs (dict): msgpack options (default None)
        - queue_maxsize (int): Maximum queue size (default 100)
        - queue_circular (bool): Use circular queue mode (default False)
        - queue_overflow_handler (callable): Queue overflow handler
          (default None)
        - **kwargs: Additional sender options
        """

    def close(self, flush: bool = True) -> None:
        """
        Close async sender and background thread.

        Parameters:
        - flush (bool): Whether to flush pending events before closing
          (default True)
        """

    @property
    def queue_maxsize(self) -> int:
        """
        Get queue maximum size.

        Returns:
        int: Maximum queue size
        """

    @property
    def queue_blocking(self) -> bool:
        """
        Check if queue is in blocking mode.

        As described here, this is the opposite of ``queue_circular``.

        Returns:
        bool: True if queue blocks when full, False if circular
        """

    @property
    def queue_circular(self) -> bool:
        """
        Check if queue is in circular mode.

        Returns:
        bool: True if queue discards oldest events when full
        """

AsyncFluentHandler Class

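Asynchronous logging handler that inherits from FluentHandler but uses AsyncFluentSender for non-blocking log transmission. The class itself is named FluentHandler and is imported from the fluent.asynchandler module; this document calls it AsyncFluentHandler.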

class FluentHandler(fluent.handler.FluentHandler):
    """
    Logging handler, referred to in this document as AsyncFluentHandler.

    The usage examples import it as ``fluent.asynchandler.FluentHandler``.
    """
    def getSenderClass(self):
        """
        Get the async sender class.

        Returns:
        class: AsyncFluentSender class
        """

Global Async Functions

Module-level functions for managing global async sender instances.

def setup(tag: str, **kwargs) -> None:
    """
    Initialize global AsyncFluentSender instance.

    Parameters:
    - tag (str): Tag prefix for events
    - **kwargs: AsyncFluentSender constructor arguments
      (for example host, queue_maxsize)
    """

def get_global_sender():
    """
    Get the global AsyncFluentSender instance.

    Callers should be prepared for a None result.

    Returns:
    AsyncFluentSender or None: Global async sender instance
    """

def close() -> None:
    """
    Close the global AsyncFluentSender instance.

    Module-level counterpart of the sender's own close() method.
    """

def _set_global_sender(sender):
    """
    [For testing] Set global async sender directly.

    The leading underscore marks this as a non-public helper.

    Parameters:
    - sender (AsyncFluentSender): Async sender instance to use as global sender
    """

Constants

# Same value as the default of the `queue_maxsize` constructor parameter
DEFAULT_QUEUE_MAXSIZE = 100
# Same value as the default of the `queue_circular` constructor parameter
DEFAULT_QUEUE_CIRCULAR = False

Exported Classes

The async module exports classes via __all__:

__all__ = ["EventTime", "FluentSender"]

Usage Examples

Basic Async Event Logging

from fluent import asyncsender as sender

# Constructing the async sender also starts its background thread
logger = sender.FluentSender('app')

# (label, payload) pairs, emitted in this order; each emit() call returns
# immediately without blocking
user_events = (
    ('user.login', {'user_id': 123, 'method': 'password'}),
    ('user.action', {'user_id': 123, 'action': 'view_dashboard'}),
    ('user.logout', {'user_id': 123, 'session_duration': 1800}),
)
for label, payload in user_events:
    logger.emit(label, payload)

# IMPORTANT: always close the sender so the background thread is cleaned up
logger.close()

High-Performance Async Logging

from fluent import asyncsender as sender
import time

# Configure for high throughput
logger = sender.FluentSender(
    'metrics',
    host='high-performance-fluentd.example.com',
    queue_maxsize=1000,  # Large queue for bursts
    timeout=1.0          # Fast timeout
)

# Size of the burst; shared by the loop and the report so they cannot drift
event_count = 10000

# Queue a burst of events. queue_circular is left at its default (False),
# i.e. blocking mode: the queue blocks when full, so a burst larger than
# queue_maxsize can still make emit() wait for the background thread.
# perf_counter() is the clock intended for measuring elapsed intervals;
# wall-clock time.time() can jump and may have coarse resolution.
start_time = time.perf_counter()

for i in range(event_count):
    logger.emit('metric.point', {
        'timestamp': time.time(),  # wall-clock time is the event's own data
        'metric_name': 'cpu_usage',
        'value': 50 + (i % 50),
        'host': f'server-{i % 10}'
    })

elapsed = time.perf_counter() - start_time
print(f"Sent {event_count} events in {elapsed:.2f} seconds")

# Cleanup - waits for background thread to finish
logger.close()

Circular Queue Mode

from fluent import asyncsender as sender

def queue_overflow_handler(discarded_bytes):
    """Report the size of an event dropped by the circular queue."""
    dropped_size = len(discarded_bytes)
    message = f"Discarded {dropped_size} bytes due to queue overflow"
    print(message)

# Circular mode: when the queue is full the oldest event is discarded
# (and reported to the overflow handler) instead of blocking the caller
logger = sender.FluentSender(
    'app',
    host='slow-fluentd.example.com',
    queue_maxsize=50,
    queue_circular=True,
    queue_overflow_handler=queue_overflow_handler,
)

# Every emit() returns immediately, even if Fluentd is slow
for index in range(1000):
    payload = {'index': index, 'data': 'important_data'}
    logger.emit('event', payload)

logger.close()

Async Global Sender Pattern

from fluent import asyncsender as sender
import time  # required below; the example previously used time without importing it

# Setup global async sender at application start
sender.setup('webapp', host='logs.company.com', queue_maxsize=500)

def handle_web_request(request):
    """
    Handle web request with non-blocking logging.

    ``process_request`` stands in for the application's own request
    processing and is not defined in this example.

    Parameters:
    - request: Object exposing ``path``, ``method`` and ``user_id``

    Returns:
    The response produced by ``process_request``
    """
    # perf_counter() is the clock intended for measuring elapsed intervals
    start_time = time.perf_counter()

    # Process request
    response = process_request(request)

    # Log without blocking response. get_global_sender() may return None,
    # so a missing sender must not fail the request.
    global_sender = sender.get_global_sender()
    if global_sender is not None:
        global_sender.emit('request.completed', {
            'path': request.path,
            'method': request.method,
            'status_code': response.status_code,
            'duration_ms': int((time.perf_counter() - start_time) * 1000),
            'user_id': request.user_id
        })

    return response

# Application shutdown
def shutdown():
    """Close the global async sender when the application shuts down."""
    sender.close()  # Wait for background threads to finish

Async Logging Handler

import logging
from fluent import asynchandler as handler

# Non-blocking log handler; queue_circular=False means a full queue blocks
async_handler = handler.FluentHandler(
    'app.logs',
    host='logs.example.com',
    queue_maxsize=200,
    queue_circular=False,
)

# Attach the handler to an INFO-level logger
logger = logging.getLogger('async_app')
logger.setLevel(logging.INFO)
logger.addHandler(async_handler)

# Each logging call returns immediately
for message in (
    'Application started',
    'Processing batch job',
    'Batch job completed',
):
    logger.info(message)

# IMPORTANT: close the handler before the process exits
async_handler.close()

Queue Management and Monitoring

from fluent import asyncsender as sender
import threading
import time

# Async sender under observation; verbose=True enables packet logging
logger = sender.FluentSender('monitored_app', queue_maxsize=100, verbose=True)

def monitor_queue():
    """
    Print the queue fill level once per second until the sender is closed.

    NOTE(review): ``_closed`` and ``_queue`` are underscore-prefixed
    attributes, not among the properties listed for the sender class;
    confirm they exist in the installed version.
    """
    while True:
        if logger._closed:
            break
        print(f"Queue size: {logger._queue.qsize()}/{logger.queue_maxsize}")
        time.sleep(1)

# Daemon thread, so the monitor never keeps the process alive
monitor_thread = threading.Thread(target=monitor_queue, daemon=True)
monitor_thread.start()

# Emit slowly so the queue behavior is visible in the monitor output
for index in range(50):
    logger.emit('test', {'index': index})
    time.sleep(0.1)

logger.close()

Error Handling with Async Sender

from fluent import asyncsender as sender
import time

def connection_error_handler(pendings):
    """Report undelivered bytes and append them to a local backup file."""
    pending_size = len(pendings)
    print(f"Connection failed, {pending_size} bytes pending")

    # Append in binary mode so earlier backups are kept
    with open('/tmp/failed_events.backup', 'ab') as backup_file:
        backup_file.write(pendings)

# Setup with error handling: connection_error_handler is registered as the
# sender's buffer overflow handler
logger = sender.FluentSender(
    'app',
    host='unreliable-server.example.com',
    buffer_overflow_handler=connection_error_handler,
    timeout=2.0
)

# Send events - connection failures handled in background
for i in range(100):
    logger.emit('event', {'index': i, 'timestamp': time.time()})

# Check for errors (errors occur in background thread)
# The fixed sleep only gives the background thread time to work; it does
# not guarantee that all events have been processed.
time.sleep(5)  # Wait for background processing

# Note: last_error is from background thread context
# NOTE(review): if last_error is tracked per thread, an error recorded by
# the background thread may not be visible from this thread - confirm
# against the installed fluent-logger version.
if logger.last_error:
    print(f"Background error: {logger.last_error}")

logger.close()

Graceful Shutdown

from fluent import asyncsender as sender
import signal
import sys
import time  # required by the main loop; previously used without being imported

# Global sender for cleanup
global_sender = None

def signal_handler(signum, frame):
    """
    Handle shutdown signals gracefully.

    Closes the global async sender (flushing pending events) if one has
    been created, then exits the process with status 0.

    Parameters:
    - signum: Signal number supplied by the signal module (unused)
    - frame: Stack frame supplied by the signal module (unused)
    """
    print("Shutting down gracefully...")

    if global_sender:
        print("Closing async sender...")
        global_sender.close(flush=True)  # Wait for pending events
        print("Async sender closed")

    sys.exit(0)

# Setup signal handlers
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

# Create global sender
global_sender = sender.FluentSender('app', queue_maxsize=1000)

# Application main loop
try:
    while True:
        # Simulate application work
        global_sender.emit('heartbeat', {
            'timestamp': time.time(),
            'status': 'running'
        })
        time.sleep(1)

except KeyboardInterrupt:
    # Fallback: SIGINT is normally routed to signal_handler above, but run
    # the same cleanup if a KeyboardInterrupt still reaches the loop
    signal_handler(signal.SIGINT, None)

Performance Comparison

from fluent import sender, asyncsender
import time

def benchmark_sync_sender():
    """
    Benchmark synchronous sender.

    Emits 1000 events and closes the sender; close() is inside the
    measured interval.

    Returns:
    float: Elapsed seconds, measured with time.perf_counter()
    """
    logger = sender.FluentSender('sync_test')

    # perf_counter() is the clock intended for measuring short intervals;
    # wall-clock time.time() can jump and may have coarse resolution
    start_time = time.perf_counter()

    for i in range(1000):
        logger.emit('test', {'index': i})

    logger.close()
    return time.perf_counter() - start_time

def benchmark_async_sender():
    """
    Benchmark asynchronous sender.

    Emits 1000 events into a queue sized at 1500, larger than the burst.

    Returns:
    tuple[float, float]: (queue_time, total_time) in seconds, where
    queue_time covers only the emit() calls and total_time also includes
    close(). Both are measured with time.perf_counter().
    """
    logger = asyncsender.FluentSender('async_test', queue_maxsize=1500)

    # perf_counter() is the clock intended for measuring short intervals;
    # wall-clock time.time() can jump and may have coarse resolution
    start_time = time.perf_counter()

    for i in range(1000):
        logger.emit('test', {'index': i})

    # Time to queue all events (not send)
    queue_time = time.perf_counter() - start_time

    # Close and wait for actual sending
    logger.close()
    total_time = time.perf_counter() - start_time

    return queue_time, total_time

# Run benchmarks
sync_time = benchmark_sync_sender()
async_queue_time, async_total_time = benchmark_async_sender()

print(f"Sync sender: {sync_time:.2f}s")
print(f"Async sender (queue): {async_queue_time:.2f}s")
print(f"Async sender (total): {async_total_time:.2f}s")

# Queueing can finish within the timer's resolution, giving a measured
# time of 0; report that instead of raising ZeroDivisionError
if async_queue_time > 0:
    print(f"Speedup: {sync_time / async_queue_time:.1f}x")
else:
    print("Speedup: n/a (queue time below timer resolution)")

Install with Tessl CLI

npx tessl i tessl/pypi-fluent-logger

docs

async-communication.md

core-sender.md

event-interface.md

index.md

logging-integration.md

tile.json