A Python logging handler for Fluentd event collector
npx @tessl/cli install tessl/pypi-fluent-logger@0.11.0

A Python structured logger for Fluentd/Fluent Bit event collectors that enables structured event logging from Python applications to centralized log processing systems. It provides multiple interfaces including direct FluentSender for programmatic event emission, standard Python logging handler integration, and asynchronous communication options to prevent blocking during log transmission.

pip install fluent-logger

Note: The package has an empty __init__.py, so all functionality must be imported from specific modules:

from fluent import sender

For event-based interface:

from fluent import event

For Python logging integration:

from fluent import handler

For asynchronous versions:

from fluent import asyncsender
from fluent import asynchandler

For version information:

from fluent.__about__ import __version__

from fluent import sender

# Create a FluentSender for local Fluentd
logger = sender.FluentSender('app')

# Send an event with current timestamp
logger.emit('follow', {'from': 'userA', 'to': 'userB'})

# Send event with specific timestamp
import time
cur_time = int(time.time())
logger.emit_with_time('follow', cur_time, {'from': 'userA', 'to': 'userB'})

# Check for errors
if not logger.emit('follow', {'from': 'userA', 'to': 'userB'}):
    print(logger.last_error)
    logger.clear_last_error()

# Clean up
logger.close()

The fluent-logger package is built around several key components:
Direct FluentSender interface for programmatic event emission with full control over connection parameters, error handling, and message formatting.
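
As an illustration of that control, the sketch below wires a custom overflow handler into a FluentSender and uses the sender as a context manager. It uses only the constructor parameters `host`, `port`, `timeout`, and `buffer_overflow_handler`. The helper names `report_overflow` and `send_follow_event`, the tag, and the record contents are made up for the example, and a Fluentd or Fluent Bit instance is assumed to be listening on localhost:24224.

```python
import sys


def report_overflow(pendings):
    """Hypothetical overflow handler: receives the undelivered buffer as bytes."""
    # The buffer holds msgpack-encoded events; this sketch only reports its size.
    message = f"fluent buffer overflow: dropped {len(pendings)} bytes"
    print(message, file=sys.stderr)
    return message


def send_follow_event(host="localhost", port=24224):
    """Emit one event and return the result of emit()."""
    # Imported here so the module still loads where fluent-logger is not installed.
    from fluent import sender

    # __enter__/__exit__ make the sender a context manager; close() runs on exit.
    with sender.FluentSender(
        "app",
        host=host,
        port=port,
        timeout=3.0,
        buffer_overflow_handler=report_overflow,
    ) as logger:
        ok = logger.emit("follow", {"from": "userA", "to": "userB"})
        if not ok:
            # Inspect and reset the stored error, as in the basic usage example.
            print(logger.last_error, file=sys.stderr)
            logger.clear_last_error()
        return ok
```

The `with` form is a convenience; calling `close()` explicitly, as in the basic usage example, should have the same effect.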

class FluentSender:
    def __init__(
        self,
        tag: str,
        host: str = "localhost",
        port: int = 24224,
        bufmax: int = 1048576,
        timeout: float = 3.0,
        verbose: bool = False,
        buffer_overflow_handler = None,
        nanosecond_precision: bool = False,
        msgpack_kwargs = None,
        *,
        forward_packet_error: bool = True,
        **kwargs
    ): ...

    def emit(self, label: str, data: dict) -> bool: ...
    def emit_with_time(self, label: str, timestamp, data: dict) -> bool: ...
    def close(self) -> None: ...
    def __enter__(self): ...
    def __exit__(self, typ, value, traceback): ...

Simplified API wrapper using global sender instances for quick integration and reduced boilerplate code.
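
A minimal sketch of that wrapper in use, assuming a collector on localhost:24224. Here `setup` and `close` are called on the `fluent.sender` module and `Event` comes from `fluent.event`. The helper names `follow_record` and `emit_follow`, the tag, and the record fields are illustrative only.

```python
def follow_record(source, target):
    """Build the dict payload; Event expects its data argument to be a dict."""
    return {"from": source, "to": target}


def emit_follow(source, target):
    # Lazy imports keep this sketch importable without fluent-logger installed.
    from fluent import event, sender

    # Configure the process-wide sender once; keyword arguments are forwarded to it.
    sender.setup("app", host="localhost", port=24224)
    try:
        # Constructing an Event sends it through the global sender as 'app.follow'.
        event.Event("follow", follow_record(source, target))
    finally:
        # Close the global sender's connection.
        sender.close()
```

In a long-running application, `setup` would normally be called once at startup and `close` once at shutdown rather than around every event.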

# Global functions
def setup(tag: str, **kwargs) -> None: ...
def get_global_sender(): ...
def close() -> None: ...

# Event class
class Event:
    def __init__(self, label: str, data: dict, **kwargs): ...

Full logging.Handler compatibility with structured formatting, custom formatters, and seamless integration with existing Python logging workflows.
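
The sketch below shows one way to attach the handler to a standard logger. The format keys in `CUSTOM_FORMAT`, the logger name, the tag `app.follow`, and the helper `build_logger` are assumptions for illustration. Passing a dict as the formatter's `fmt` is assumed to map each output field to a logging format string.

```python
import logging

# Hypothetical field mapping: output key -> standard logging format string.
CUSTOM_FORMAT = {
    "where": "%(module)s.%(funcName)s",
    "type": "%(levelname)s",
}


def build_logger(name="fluent.example", tag="app.follow"):
    """Return a logger wired to Fluentd, plus the handler so it can be closed."""
    # Lazy import keeps this sketch importable without fluent-logger installed.
    from fluent import handler

    fluent_handler = handler.FluentHandler(tag, host="localhost", port=24224)
    fluent_handler.setFormatter(handler.FluentRecordFormatter(CUSTOM_FORMAT))

    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.addHandler(fluent_handler)
    return logger, fluent_handler
```

A caller would then log structured data through the usual logging calls, for example `logger.info({'from': 'userA', 'to': 'userB'})`, and call `fluent_handler.close()` when finished.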

class FluentHandler(logging.Handler):
    def __init__(
        self,
        tag: str,
        host: str = "localhost",
        port: int = 24224,
        timeout: float = 3.0,
        verbose: bool = False,
        buffer_overflow_handler = None,
        msgpack_kwargs = None,
        nanosecond_precision: bool = False,
        **kwargs
    ): ...

    def __enter__(self): ...
    def __exit__(self, exc_type, exc_val, exc_tb): ...

class FluentRecordFormatter(logging.Formatter):
    def __init__(
        self,
        fmt = None,
        datefmt = None,
        style: str = "%",
        fill_missing_fmt_key: bool = False,
        format_json: bool = True,
        exclude_attrs = None
    ): ...

Non-blocking asynchronous versions of both the core sender and logging handler interfaces, using background threads and queues to prevent application blocking during log transmission.
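
A short usage sketch of the asynchronous sender, assuming `fluent.asyncsender.FluentSender` accepts the `queue_maxsize` and `queue_circular` options and that a collector is listening on localhost:24224. `QUEUE_OPTIONS` and `emit_without_blocking` are hypothetical names, and the queue values are illustrative rather than recommended.

```python
# Hypothetical queue settings; the values are illustrative, not recommendations.
QUEUE_OPTIONS = {
    "queue_maxsize": 1000,   # upper bound on events waiting in the queue
    "queue_circular": True,  # assumed: discard the oldest events rather than block when full
}


def emit_without_blocking(events):
    """Queue (label, data) pairs for delivery by the sender's background thread."""
    # Lazy import keeps this sketch importable without fluent-logger installed.
    from fluent import asyncsender

    logger = asyncsender.FluentSender(
        "app", host="localhost", port=24224, **QUEUE_OPTIONS
    )
    try:
        for label, data in events:
            # emit() hands the event to the background thread through the queue.
            logger.emit(label, data)
    finally:
        # close() stops the background thread; flush=True asks it to send what is queued first.
        logger.close(flush=True)
```

Calling `close()` matters more here than with the synchronous sender, because the background thread otherwise keeps running until the process exits.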

class FluentSender(fluent.sender.FluentSender):
    def __init__(
        self,
        tag: str,
        host: str = "localhost",
        port: int = 24224,
        bufmax: int = 1048576,
        timeout: float = 3.0,
        verbose: bool = False,
        buffer_overflow_handler = None,
        nanosecond_precision: bool = False,
        msgpack_kwargs = None,
        queue_maxsize: int = 100,
        queue_circular: bool = False,
        queue_overflow_handler = None,
        **kwargs
    ): ...

    def close(self, flush: bool = True) -> None: ...

    @property
    def queue_maxsize(self) -> int: ...

    @property
    def queue_blocking(self) -> bool: ...

    @property
    def queue_circular(self) -> bool: ...

class EventTime:
    """msgpack ExtType for nanosecond-precision timestamps"""
    def __new__(cls, timestamp: float, nanoseconds: int = None): ...

    @classmethod
    def from_unix_nano(cls, unix_nano: int): ...

# Access package version
from fluent.__about__ import __version__
# __version__ = "0.11.1"
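
To illustrate the nanosecond-precision `EventTime` type listed earlier, the sketch below splits a nanosecond Unix timestamp into the seconds and nanoseconds parts that `EventTime(timestamp, nanoseconds)` takes. `split_unix_nano` and `make_event_time` are hypothetical helpers, importing `EventTime` from `fluent.sender` is an assumption about where the class lives, and whether `from_unix_nano` performs exactly this split internally is also an assumption.

```python
import time


def split_unix_nano(unix_nano):
    """Split nanoseconds since the epoch into (whole seconds, remaining nanoseconds)."""
    return divmod(unix_nano, 10**9)


def make_event_time(unix_nano=None):
    """Build an EventTime for the given instant, defaulting to the current time."""
    # Lazy import keeps this sketch importable without fluent-logger installed.
    from fluent.sender import EventTime

    if unix_nano is None:
        unix_nano = time.time_ns()
    seconds, nanoseconds = split_unix_nano(unix_nano)
    # Intended to match what EventTime.from_unix_nano(unix_nano) would produce.
    return EventTime(seconds, nanoseconds)
```

The resulting value is meant to be passed as the `timestamp` argument of `emit_with_time` when sub-second precision is needed.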