A Python logging handler for Fluentd event collector
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Simplified API wrapper using global sender instances for quick integration and reduced boilerplate code. This interface provides the easiest way to send events to Fluentd with minimal setup.
Module-level functions for initializing and managing a global sender instance, eliminating the need to pass sender objects throughout the application.
def setup(tag: str, **kwargs) -> None:
"""
Initialize global FluentSender instance.
Parameters:
- tag (str): Tag prefix for all events
- **kwargs: FluentSender constructor arguments (host, port, timeout, etc.)
"""
def get_global_sender():
"""
Get the global FluentSender instance.
Returns:
FluentSender or None: The global sender instance
"""
def close() -> None:
"""Close and cleanup the global FluentSender instance."""Simple event creation and emission class that automatically uses the global sender instance.
class Event:
def __init__(self, label: str, data: dict, **kwargs):
"""
Create and immediately send an event.
Parameters:
- label (str): Event label (combined with global tag)
- data (dict): Event data dictionary (must be a dict)
- sender (FluentSender, optional): Custom sender instance, defaults to global
- time (timestamp, optional): Custom timestamp, defaults to current time
Raises:
AssertionError: If data is not a dictionary
"""from fluent import sender, event
# Setup global sender once at application start
sender.setup('myapp')
# Send events anywhere in the application
event.Event('user.login', {
'user_id': 123,
'username': 'alice',
'ip_address': '192.168.1.100'
})
event.Event('user.action', {
'user_id': 123,
'action': 'view_page',
'page': '/dashboard',
'duration_ms': 250
})
# Cleanup at application shutdown
sender.close()from fluent import sender, event
# Setup with remote Fluentd server
sender.setup('webapp', host='logs.company.com', port=24224, timeout=5.0)
# Events are automatically sent to remote server
event.Event('order.created', {
'order_id': 'ord-12345',
'user_id': 456,
'total_amount': 89.99,
'currency': 'USD',
'items_count': 3
})
event.Event('payment.processed', {
'order_id': 'ord-12345',
'payment_method': 'credit_card',
'transaction_id': 'txn-67890'
})
sender.close()import time
from fluent import sender, event
sender.setup('analytics')
# Event with current timestamp (default)
event.Event('page.view', {
'url': '/products/123',
'user_agent': 'Mozilla/5.0...',
'referrer': 'https://google.com'
})
# Event with specific timestamp
custom_time = int(time.time()) - 3600 # 1 hour ago
event.Event('historical.data', {
'metric': 'cpu_usage',
'value': 75.5,
'host': 'server-01'
}, time=custom_time)
sender.close()from fluent import sender, event
# Setup default global sender
sender.setup('app')
# Create custom sender for special events
critical_sender = sender.FluentSender(
'critical',
host='alerts.company.com',
port=24224,
timeout=1.0
)
# Use global sender
event.Event('info', {'message': 'User logged in'})
# Use custom sender for critical events
event.Event('system.error', {
'error_code': 500,
'message': 'Database connection failed',
'severity': 'critical'
}, sender=critical_sender)
# Cleanup
sender.close()
critical_sender.close()from fluent import sender, event
# Setup with error handling configuration
sender.setup('app', host='unreliable-host.example.com', verbose=True)
# The Event class doesn't return success/failure status
# To check for errors, access the global sender directly
event.Event('test', {'data': 'some data'})
# Check for errors on global sender
global_sender = sender.get_global_sender()
if global_sender.last_error:
print(f"Error occurred: {global_sender.last_error}")
global_sender.clear_last_error()
sender.close()from fluent import sender, event
import atexit
class FluentLogger:
def __init__(self, app_name, **config):
self.app_name = app_name
sender.setup(app_name, **config)
atexit.register(self.cleanup)
def log_event(self, event_type, data):
event.Event(event_type, data)
def cleanup(self):
sender.close()
# Application setup
logger = FluentLogger('ecommerce', host='logs.example.com')
# Use throughout application
def handle_user_signup(user_data):
logger.log_event('user.signup', {
'user_id': user_data['id'],
'email': user_data['email'],
'signup_method': 'email',
'referral_code': user_data.get('referral')
})
def handle_purchase(order_data):
logger.log_event('order.completed', {
'order_id': order_data['id'],
'user_id': order_data['user_id'],
'amount': order_data['total'],
'currency': order_data['currency'],
'product_count': len(order_data['items'])
})
# Events are automatically sent, cleanup happens at exitfrom fluent import sender, event
sender.setup('batch_processor')
# Process multiple events in a batch
events_data = [
{'type': 'click', 'user_id': 1, 'element': 'button-a'},
{'type': 'scroll', 'user_id': 1, 'position': 50},
{'type': 'click', 'user_id': 2, 'element': 'link-b'},
{'type': 'pageview', 'user_id': 2, 'page': '/products'}
]
for event_data in events_data:
event_type = event_data.pop('type') # Remove type from data
event.Event(f'user.{event_type}', event_data)
sender.close()Install with Tessl CLI
npx tessl i tessl/pypi-fluent-logger