CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-fluent-logger

A Python logging handler for Fluentd event collector

Pending
Overview
Eval results
Files

docs/core-sender.md

Core Sender Interface

Direct FluentSender interface for programmatic event emission with full control over connection parameters, error handling, and message formatting. This provides the lowest-level interface to Fluentd with maximum flexibility.

Capabilities

FluentSender Class

Main synchronous sender class that manages connections to Fluentd servers and handles event transmission with buffering and error recovery.

class FluentSender:
    """
    Synchronous sender that manages the connection to a Fluentd server.

    Handles event transmission with buffering and error recovery. Instances
    can be used as context managers; leaving the ``with`` block closes the
    sender.
    """

    def __init__(
        self,
        tag: str,
        host: str = "localhost",
        port: int = 24224,
        bufmax: int = 1048576,
        timeout: float = 3.0,
        verbose: bool = False,
        buffer_overflow_handler = None,
        nanosecond_precision: bool = False,
        msgpack_kwargs: "dict | None" = None,
        *,
        forward_packet_error: bool = True,
        **kwargs
    ):
        """
        Initialize FluentSender.

        Parameters:
        - tag (str): Tag prefix for events; each emitted label is combined
          with it as 'tag.label'
        - host (str): Fluentd host (default "localhost"); supports "unix://"
          URLs for Unix sockets, e.g. 'unix:///var/run/fluentd.sock'
        - port (int): Fluentd port for TCP connections (default 24224)
        - bufmax (int): Maximum buffer size in bytes (default 1048576, i.e. 1 MiB)
        - timeout (float): Connection timeout in seconds (default 3.0)
        - verbose (bool): Enable verbose logging of packets (default False)
        - buffer_overflow_handler (callable, optional): Handler for buffer
          overflow events; it is called with the pending data. It is also
          called by close() for any events still pending at that point.
        - nanosecond_precision (bool): Use nanosecond-precision timestamps
          (default False)
        - msgpack_kwargs (dict, optional): Additional msgpack serialization options
        - forward_packet_error (bool): Forward packet errors as events
          (default True). Keyword-only.
        - **kwargs: Additional keyword arguments accepted by the constructor;
          their meaning is not described in this reference.
        """

    def emit(self, label: str, data: dict) -> bool:
        """
        Emit event with current timestamp.

        Parameters:
        - label (str): Event label (combined with tag as 'tag.label')
        - data (dict): Event data dictionary

        Returns:
        bool: True if successful, False if error occurred. On failure the
        error can be inspected through the last_error property.
        """

    def emit_with_time(self, label: str, timestamp: "int | float | EventTime", data: dict) -> bool:
        """
        Emit event with specific timestamp.

        Parameters:
        - label (str): Event label (combined with tag as 'tag.label')
        - timestamp: Unix timestamp (int/float) or EventTime instance
        - data (dict): Event data dictionary

        Returns:
        bool: True if successful, False if error occurred
        """

    def close(self) -> None:
        """
        Close sender and flush any pending events.

        Calls buffer_overflow_handler for any remaining pending events.
        """

    def clear_last_error(self, _thread_id=None) -> None:
        """
        Clear the last error from thread-local storage.

        Parameters:
        - _thread_id: Internal parameter for thread identification; callers
          normally omit it
        """

    @property
    def last_error(self) -> "Exception | None":
        """
        Get the last error that occurred (thread-local).

        Returns:
        Exception or None: Last error for current thread
        """

    @last_error.setter
    def last_error(self, err):
        """
        Set the last error for current thread.

        Parameters:
        - err: Error to record as the current thread's last error
        """

    def __enter__(self) -> "FluentSender":
        """Enter context manager; the bound value is used to emit events."""

    def __exit__(self, typ, value, traceback):
        """Exit context manager, closes sender."""

Testing Functions

Internal helper intended for tests only. It replaces the module-level global sender described under "Global Sender Functions" below:

def _set_global_sender(sender):
    """
    [For testing] Set global sender directly.

    Internal helper (note the leading underscore): it installs the given
    instance as the global sender instead of creating one through setup().

    Parameters:
    - sender (FluentSender): Sender instance to use as global sender
    """

Global Sender Functions

Module-level functions for managing a global FluentSender instance, providing a singleton pattern for application-wide logging.

def setup(tag: str, **kwargs) -> None:
    """
    Initialize global FluentSender instance.

    Typically called at application start; the instance is then available
    through get_global_sender().

    Parameters:
    - tag (str): Tag prefix for events
    - **kwargs: Additional FluentSender constructor arguments
      (for example host=..., port=...)
    """

def get_global_sender() -> "FluentSender | None":
    """
    Get the global FluentSender instance.

    Returns:
    FluentSender or None: Global sender instance. Callers should be prepared
    for None when no global sender is available.
    """

def close() -> None:
    """
    Close the global FluentSender instance.

    Typically called at application shutdown.
    """

EventTime Class

Specialized timestamp class for nanosecond-precision logging, implemented as msgpack ExtType for efficient serialization.

