or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-communication.mdcore-sender.mdevent-interface.mdindex.mdlogging-integration.md

async-communication.mddocs/

0

# Asynchronous Communication

1

2

Non-blocking asynchronous versions of both the core sender and logging handler interfaces, using background threads and queues to prevent application blocking during log transmission. This ensures high performance even when Fluentd servers are slow or unreachable.

3

4

## Capabilities

5

6

### AsyncFluentSender Class

7

8

Asynchronous version of FluentSender that uses background threads for non-blocking event transmission with queue-based buffering.

9

10

```python { .api }

11

class FluentSender(fluent.sender.FluentSender):

12

def __init__(

13

self,

14

tag: str,

15

host: str = "localhost",

16

port: int = 24224,

17

bufmax: int = 1048576,

18

timeout: float = 3.0,

19

verbose: bool = False,

20

buffer_overflow_handler = None,

21

nanosecond_precision: bool = False,

22

msgpack_kwargs = None,

23

queue_maxsize: int = 100,

24

queue_circular: bool = False,

25

queue_overflow_handler = None,

26

**kwargs

27

):

28

"""

29

Initialize AsyncFluentSender.

30

31

Parameters:

32

- tag (str): Tag prefix for events

33

- host (str): Fluentd host

34

- port (int): Fluentd port

35

- bufmax (int): Maximum buffer size in bytes

36

- timeout (float): Connection timeout

37

- verbose (bool): Verbose logging

38

- buffer_overflow_handler (callable): Buffer overflow handler

39

- nanosecond_precision (bool): Use nanosecond timestamps

40

- msgpack_kwargs (dict): msgpack options

41

- queue_maxsize (int): Maximum queue size (default 100)

42

- queue_circular (bool): Use circular queue mode (default False)

43

- queue_overflow_handler (callable): Queue overflow handler

44

- **kwargs: Additional sender options

45

"""

46

47

def close(self, flush: bool = True) -> None:

48

"""

49

Close async sender and background thread.

50

51

Parameters:

52

- flush (bool): Whether to flush pending events before closing

53

"""

54

55

@property

56

def queue_maxsize(self) -> int:

57

"""

58

Get queue maximum size.

59

60

Returns:

61

int: Maximum queue size

62

"""

63

64

@property

65

def queue_blocking(self) -> bool:

66

"""

67

Check if queue is in blocking mode.

68

69

Returns:

70

bool: True if queue blocks when full, False if circular

71

"""

72

73

@property

74

def queue_circular(self) -> bool:

75

"""

76

Check if queue is in circular mode.

77

78

Returns:

79

bool: True if queue discards oldest events when full

80

"""

81

```

82

83

### AsyncFluentHandler Class

84

85

Asynchronous logging handler that inherits from FluentHandler but uses AsyncFluentSender for non-blocking log transmission.

86

87

```python { .api }

88

class FluentHandler(fluent.handler.FluentHandler):

89

def getSenderClass(self):

90

"""

91

Get the async sender class.

92

93

Returns:

94

class: AsyncFluentSender class

95

"""

96

```

97

98

### Global Async Functions

99

100

Module-level functions for managing global async sender instances.

101

102

```python { .api }

103

def setup(tag: str, **kwargs) -> None:

104

"""

105

Initialize global AsyncFluentSender instance.

106

107

Parameters:

108

- tag (str): Tag prefix for events

109

- **kwargs: AsyncFluentSender constructor arguments

110

"""

111

112

def get_global_sender():

113

"""

114

Get the global AsyncFluentSender instance.

115

116

Returns:

117

AsyncFluentSender or None: Global async sender instance

118

"""

119

120

def close() -> None:

121

"""Close the global AsyncFluentSender instance."""

122

123

def _set_global_sender(sender):

124

"""

125

[For testing] Set global async sender directly.

126

127

Parameters:

128

- sender (AsyncFluentSender): Async sender instance to use as global sender

129

"""

130

```

131

132

### Constants

133

134

```python { .api }

135

DEFAULT_QUEUE_MAXSIZE = 100

136

DEFAULT_QUEUE_CIRCULAR = False

137

```

138

139

### Exported Classes

140

141

The async module exports classes via `__all__`:

