tessl/pypi-fluent-logger

A Python logging handler for Fluentd event collector

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/fluent-logger@0.11.x

To install, run

npx @tessl/cli install tessl/pypi-fluent-logger@0.11.0


Fluent Logger

A Python structured logger for Fluentd and Fluent Bit that sends structured events from Python applications to a centralized log collector. It offers three interfaces: a direct FluentSender for programmatic event emission, a handler for the standard Python logging module, and asynchronous variants of both that avoid blocking the application while logs are transmitted.

Package Information

  • Package Name: fluent-logger
  • Version: 0.11.1
  • Language: Python
  • Installation: pip install fluent-logger

Core Imports

Note: The package's __init__.py is empty, so everything must be imported from a specific module. For the core sender:

from fluent import sender

For event-based interface:

from fluent import event

For Python logging integration:

from fluent import handler

For asynchronous versions:

from fluent import asyncsender
from fluent import asynchandler

For version information:

from fluent.__about__ import __version__

Basic Usage

import time

from fluent import sender

# Create a FluentSender for a local Fluentd instance
logger = sender.FluentSender('app')

# Send an event with the current timestamp
logger.emit('follow', {'from': 'userA', 'to': 'userB'})

# Send an event with a specific timestamp
cur_time = int(time.time())
logger.emit_with_time('follow', cur_time, {'from': 'userA', 'to': 'userB'})

# Check for errors
if not logger.emit('follow', {'from': 'userA', 'to': 'userB'}):
    print(logger.last_error)
    logger.clear_last_error()

# Clean up
logger.close()

Architecture

The fluent-logger package is built around several key components:

  • FluentSender: Core synchronous sender that manages TCP/Unix socket connections to Fluentd
  • asyncsender.FluentSender: Asynchronous subclass of FluentSender that uses a background thread and a queue for non-blocking operation
  • Event-based API: Simplified wrapper interface using global sender instances
  • Logging Integration: Full Python logging.Handler compatibility with structured formatting
  • Buffer Management: Automatic buffering, reconnection, and overflow handling for reliability
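
The buffer-management component can be illustrated with a small sketch. This is not the library's code: `BufferedSender`, its `transport` callable, and the exact overflow rule are assumptions made for illustration. Only the `bufmax` and `buffer_overflow_handler` names are taken from the FluentSender constructor signature.

```python
class BufferedSender:
    """Toy sender that keeps unsent bytes and retries them on the next call."""

    def __init__(self, transport, bufmax=1024, buffer_overflow_handler=None):
        self._transport = transport  # callable(bytes); raises OSError on failure
        self._bufmax = bufmax
        self._overflow_handler = buffer_overflow_handler
        self._pending = b""
        self.last_error = None

    def send(self, payload: bytes) -> bool:
        data = self._pending + payload  # older bytes go out first
        try:
            self._transport(data)
        except OSError as exc:
            self.last_error = exc
            if len(data) > self._bufmax:
                # Over the limit: hand the bytes to the handler, then drop them
                if self._overflow_handler is not None:
                    self._overflow_handler(data)
                self._pending = b""
            else:
                self._pending = data  # keep for the next attempt
            return False
        self._pending = b""
        return True
```

In this sketch a failed send never raises. The caller sees `False`, and the bytes are either retried on the next call or passed to the overflow handler once the buffer exceeds `bufmax`.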

Capabilities

Core Sender Interface

Direct FluentSender interface for programmatic event emission with full control over connection parameters, error handling, and message formatting.

class FluentSender:
    def __init__(
        self,
        tag: str,
        host: str = "localhost",
        port: int = 24224,
        bufmax: int = 1048576,
        timeout: float = 3.0,
        verbose: bool = False,
        buffer_overflow_handler = None,
        nanosecond_precision: bool = False,
        msgpack_kwargs = None,
        *,
        forward_packet_error: bool = True,
        **kwargs
    ): ...

    def emit(self, label: str, data: dict) -> bool: ...
    def emit_with_time(self, label: str, timestamp, data: dict) -> bool: ...
    def close(self) -> None: ...
    
    def __enter__(self): ...
    def __exit__(self, typ, value, traceback): ...
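
The error-reporting contract shown in Basic Usage (emit returns False on failure, with details in last_error) can be wrapped in a small helper. A minimal sketch: `emit_all` and the `FakeSender` stand-in are hypothetical names introduced so the example runs without a Fluentd server. The helper relies only on `emit`, `last_error`, and `clear_last_error`.

```python
def emit_all(logger, label, records):
    """Emit every record and return the ones that failed, with their errors.

    `logger` is any object exposing emit(), last_error and clear_last_error().
    """
    failures = []
    for record in records:
        if not logger.emit(label, record):
            failures.append((record, logger.last_error))
            logger.clear_last_error()
    return failures


class FakeSender:
    """Hypothetical stand-in that rejects records lacking a 'from' key."""

    def __init__(self):
        self.sent = []
        self.last_error = None

    def emit(self, label, data):
        if "from" not in data:
            self.last_error = ValueError("missing 'from'")
            return False
        self.sent.append((label, data))
        return True

    def clear_last_error(self):
        self.last_error = None


fake = FakeSender()
failed = emit_all(fake, "follow", [{"from": "userA", "to": "userB"}, {"to": "userC"}])
```

With the real library, the same helper would presumably be called with a `sender.FluentSender('app')` instance. Since the class defines `__enter__` and `__exit__`, that instance can be created in a `with` statement.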

Full reference: core-sender.md

Event-Based Interface

Simplified API wrapper using global sender instances for quick integration and reduced boilerplate code.

# Global functions (defined in fluent.sender)
def setup(tag: str, **kwargs) -> None: ...
def get_global_sender(): ...
def close() -> None: ...

# Event class (defined in fluent.event); constructing an Event emits it
class Event:
    def __init__(self, label: str, data: dict, **kwargs): ...

Full reference: event-interface.md

Python Logging Integration

Full logging.Handler compatibility with structured formatting, custom formatters, and seamless integration with existing Python logging workflows.

class FluentHandler(logging.Handler):
    def __init__(
        self,
        tag: str,
        host: str = "localhost",
        port: int = 24224,
        timeout: float = 3.0,
        verbose: bool = False,
        buffer_overflow_handler = None,
        msgpack_kwargs = None,
        nanosecond_precision: bool = False,
        **kwargs
    ): ...
    
    def __enter__(self): ...
    def __exit__(self, exc_type, exc_val, exc_tb): ...

class FluentRecordFormatter(logging.Formatter):
    def __init__(
        self,
        fmt = None,
        datefmt = None,
        style: str = "%",
        fill_missing_fmt_key: bool = False,
        format_json: bool = True,
        exclude_attrs = None
    ): ...
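
Structured formatting means the formatter yields a dictionary rather than a string. The sketch below shows the idea with the standard library only. `DictFormatter` and its `fields` mapping are hypothetical and do not reproduce FluentRecordFormatter's behaviour.

```python
import logging


class DictFormatter(logging.Formatter):
    """Render a LogRecord as a dict by filling one %-style template per key."""

    def __init__(self, fields):
        super().__init__()
        self._fields = fields  # e.g. {"where": "%(module)s.%(funcName)s"}

    def format(self, record):
        # Resolve msg % args once so templates can refer to %(message)s
        record.message = record.getMessage()
        return {
            key: template % record.__dict__
            for key, template in self._fields.items()
        }


formatter = DictFormatter({
    "level": "%(levelname)s",
    "logger": "%(name)s",
    "message": "%(message)s",
})
record = logging.LogRecord(
    name="app.follow", level=logging.INFO, pathname="sketch.py", lineno=1,
    msg="%s followed %s", args=("userA", "userB"), exc_info=None,
)
structured = formatter.format(record)
# structured == {"level": "INFO", "logger": "app.follow",
#                "message": "userA followed userB"}
```

A handler that ships records to a collector can forward such a dictionary as the event body, so each key stays a queryable field instead of being buried in a formatted line.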

Full reference: logging-integration.md

Asynchronous Communication

Non-blocking asynchronous versions of both the core sender and logging handler interfaces, using background threads and queues to prevent application blocking during log transmission.

# Defined in fluent.asyncsender; shares its name with the synchronous class it extends
class FluentSender(fluent.sender.FluentSender):
    def __init__(
        self,
        tag: str,
        host: str = "localhost",
        port: int = 24224,
        bufmax: int = 1048576,
        timeout: float = 3.0,
        verbose: bool = False,
        buffer_overflow_handler = None,
        nanosecond_precision: bool = False,
        msgpack_kwargs = None,
        queue_maxsize: int = 100,
        queue_circular: bool = False,
        queue_overflow_handler = None,
        **kwargs
    ): ...

    def close(self, flush: bool = True) -> None: ...
    
    @property
    def queue_maxsize(self) -> int: ...
    
    @property
    def queue_blocking(self) -> bool: ...
    
    @property
    def queue_circular(self) -> bool: ...
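
The queue-and-worker design can be sketched with the standard library. `BackgroundSender` and its `send` callable are hypothetical. The parameter names mirror `queue_maxsize` and `queue_circular` from the signature, and "circular" is assumed here to mean that the oldest queued entry is discarded when the queue is full.

```python
import queue
import threading

_STOP = object()  # sentinel that tells the worker thread to exit


class BackgroundSender:
    """Toy non-blocking sender: emit() enqueues, a worker thread transmits."""

    def __init__(self, send, queue_maxsize=100, queue_circular=False):
        self._send = send  # callable that performs the slow I/O
        self._queue = queue.Queue(maxsize=queue_maxsize)
        self._circular = queue_circular
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def emit(self, payload):
        if not self._circular:
            self._queue.put(payload)  # blocks while the queue is full
            return
        while True:
            try:
                self._queue.put_nowait(payload)
                return
            except queue.Full:
                try:
                    self._queue.get_nowait()  # discard the oldest entry
                except queue.Empty:
                    pass  # the worker emptied it first; retry the put

    def _run(self):
        while True:
            item = self._queue.get()
            if item is _STOP:
                return
            self._send(item)

    def close(self, flush=True):
        if not flush:
            # Throw away whatever has not been transmitted yet
            while True:
                try:
                    self._queue.get_nowait()
                except queue.Empty:
                    break
        self._queue.put(_STOP)
        self._worker.join()
```

In blocking mode a full queue slows the caller down but loses nothing. In circular mode the caller never waits, at the cost of dropping the oldest entries under sustained load.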

Full reference: async-communication.md

Types

class EventTime:
    """msgpack ExtType for nanosecond-precision timestamps"""
    def __new__(cls, timestamp: float, nanoseconds: int = None): ...
    
    @classmethod
    def from_unix_nano(cls, unix_nano: int): ...
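
An integer nanosecond timestamp avoids the rounding that a float introduces. The sketch below splits such a value into whole seconds and a nanosecond remainder, then packs both parts. It assumes the 8-byte layout (32-bit seconds followed by 32-bit nanoseconds, big-endian) described for EventTime in the Fluentd forward protocol. The helper names are hypothetical, and the class itself may build its payload differently.

```python
import struct

NANOS_PER_SECOND = 10**9


def split_unix_nano(unix_nano):
    """Split integer nanoseconds since the epoch into (seconds, nanoseconds)."""
    return divmod(unix_nano, NANOS_PER_SECOND)


def pack_event_time(unix_nano):
    """Pack both parts as big-endian unsigned 32-bit integers (8 bytes)."""
    seconds, nanoseconds = split_unix_nano(unix_nano)
    return struct.pack(">II", seconds, nanoseconds)


payload = pack_event_time(1_700_000_000_123_456_789)
```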

Version Information

# Access package version
from fluent.__about__ import __version__
# __version__ = "0.11.1"
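
Where importing `fluent.__about__` is undesirable, the standard library's `importlib.metadata` can report the installed distribution's version instead. A minimal sketch: `installed_version` is a hypothetical helper, and the distribution name is assumed to match the `pip install` name given under Package Information.

```python
from importlib import metadata


def installed_version(distribution):
    """Return the installed version string, or None if the package is absent."""
    try:
        return metadata.version(distribution)
    except metadata.PackageNotFoundError:
        return None


fluent_logger_version = installed_version("fluent-logger")
```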