class EventTime:
    """
    Timestamp type for nanosecond-precision logging.

    Implemented as a msgpack ExtType for efficient serialization. Instances
    can be passed as the timestamp argument of FluentSender.emit_with_time().
    """

    def __new__(cls, timestamp: float, nanoseconds: "int | None" = None):
        """
        Create EventTime instance.

        Parameters:
        - timestamp (float): Unix timestamp in seconds
        - nanoseconds (int, optional): Nanosecond component, calculated from timestamp if not provided

        Returns:
        EventTime: New EventTime instance (msgpack ExtType)
        """

    @classmethod
    def from_unix_nano(cls, unix_nano: int) -> "EventTime":
        """
        Create EventTime from nanosecond timestamp.

        Parameters:
        - unix_nano (int): Unix timestamp in nanoseconds, e.g. the value
          returned by time.time_ns()

        Returns:
        EventTime: New EventTime instance
        """

Usage Examples

Basic Event Emission

from fluent import sender

# Create a sender for local Fluentd using the defaults
# (host "localhost", port 24224); 'app' is the tag prefix
logger = sender.FluentSender('app')

# Send events; each label is combined with the tag as 'tag.label',
# e.g. 'app.user.login'
logger.emit('user.login', {'user_id': 123, 'ip': '192.168.1.1'})
logger.emit('user.action', {'user_id': 123, 'action': 'click', 'target': 'button'})

# Flush any pending events and close the sender
logger.close()

Remote Fluentd Connection

from fluent import sender

# Connect to remote Fluentd server over TCP
logger = sender.FluentSender('app', host='fluentd.example.com', port=24224)

# Send event; emit() returns True if successful, False if an error occurred
result = logger.emit('purchase', {
    'user_id': 456,
    'product_id': 'prod-123',
    'amount': 29.99,
    'currency': 'USD'
})

# On failure, inspect the (thread-local) last error, then clear it
if not result:
    print(f"Failed to send event: {logger.last_error}")
    logger.clear_last_error()

# Flush any pending events and close the sender
logger.close()

Unix Socket Connection

from fluent import sender

# Connect via Unix socket: a "unix://" URL is given as the host
logger = sender.FluentSender('app', host='unix:///var/run/fluentd.sock')

logger.emit('system.alert', {'level': 'warning', 'message': 'Disk usage high'})
# Flush any pending events and close the sender
logger.close()

Nanosecond Precision Timestamps

import time
from fluent import sender

# Enable nanosecond precision
logger = sender.FluentSender('app', nanosecond_precision=True)

# Automatic timestamp: emit() stamps the event with the current time
logger.emit('timing.event', {'operation': 'database_query', 'duration_ms': 150})

# Manual timestamp: emit_with_time() accepts a Unix timestamp (int/float)
# or an EventTime instance
timestamp = time.time()
logger.emit_with_time('timing.precise', timestamp, {'value': 42})

# Using EventTime directly, built from an integer nanosecond timestamp
# (time.time_ns() requires Python 3.7+)
event_time = sender.EventTime.from_unix_nano(time.time_ns())
logger.emit_with_time('timing.nano', event_time, {'precision': 'nanosecond'})

# Flush any pending events and close the sender
logger.close()

Buffer Overflow Handling

import msgpack
from io import BytesIO
from fluent import sender

def handle_overflow(pendings):
    """
    Custom handler for buffer overflow.

    Reports the events that were still pending. `pendings` is wrapped in
    BytesIO and decoded with msgpack, so it is expected to be a bytes-like
    object holding the serialized events.
    """
    print(f"Buffer overflow! {len(pendings)} bytes pending")
    
    # Parse pending events: the Unpacker yields one decoded event at a time
    unpacker = msgpack.Unpacker(BytesIO(pendings))
    for event in unpacker:
        print(f"Lost event: {event}")

# Create sender with overflow handler
logger = sender.FluentSender(
    'app',
    host='unreliable-host.example.com',
    bufmax=1024,  # Small buffer for demonstration (the default is 1048576 bytes)
    buffer_overflow_handler=handle_overflow
)

# Send events (some may trigger overflow if connection fails)
for i in range(100):
    logger.emit('test', {'index': i, 'data': 'x' * 100})

# close() also calls the overflow handler for any events still pending
logger.close()

Context Manager Usage

from fluent import sender

# Automatic cleanup with context manager: no explicit close() call is needed
with sender.FluentSender('app') as logger:
    logger.emit('session.start', {'user_id': 789})
    logger.emit('session.action', {'action': 'view_page', 'page': '/home'})
    logger.emit('session.end', {'duration': 120})
    
# Sender is automatically closed when the with block exits

Global Sender Pattern

import time  # required by process_user_action(); missing from the original snippet

from fluent import sender

# Setup global sender at application start; extra keyword arguments are
# passed on as FluentSender constructor arguments
sender.setup('myapp', host='logs.company.com', port=24224)

# Use global sender anywhere in application
def process_user_action(user_id, action):
    """
    Emit a 'user.action' event through the global sender.

    Parameters:
    - user_id: Identifier of the acting user, stored under 'user_id'
    - action: Name of the action performed, stored under 'action'

    Raises:
    RuntimeError: If no global sender is available (setup() not called)
    """
    global_sender = sender.get_global_sender()
    # get_global_sender() may return None; fail with a clear message instead
    # of an AttributeError on None
    if global_sender is None:
        raise RuntimeError("global sender is not initialized; call sender.setup() first")
    global_sender.emit('user.action', {
        'user_id': user_id,
        'action': action,
        'timestamp': time.time()
    })

# Application shutdown
def shutdown():
    """Close the global FluentSender instance."""
    sender.close()

Install with Tessl CLI

npx tessl i tessl/pypi-fluent-logger

docs

async-communication.md

core-sender.md

event-interface.md

index.md

logging-integration.md

tile.json