142

143

```python { .api }

144

__all__ = ["EventTime", "FluentSender"]

145

```

146

147

## Usage Examples

148

149

### Basic Async Event Logging

150

151

```python

152

from fluent import asyncsender as sender

153

154

# Create async sender - automatically starts background thread

155

logger = sender.FluentSender('app')

156

157

# These calls return immediately without blocking

158

logger.emit('user.login', {'user_id': 123, 'method': 'password'})

159

logger.emit('user.action', {'user_id': 123, 'action': 'view_dashboard'})

160

logger.emit('user.logout', {'user_id': 123, 'session_duration': 1800})

161

162

# IMPORTANT: Always close to ensure thread cleanup

163

logger.close()

164

```

165

166

### High-Performance Async Logging

167

168

```python

169

from fluent import asyncsender as sender

170

import time

171

172

# Configure for high throughput

173

logger = sender.FluentSender(

174

'metrics',

175

host='high-performance-fluentd.example.com',

176

queue_maxsize=1000, # Large queue for bursts

177

timeout=1.0 # Fast timeout

178

)

179

180

# Send burst of events without blocking

181

start_time = time.time()

182

183

for i in range(10000):

184

logger.emit('metric.point', {

185

'timestamp': time.time(),

186

'metric_name': 'cpu_usage',

187

'value': 50 + (i % 50),

188

'host': f'server-{i % 10}'

189

})

190

191

elapsed = time.time() - start_time

192

print(f"Sent 10000 events in {elapsed:.2f} seconds")

193

194

# Cleanup - waits for background thread to finish

195

logger.close()

196

```

197

198

### Circular Queue Mode

199

200

```python

201

from fluent import asyncsender as sender

202

203

def queue_overflow_handler(discarded_bytes):

204

"""Handle discarded events in circular mode"""

205

print(f"Discarded {len(discarded_bytes)} bytes due to queue overflow")

206

207

# Enable circular queue to never block the application

208

logger = sender.FluentSender(

209

'app',

210

host='slow-fluentd.example.com',

211

queue_maxsize=50,

212

queue_circular=True, # Never block, discard oldest

213

queue_overflow_handler=queue_overflow_handler

214

)

215

216

# Application never blocks, even if Fluentd is slow

217

for i in range(1000):

218

logger.emit('event', {'index': i, 'data': 'important_data'})

219

# This always returns immediately

220

221

logger.close()

222

```

223

224

### Async Global Sender Pattern

225

226

```python

227

from fluent import asyncsender as sender

228

229

# Setup global async sender at application start

230

sender.setup('webapp', host='logs.company.com', queue_maxsize=500)

231

232

def handle_web_request(request):

233

"""Handle web request with non-blocking logging"""

234

start_time = time.time()

235

236

# Process request

237

response = process_request(request)

238

239

# Log without blocking response

240

global_sender = sender.get_global_sender()

241

global_sender.emit('request.completed', {

242

'path': request.path,

243

'method': request.method,

244

'status_code': response.status_code,

245

'duration_ms': int((time.time() - start_time) * 1000),

246

'user_id': request.user_id

247

})

248

249

return response

250

251

# Application shutdown

252

def shutdown():

253

sender.close() # Wait for background threads to finish

254

```

255

256

### Async Logging Handler

257

258

```python

259

import logging

260

from fluent import asynchandler as handler

261

262

# Setup async logging handler

263

logger = logging.getLogger('async_app')

264

logger.setLevel(logging.INFO)

265

266

# Non-blocking log handler

267

async_handler = handler.FluentHandler(

268

'app.logs',

269

host='logs.example.com',

270

queue_maxsize=200,

271

queue_circular=False # Block if queue fills up

272

)

273

274

logger.addHandler(async_handler)

275

276

# Logging calls return immediately

277

logger.info('Application started')

278

logger.info('Processing batch job')

279

logger.info('Batch job completed')

280

281

# IMPORTANT: Close handler before exit

282

async_handler.close()

283

```

284

285

### Queue Management and Monitoring

286

287

