or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-communication.md core-sender.md event-interface.md index.md logging-integration.md

docs/core-sender.md

0

# Core Sender Interface

1

2

Direct FluentSender interface for programmatic event emission with full control over connection parameters, error handling, and message formatting. This provides the lowest-level interface to Fluentd with maximum flexibility.

3

4

## Capabilities

5

6

### FluentSender Class

7

8

Main synchronous sender class that manages connections to Fluentd servers and handles event transmission with buffering and error recovery.

9

10

```python { .api }

11

class FluentSender:

12

def __init__(

13

self,

14

tag: str,

15

host: str = "localhost",

16

port: int = 24224,

17

bufmax: int = 1048576,

18

timeout: float = 3.0,

19

verbose: bool = False,

20

buffer_overflow_handler = None,

21

nanosecond_precision: bool = False,

22

msgpack_kwargs = None,

23

*,

24

forward_packet_error: bool = True,

25

**kwargs

26

):

27

"""

28

Initialize FluentSender.

29

30

Parameters:

31

- tag (str): Tag prefix for events

32

- host (str): Fluentd host, supports "unix://" URLs for Unix sockets

33

- port (int): Fluentd port for TCP connections

34

- bufmax (int): Maximum buffer size in bytes (default 1MB)

35

- timeout (float): Connection timeout in seconds

36

- verbose (bool): Enable verbose logging of packets

37

- buffer_overflow_handler (callable): Handler for buffer overflow events

38

- nanosecond_precision (bool): Use nanosecond-precision timestamps

39

- msgpack_kwargs (dict): Additional msgpack serialization options

40

- forward_packet_error (bool): Forward packet errors as events

41

"""

42

43

def emit(self, label: str, data: dict) -> bool:

44

"""

45

Emit event with current timestamp.

46

47

Parameters:

48

- label (str): Event label (combined with tag as 'tag.label')

49

- data (dict): Event data dictionary

50

51

Returns:

52

bool: True if successful, False if error occurred

53

"""

54

55

def emit_with_time(self, label: str, timestamp, data: dict) -> bool:

56

"""

57

Emit event with specific timestamp.

58

59

Parameters:

60

- label (str): Event label

61

- timestamp: Unix timestamp (int/float) or EventTime instance

62

- data (dict): Event data dictionary

63

64

Returns:

65

bool: True if successful, False if error occurred

66

"""

67

68

def close(self) -> None:

69

"""

70

Close sender and flush any pending events.

71

If pending events cannot be flushed, buffer_overflow_handler is called with the remaining pending data.

72

"""

73

74

def clear_last_error(self, _thread_id=None) -> None:

75

"""

76

Clear the last error from thread-local storage.

77

78

Parameters:

79

- _thread_id: Internal parameter for thread identification

80

"""

81

82

@property

83

def last_error(self):

84

"""

85

Get the last error that occurred (thread-local).

86

87

Returns:

88

Exception or None: Last error for current thread

89

"""

90

91

@last_error.setter

92

def last_error(self, err):

93

"""Set the last error for current thread."""

94

95

def __enter__(self):

96

"""Enter context manager."""

97

98

def __exit__(self, typ, value, traceback):

99

"""Exit context manager, closes sender."""

100

```

101

102

### Testing Functions

103

104

Internal functions available for testing purposes:

105

106

```python { .api }

107

def _set_global_sender(sender):

108

"""

109

[For testing] Set global sender directly.

110

111

Parameters:

112

- sender (FluentSender): Sender instance to use as global sender

113

"""

114

```

115

116

### Global Sender Functions

117

118

Module-level functions for managing a global FluentSender instance, providing a singleton pattern for application-wide logging.

119

120

```python { .api }

121

def setup(tag: str, **kwargs) -> None:

122

"""

123

Initialize global FluentSender instance.

124

125

Parameters:

126

- tag (str): Tag prefix for events

127

- **kwargs: Additional FluentSender constructor arguments

128

"""

129

130

def get_global_sender():

131

"""

132

Get the global FluentSender instance.

133

134

Returns:

135

FluentSender or None: Global sender instance

136

"""

137

138

def close() -> None:

139

"""Close the global FluentSender instance."""

140

```

141

142

### EventTime Class

143

144

Specialized timestamp class for nanosecond-precision logging, implemented as msgpack ExtType for efficient serialization.

145

146

