Python logging handler for Logstash with support for UDP, TCP, and AMQP transport protocols.
npx @tessl/cli install tessl/pypi-python-logstash@0.4.0Python logging handler for Logstash that enables applications to send log messages directly to Logstash using multiple transport protocols. The library provides custom logging handlers for UDP, TCP, and AMQP connections with configurable message formatting compatible with different Logstash schema versions.
pip install python-logstashpip install pika (required for AMQP handler)import logstashCommon usage patterns:
from logstash import UDPLogstashHandler, TCPLogstashHandler, AMQPLogstashHandler
from logstash import LogstashHandler # alias for UDPLogstashHandler
from logstash import LogstashFormatterVersion0, LogstashFormatterVersion1For custom formatter usage:
from logstash.formatter import LogstashFormatterBaseimport logging
import logstash
# Configure logger
logger = logging.getLogger('my-app')
logger.setLevel(logging.INFO)
# Add UDP handler (default)
logger.addHandler(logstash.UDPLogstashHandler('localhost', 5959, version=1))
# Send log messages
logger.info('Application started')
logger.error('An error occurred')
# Add extra fields
extra = {
'user_id': 12345,
'request_id': 'abc-123',
'custom_data': {'key': 'value'}
}
logger.info('User action completed', extra=extra)The python-logstash library follows a modular handler-formatter architecture:
UDPLogstashHandler, TCPLogstashHandler, AMQPLogstashHandler) that inherit from Python's standard logging handlers and manage network connectionsLogstashFormatterVersion0, LogstashFormatterVersion1) that convert Python log records into Logstash-compatible JSON formatLogstashFormatterBase provides common functionality for all formattersEach handler automatically selects the appropriate formatter based on the specified schema version, enabling seamless integration with different Logstash configurations while maintaining backward compatibility.
Fast, fire-and-forget logging via UDP. Suitable for high-throughput scenarios where message delivery is not critical.
class UDPLogstashHandler(TCPLogstashHandler, DatagramHandler):
def __init__(self, host, port=5959, message_type='logstash', tags=None, fqdn=False, version=0):
"""
Python logging handler for Logstash. Sends events over UDP.
Parameters:
- host (str): The host of the logstash server
- port (int): The port of the logstash server (default: 5959)
- message_type (str): The type of the message (default: 'logstash')
- tags (list): List of tags for a logger (default: None)
- fqdn (bool): Show fully qualified domain name (default: False)
- version (int): Version of logstash event schema (default: 0)
"""
def makePickle(self, record):
"""
Serialize log record for UDP transmission.
Parameters:
- record: LogRecord object
Returns:
- bytes: Formatted message without newline
"""Reliable logging via TCP connections. Provides guaranteed message delivery with connection management.
class TCPLogstashHandler(SocketHandler):
def __init__(self, host, port=5959, message_type='logstash', tags=None, fqdn=False, version=0):
"""
Python logging handler for Logstash. Sends events over TCP.
Parameters:
- host (str): The host of the logstash server
- port (int): The port of the logstash server (default: 5959)
- message_type (str): The type of the message (default: 'logstash')
- tags (list): List of tags for a logger (default: None)
- fqdn (bool): Show fully qualified domain name (default: False)
- version (int): Version of logstash event schema (default: 0)
"""
def makePickle(self, record):
"""
Serialize log record for TCP transmission.
Parameters:
- record: LogRecord object
Returns:
- bytes: Formatted message with trailing newline
"""Message queue-based logging via RabbitMQ. Provides reliable, asynchronous message delivery with advanced routing capabilities.
class AMQPLogstashHandler(SocketHandler):
def __init__(self, host='localhost', port=5672, username='guest', password='guest',
exchange='logstash', exchange_type='fanout', virtual_host='/',
message_type='logstash', tags=None, durable=False, passive=False,
version=0, extra_fields=True, fqdn=False, facility=None,
exchange_routing_key=''):
"""
AMQP Log Format handler for RabbitMQ integration.
Parameters:
- host (str): AMQP host (default: 'localhost')
- port (int): AMQP port (default: 5672)
- username (str): AMQP username (default: 'guest')
- password (str): AMQP password (default: 'guest')
- exchange (str): AMQP exchange (default: 'logstash')
- exchange_type (str): AMQP exchange type (default: 'fanout')
- virtual_host (str): AMQP virtual host (default: '/')
- message_type (str): The type of the message (default: 'logstash')
- tags (list): List of tags for a logger (default: None)
- durable (bool): AMQP exchange durability (default: False)
- passive (bool): Exchange declaration mode (default: False)
- version (int): Version of logstash event schema (default: 0)
- extra_fields (bool): Send extra fields on log record (default: True)
- fqdn (bool): Use fully qualified domain name (default: False)
- facility (str): Replace facility with specified value (default: None)
- exchange_routing_key (str): AMQP routing key (default: '')
"""
def makeSocket(self):
"""
Create AMQP socket connection.
Returns:
- PikaSocket: AMQP connection wrapper
"""
def makePickle(self, record):
"""
Serialize log record for AMQP transmission.
Parameters:
- record: LogRecord object
Returns:
- bytes: Formatted message
"""Alias for UDPLogstashHandler to maintain backward compatibility.
LogstashHandler = UDPLogstashHandlerControl the structure and format of log messages sent to Logstash.
class LogstashFormatterBase(logging.Formatter):
def __init__(self, message_type='Logstash', tags=None, fqdn=False):
"""
Base formatter providing common functionality for all Logstash formatters.
Parameters:
- message_type (str): Type of message (default: 'Logstash')
- tags (list): List of tags (default: None)
- fqdn (bool): Use fully qualified domain name (default: False)
"""
def get_extra_fields(self, record):
"""
Extract extra fields from log record, filtering sensitive data.
Parameters:
- record: LogRecord object
Returns:
- dict: Extra fields to include in message
"""
def get_debug_fields(self, record):
"""
Extract debug information from log record.
Parameters:
- record: LogRecord object
Returns:
- dict: Debug fields including stack trace and process info
"""
@classmethod
def format_source(cls, message_type, host, path):
"""
Format source field for Logstash message.
Parameters:
- message_type (str): Message type
- host (str): Hostname
- path (str): File path
Returns:
- str: Formatted source string
"""
@classmethod
def format_timestamp(cls, time):
"""
Format timestamp in ISO 8601 format.
Parameters:
- time (float): Unix timestamp
Returns:
- str: ISO 8601 formatted timestamp
"""
@classmethod
def format_exception(cls, exc_info):
"""
Format exception information as string.
Parameters:
- exc_info: Exception info tuple
Returns:
- str: Formatted exception string
"""
@classmethod
def serialize(cls, message):
"""
Serialize message to JSON bytes.
Parameters:
- message (dict): Message dictionary
Returns:
- bytes: JSON-serialized message
"""
class LogstashFormatterVersion0(LogstashFormatterBase):
version = 0
def format(self, record):
"""
Format log record to Logstash v0 schema (@fields structure).
Parameters:
- record: LogRecord object
Returns:
- bytes: JSON-formatted message
"""
class LogstashFormatterVersion1(LogstashFormatterBase):
def format(self, record):
"""
Format log record to Logstash v1 schema (flat structure).
Parameters:
- record: LogRecord object
Returns:
- bytes: JSON-formatted message
"""Internal socket wrapper for AMQP connections used by AMQPLogstashHandler.
class PikaSocket:
def __init__(self, host, port, username, password, virtual_host, exchange,
routing_key, durable, passive, exchange_type):
"""
AMQP socket wrapper for RabbitMQ connections.
Parameters:
- host (str): AMQP host
- port (int): AMQP port
- username (str): AMQP username
- password (str): AMQP password
- virtual_host (str): AMQP virtual host
- exchange (str): AMQP exchange name
- routing_key (str): AMQP routing key
- durable (bool): Exchange durability
- passive (bool): Exchange declaration mode
- exchange_type (str): Exchange type
"""
def sendall(self, data):
"""
Send data via AMQP channel.
Parameters:
- data (bytes): Message data to send
"""
def close(self):
"""
Close AMQP connection.
"""import logging
import logstash
# Create logger
test_logger = logging.getLogger('python-logstash-logger')
test_logger.setLevel(logging.INFO)
# Add UDP handler with version 1 schema
test_logger.addHandler(logstash.UDPLogstashHandler('localhost', 5959, version=1))
# Log messages
test_logger.error('python-logstash: test logstash error message.')
test_logger.info('python-logstash: test logstash info message.')
test_logger.warning('python-logstash: test logstash warning message.')
# Add extra fields to logstash message
extra = {
'test_string': 'python version: ' + repr(sys.version_info),
'test_boolean': True,
'test_dict': {'a': 1, 'b': 'c'},
'test_float': 1.23,
'test_integer': 123,
'test_list': [1, 2, '3'],
}
test_logger.info('python-logstash: test extra fields', extra=extra)import logging
import logstash
# Create logger
test_logger = logging.getLogger('python-logstash-logger')
test_logger.setLevel(logging.INFO)
# Add TCP handler for reliable delivery
test_logger.addHandler(logstash.TCPLogstashHandler('localhost', 5959, version=1))
# Log messages
test_logger.info('Reliable TCP logging message')import logging
import logstash
# AMQP configuration
host = 'localhost'
username = 'guest'
password = 'guest'
exchange = 'logstash.py'
# Create logger
test_logger = logging.getLogger('python-logstash-logger')
test_logger.setLevel(logging.INFO)
# Add AMQP handler
test_logger.addHandler(logstash.AMQPLogstashHandler(
version=1,
host=host,
durable=True,
username=username,
password=password,
exchange=exchange
))
# Log messages
test_logger.error('python-logstash: test logstash error message.')
test_logger.info('python-logstash: test logstash info message.')
test_logger.warning('python-logstash: test logstash warning message.')
# Log exception with stack trace
try:
1/0
except:
test_logger.exception('python-logstash: test logstash exception with stack trace')Configure in settings.py:
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'handlers': {
'logstash': {
'level': 'DEBUG',
'class': 'logstash.TCPLogstashHandler',
'host': 'localhost',
'port': 5959,
'version': 1,
'message_type': 'logstash',
'fqdn': False,
'tags': ['django', 'production'],
},
},
'loggers': {
'django.request': {
'handlers': ['logstash'],
'level': 'DEBUG',
'propagate': True,
},
},
}@fields structureAll handlers automatically include:
@timestamp: ISO 8601 timestampmessage: The log messagelevel/levelname: Log levellogger_name: Logger namehost: Hostname (FQDN if enabled)path: Source file pathCustom fields can be added via the extra parameter in logging calls. Sensitive fields (auth_token, password, etc.) are automatically filtered out.
logging, socket, json, datetime)pika library for RabbitMQ connectivity