0
# Core Sender Interface
1
2
The `FluentSender` interface emits events programmatically and gives full control over connection parameters, error handling, and message formatting. It is the lowest-level interface to Fluentd and therefore the most flexible.
3
4
## Capabilities
5
6
### FluentSender Class
7
8
`FluentSender` is the main synchronous sender class. It manages connections to Fluentd servers and handles event transmission with buffering and error recovery.
9
10
```python { .api }
11
class FluentSender:
12
def __init__(
13
self,
14
tag: str,
15
host: str = "localhost",
16
port: int = 24224,
17
bufmax: int = 1048576,
18
timeout: float = 3.0,
19
verbose: bool = False,
20
buffer_overflow_handler = None,
21
nanosecond_precision: bool = False,
22
msgpack_kwargs = None,
23
*,
24
forward_packet_error: bool = True,
25
**kwargs
26
):
27
"""
28
Initialize FluentSender.
29
30
Parameters:
31
- tag (str): Tag prefix for events
32
- host (str): Fluentd host, supports "unix://" URLs for Unix sockets
33
- port (int): Fluentd port for TCP connections
34
- bufmax (int): Maximum buffer size in bytes (default 1MB)
35
- timeout (float): Connection timeout in seconds
36
- verbose (bool): Enable verbose logging of packets
37
- buffer_overflow_handler (callable): Handler for buffer overflow events
38
- nanosecond_precision (bool): Use nanosecond-precision timestamps
39
- msgpack_kwargs (dict): Additional msgpack serialization options
40
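- forward_packet_error (bool): If True, an event that cannot be serialized is replaced by an error event describing the failure; if False, the serialization exception is raised to the caller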
41
"""
42
43
def emit(self, label: str, data: dict) -> bool:
44
"""
45
Emit event with current timestamp.
46
47
Parameters:
48
- label (str): Event label (combined with tag as 'tag.label')
49
- data (dict): Event data dictionary
50
51
Returns:
52
bool: True if successful, False if error occurred
53
"""
54
55
def emit_with_time(self, label: str, timestamp, data: dict) -> bool:
56
"""
57
Emit event with specific timestamp.
58
59
Parameters:
60
- label (str): Event label
61
- timestamp: Unix timestamp (int/float) or EventTime instance
62
- data (dict): Event data dictionary
63
64
Returns:
65
bool: True if successful, False if error occurred
66
"""
67
68
def close(self) -> None:
69
"""
70
Close sender and flush any pending events.
71
If the pending events cannot be sent, they are passed to buffer_overflow_handler.
72
"""
73
74
def clear_last_error(self, _thread_id=None) -> None:
75
"""
76
Clear the last error from thread-local storage.
77
78
Parameters:
79
- _thread_id: Internal parameter for thread identification
80
"""
81
82
@property
83
def last_error(self):
84
"""
85
Get the last error that occurred (thread-local).
86
87
Returns:
88
Exception or None: Last error for current thread
89
"""
90
91
@last_error.setter
92
def last_error(self, err):
93
"""Set the last error for current thread."""
94
95
def __enter__(self):
96
"""Enter context manager."""
97
98
def __exit__(self, typ, value, traceback):
99
"""Exit context manager, closes sender."""
100
```
101
102
### Testing Functions
103
104
Internal function available for testing purposes:
105
106
```python { .api }
107
def _set_global_sender(sender):
108
"""
109
[For testing] Set global sender directly.
110
111
Parameters:
112
- sender (FluentSender): Sender instance to use as global sender
113
"""
114
```
115
116
### Global Sender Functions
117
118
Module-level functions for managing a global FluentSender instance, providing a singleton pattern for application-wide logging.
119
120
```python { .api }
121
def setup(tag: str, **kwargs) -> None:
122
"""
123
Initialize global FluentSender instance.
124
125
Parameters:
126
- tag (str): Tag prefix for events
127
- **kwargs: Additional FluentSender constructor arguments
128
"""
129
130
def get_global_sender():
131
"""
132
Get the global FluentSender instance.
133
134
Returns:
135
FluentSender or None: Global sender instance
136
"""
137
138
def close() -> None:
139
"""Close the global FluentSender instance."""
140
```
141
142
### EventTime Class
143
144
`EventTime` is a specialized timestamp class for nanosecond-precision logging. It is implemented as a msgpack ExtType for efficient serialization.
145
146
```python { .api }
147
class EventTime:
148
def __new__(cls, timestamp: float, nanoseconds: int = None):
149
"""
150
Create EventTime instance.
151
152
Parameters:
153
- timestamp (float): Unix timestamp in seconds
154
- nanoseconds (int, optional): Nanosecond component, calculated from timestamp if not provided
155
156
Returns:
157
EventTime: New EventTime instance (msgpack ExtType)
158
"""
159
160
@classmethod
161
def from_unix_nano(cls, unix_nano: int):
162
"""
163
Create EventTime from nanosecond timestamp.
164
165
Parameters:
166
- unix_nano (int): Unix timestamp in nanoseconds
167
168
Returns:
169
EventTime: New EventTime instance
170
"""
171
```
172
173
## Usage Examples
174
175
### Basic Event Emission
176
177
```python
178
from fluent import sender
179
180
# Create sender for local Fluentd
181
logger = sender.FluentSender('app')
182
183
# Send events
184
logger.emit('user.login', {'user_id': 123, 'ip': '192.168.1.1'})
185
logger.emit('user.action', {'user_id': 123, 'action': 'click', 'target': 'button'})
186
187
logger.close()
188
```
189
190
### Remote Fluentd Connection
191
192
```python
193
from fluent import sender
194
195
# Connect to remote Fluentd server
196
logger = sender.FluentSender('app', host='fluentd.example.com', port=24224)
197
198
# Send event
199
result = logger.emit('purchase', {
200
'user_id': 456,
201
'product_id': 'prod-123',
202
'amount': 29.99,
203
'currency': 'USD'
204
})
205
206
if not result:
207
print(f"Failed to send event: {logger.last_error}")
208
logger.clear_last_error()
209
210
logger.close()
211
```
212
213
### Unix Socket Connection
214
215
```python
216
from fluent import sender
217
218
# Connect via Unix socket
219
logger = sender.FluentSender('app', host='unix:///var/run/fluentd.sock')
220
221
logger.emit('system.alert', {'level': 'warning', 'message': 'Disk usage high'})
222
logger.close()
223
```
224
225
### Nanosecond Precision Timestamps
226
227
```python
228
import time
229
from fluent import sender
230
231
# Enable nanosecond precision
232
logger = sender.FluentSender('app', nanosecond_precision=True)
233
234
# Automatic nanosecond timestamp
235
logger.emit('timing.event', {'operation': 'database_query', 'duration_ms': 150})
236
237
# Manual timestamp with nanosecond precision
238
timestamp = time.time()
239
logger.emit_with_time('timing.precise', timestamp, {'value': 42})
240
241
# Using EventTime directly
242
event_time = sender.EventTime.from_unix_nano(time.time_ns())
243
logger.emit_with_time('timing.nano', event_time, {'precision': 'nanosecond'})
244
245
logger.close()
246
```
247
248
### Buffer Overflow Handling
249
250
```python
251
import msgpack
252
from io import BytesIO
253
from fluent import sender
254
255
def handle_overflow(pendings):
256
"""Custom handler for buffer overflow"""
257
print(f"Buffer overflow! {len(pendings)} bytes pending")
258
259
# Parse pending events
260
unpacker = msgpack.Unpacker(BytesIO(pendings))
261
for event in unpacker:
262
print(f"Lost event: {event}")
263
264
# Create sender with overflow handler
265
logger = sender.FluentSender(
266
'app',
267
host='unreliable-host.example.com',
268
bufmax=1024, # Small buffer for demonstration
269
buffer_overflow_handler=handle_overflow
270
)
271
272
# Send events (some may trigger overflow if connection fails)
273
for i in range(100):
274
logger.emit('test', {'index': i, 'data': 'x' * 100})
275
276
logger.close()
277
```
278
279
### Context Manager Usage
280
281
```python
282
from fluent import sender
283
284
# Automatic cleanup with context manager
285
with sender.FluentSender('app') as logger:
286
logger.emit('session.start', {'user_id': 789})
287
logger.emit('session.action', {'action': 'view_page', 'page': '/home'})
288
logger.emit('session.end', {'duration': 120})
289
290
# Sender is automatically closed
291
```
292
293
### Global Sender Pattern
294
295
```python
296
import time
from fluent import sender
297
298
# Setup global sender at application start
299
sender.setup('myapp', host='logs.company.com', port=24224)
300
301
# Use global sender anywhere in application
302
def process_user_action(user_id, action):
303
global_sender = sender.get_global_sender()
304
global_sender.emit('user.action', {
305
'user_id': user_id,
306
'action': action,
307
'timestamp': time.time()
308
})
309
310
# Application shutdown
311
def shutdown():
312
sender.close()
313
```