```python { .api }

147

class EventTime:

148

def __new__(cls, timestamp: float, nanoseconds: int = None):

149

"""

150

Create EventTime instance.

151

152

Parameters:

153

- timestamp (float): Unix timestamp in seconds

154

- nanoseconds (int, optional): Nanosecond component, calculated from timestamp if not provided

155

156

Returns:

157

EventTime: New EventTime instance (msgpack ExtType)

158

"""

159

160

@classmethod

161

def from_unix_nano(cls, unix_nano: int):

162

"""

163

Create EventTime from nanosecond timestamp.

164

165

Parameters:

166

- unix_nano (int): Unix timestamp in nanoseconds

167

168

Returns:

169

EventTime: New EventTime instance

170

"""

171

```

172

173

## Usage Examples

174

175

### Basic Event Emission

176

177

```python

178

from fluent import sender

179

180

# Create sender for local Fluentd

181

logger = sender.FluentSender('app')

182

183

# Send events

184

logger.emit('user.login', {'user_id': 123, 'ip': '192.168.1.1'})

185

logger.emit('user.action', {'user_id': 123, 'action': 'click', 'target': 'button'})

186

187

logger.close()

188

```

189

190

### Remote Fluentd Connection

191

192

```python

193

from fluent import sender

194

195

# Connect to remote Fluentd server

196

logger = sender.FluentSender('app', host='fluentd.example.com', port=24224)

197

198

# Send event

199

result = logger.emit('purchase', {

200

'user_id': 456,

201

'product_id': 'prod-123',

202

'amount': 29.99,

203

'currency': 'USD'

204

})

205

206

if not result:

207

print(f"Failed to send event: {logger.last_error}")

208

logger.clear_last_error()

209

210

logger.close()

211

```

212

213

### Unix Socket Connection

214

215

```python

216

from fluent import sender

217

218

# Connect via Unix socket

219

logger = sender.FluentSender('app', host='unix:///var/run/fluentd.sock')

220

221

logger.emit('system.alert', {'level': 'warning', 'message': 'Disk usage high'})

222

logger.close()

223

```

224

225

### Nanosecond Precision Timestamps

226

227

```python

228

import time

229

from fluent import sender

230

231

# Enable nanosecond precision

232

logger = sender.FluentSender('app', nanosecond_precision=True)

233

234

# Automatic nanosecond timestamp

235

logger.emit('timing.event', {'operation': 'database_query', 'duration_ms': 150})

236

237

# Manual timestamp with nanosecond precision

238

timestamp = time.time()

239

logger.emit_with_time('timing.precise', timestamp, {'value': 42})

240

241

# Using EventTime directly

242

event_time = sender.EventTime.from_unix_nano(time.time_ns())

243

logger.emit_with_time('timing.nano', event_time, {'precision': 'nanosecond'})

244

245

logger.close()

246

```

247

248

### Buffer Overflow Handling

249

250

```python

251

import msgpack

252

from io import BytesIO

253

from fluent import sender

254

255

def handle_overflow(pendings):

256

"""Custom handler for buffer overflow"""

257

print(f"Buffer overflow! {len(pendings)} bytes pending")

258

259

# Parse pending events

260

unpacker = msgpack.Unpacker(BytesIO(pendings))

261

for event in unpacker:

262

print(f"Lost event: {event}")

263

264

# Create sender with overflow handler

265

logger = sender.FluentSender(

266

'app',

267

host='unreliable-host.example.com',

268

bufmax=1024, # Small buffer for demonstration

269

buffer_overflow_handler=handle_overflow

270

)

271

272

# Send events (some may trigger overflow if connection fails)

273

for i in range(100):

274

logger.emit('test', {'index': i, 'data': 'x' * 100})

275

276

logger.close()

277

```

278

279

### Context Manager Usage

280

281

```python

282

from fluent import sender

283

284

# Automatic cleanup with context manager

285

with sender.FluentSender('app') as logger:

286

logger.emit('session.start', {'user_id': 789})

287

logger.emit('session.action', {'action': 'view_page', 'page': '/home'})

288

logger.emit('session.end', {'duration': 120})

289

290

# Sender is automatically closed

291

```

292

293

### Global Sender Pattern

294

295

```python

296

import time
from fluent import sender

297

298

# Setup global sender at application start

299

sender.setup('myapp', host='logs.company.com', port=24224)

300

301

# Use global sender anywhere in application

302

def process_user_action(user_id, action):

303

global_sender = sender.get_global_sender()

304

global_sender.emit('user.action', {

305

'user_id': user_id,

306

'action': action,

307

'timestamp': time.time()

308

})

309

310

# Application shutdown

311

def shutdown():

312

sender.close()

313

```