```python

288

from fluent import asyncsender as sender

289

import threading

290

import time

291

292

# Create async sender with monitoring

293

logger = sender.FluentSender(

294

'monitored_app',

295

queue_maxsize=100,

296

verbose=True # Enable packet logging

297

)

298

299

def monitor_queue():

300

"""Monitor queue status"""

301

while not logger._closed:

302

queue_size = logger._queue.qsize()

303

print(f"Queue size: {queue_size}/{logger.queue_maxsize}")

304

time.sleep(1)

305

306

# Start monitoring thread

307

monitor_thread = threading.Thread(target=monitor_queue)

308

monitor_thread.daemon = True

309

monitor_thread.start()

310

311

# Send events

312

for i in range(50):

313

logger.emit('test', {'index': i})

314

time.sleep(0.1) # Slow sending to see queue behavior

315

316

logger.close()

317

```

318

319

### Error Handling with Async Sender

320

321

```python

322

from fluent import asyncsender as sender

323

import time

324

325

def connection_error_handler(pendings):

326

"""Handle connection failures"""

327

print(f"Connection failed, {len(pendings)} bytes pending")

328

329

# Save to local file as backup

330

with open('/tmp/failed_events.backup', 'ab') as f:

331

f.write(pendings)

332

333

# Setup with error handling

334

logger = sender.FluentSender(

335

'app',

336

host='unreliable-server.example.com',

337

buffer_overflow_handler=connection_error_handler,

338

timeout=2.0

339

)

340

341

# Send events - connection failures handled in background

342

for i in range(100):

343

logger.emit('event', {'index': i, 'timestamp': time.time()})

344

345

# Check for errors (errors occur in background thread)

346

time.sleep(5) # Wait for background processing

347

348

# Note: last_error is from background thread context

349

if logger.last_error:

350

print(f"Background error: {logger.last_error}")

351

352

logger.close()

353

```

354

355

### Graceful Shutdown

356

357

```python

358

from fluent import asyncsender as sender

359

import signal

360

import sys

361

362

# Global sender for cleanup

363

global_sender = None

364

365

def signal_handler(signum, frame):

366

"""Handle shutdown signals gracefully"""

367

print("Shutting down gracefully...")

368

369

if global_sender:

370

print("Closing async sender...")

371

global_sender.close(flush=True) # Wait for pending events

372

print("Async sender closed")

373

374

sys.exit(0)

375

376

# Setup signal handlers

377

signal.signal(signal.SIGINT, signal_handler)

378

signal.signal(signal.SIGTERM, signal_handler)

379

380

# Create global sender

381

global_sender = sender.FluentSender('app', queue_maxsize=1000)

382

383

# Application main loop

384

try:

385

while True:

386

# Simulate application work

387

global_sender.emit('heartbeat', {

388

'timestamp': time.time(),

389

'status': 'running'

390

})

391

time.sleep(1)

392

393

except KeyboardInterrupt:

394

signal_handler(signal.SIGINT, None)

395

```

396

397

### Performance Comparison

398

399

```python

400

from fluent import sender, asyncsender

401

import time

402

403

def benchmark_sync_sender():

404

"""Benchmark synchronous sender"""

405

logger = sender.FluentSender('sync_test')

406

407

start_time = time.time()

408

409

for i in range(1000):

410

logger.emit('test', {'index': i})

411

412

logger.close()

413

return time.time() - start_time

414

415

def benchmark_async_sender():

416

"""Benchmark asynchronous sender"""

417

logger = asyncsender.FluentSender('async_test', queue_maxsize=1500)

418

419

start_time = time.time()

420

421

for i in range(1000):

422

logger.emit('test', {'index': i})

423

424

# Time to queue all events (not send)

425

queue_time = time.time() - start_time

426

427

# Close and wait for actual sending

428

logger.close()

429

total_time = time.time() - start_time

430

431

return queue_time, total_time

432

433

# Run benchmarks

434

sync_time = benchmark_sync_sender()

435

async_queue_time, async_total_time = benchmark_async_sender()

436

437

print(f"Sync sender: {sync_time:.2f}s")

438

print(f"Async sender (queue): {async_queue_time:.2f}s")

439

print(f"Async sender (total): {async_total_time:.2f}s")

440

print(f"Speedup: {sync_time / async_queue_time:.1f}x")

441

```