or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

audit-destinations.md, core-framework.md, index.md, queue-async.md, writers-formats.md

docs/queue-async.md

0

# Queue and Async Processing

1

2

Asynchronous audit processing capabilities with configurable queues, batching, and file spooling that provide reliability and performance optimization for high-volume audit scenarios.

3

4

## Capabilities

5

6

### Asynchronous Audit Provider

7

8

Asynchronous audit provider that processes audit events on background threads, with a configurable maximum queue size and maximum flush interval.

9

10

```java { .api }

11

/**

12

* Asynchronous audit provider with background processing

13

*/

14

public class AsyncAuditProvider extends BaseAuditHandler {

15

/**

16

* Create asynchronous audit provider with queue configuration

17

* @param name String provider name identifier

18

* @param maxQueueSize int maximum queue size before blocking

19

* @param maxFlushInterval int maximum flush interval in milliseconds

20

*/

21

public AsyncAuditProvider(String name, int maxQueueSize, int maxFlushInterval);

22

23

/**

24

* Create asynchronous audit provider with queue configuration and audit handler

25

* @param name String provider name identifier

26

* @param maxQueueSize int maximum queue size before blocking

27

* @param maxFlushInterval int maximum flush interval in milliseconds

28

* @param provider AuditHandler audit handler to add to this provider

29

*/

30

public AsyncAuditProvider(String name, int maxQueueSize, int maxFlushInterval, AuditHandler provider);

31

32

/**

33

* Initialize async provider with configuration properties

34

* @param props Properties configuration properties

35

*/

36

public void init(Properties props);

37

38

/**

39

* Log audit event asynchronously (non-blocking)

40

* @param event AuditEventBase event to log

41

*/

42

public void log(AuditEventBase event);

43

44

/**

45

* Log collection of audit events asynchronously

46

* @param events Collection<AuditEventBase> events to log

47

*/

48

public void log(Collection<AuditEventBase> events);

49

50

/**

51

* Start background processing threads

52

*/

53

public void start();

54

55

/**

56

* Stop async provider and background threads

57

*/

58

public void stop();

59

60

/**

61

* Wait for all pending events to be processed

62

*/

63

public void waitToComplete();

64

65

/**

66

* Get current queue size

67

* @return int number of events in queue

68

*/

69

public int getQueueSize();

70

71

/**

72

* Check if async provider is running

73

* @return boolean true if running

74

*/

75

public boolean isRunning();

76

}

77

```

78

79

### Audit Queue Base Class

80

81

Abstract base class for audit queues providing batching, file spooling, and drain management capabilities.

82

83

```java { .api }

84

/**

85

* Base class for audit queues with batching and file spooling

86

*/

87

public abstract class AuditQueue extends AuditDestination {

88

// Queue configuration methods

89

public void setMaxBatchSize(int maxBatchSize);

90

public int getMaxBatchSize();

91

public void setMaxBatchInterval(long maxBatchInterval);

92

public long getMaxBatchInterval();

93

public void setMaxQueueSize(int maxQueueSize);

94

public int getMaxQueueSize();

95

96

// File spooling configuration

97

public void setSpoolEnabled(boolean spoolEnabled);

98

public boolean isSpoolEnabled();

99

public void setSpoolDirectory(String spoolDirectory);

100

public String getSpoolDirectory();

101

public void setSpoolFileName(String spoolFileName);

102

public String getSpoolFileName();

103

104

// Drain management

105

public void startDrainThread();

106

public void stopDrainThread();

107

public boolean isDrainInProgress();

108

109

// Statistics

110

public long getProcessedCount();

111

public long getErrorCount();

112

public long getDroppedCount();

113

}

114

```

115

116

### Multi-Destination Audit Provider

117

118

Audit provider that routes audit events to multiple destinations simultaneously, enabling parallel audit logging to different systems.

119

120

```java { .api }

121

/**

122

* Routes audit events to multiple destinations

123

*/

124

public class MultiDestAuditProvider extends BaseAuditHandler {

125

/**

126

* Add single audit provider to the multi-destination list

127

* @param provider AuditHandler provider to add

128

*/

129

public void addAuditProvider(AuditHandler provider);

130

131

/**

132

* Add multiple audit providers to the multi-destination list

133

* @param providers List<AuditHandler> providers to add

134

*/

135

public void addAuditProviders(List<AuditHandler> providers);

136

137

/**

138

* Remove audit provider from the multi-destination list

139

* @param provider AuditHandler provider to remove

140

*/

141

public void removeAuditProvider(AuditHandler provider);

142

143

/**

144

* Get list of configured audit providers

145

* @return List<AuditHandler> current providers

146

*/

147

public List<AuditHandler> getAuditProviders();

148

149

/**

150

* Log event to all configured providers

151

* @param event AuditEventBase event to log

152

*/

153

public void log(AuditEventBase event);

154

155

/**

156

* Log events to all configured providers

157

* @param events Collection<AuditEventBase> events to log

158

*/

159

public void log(Collection<AuditEventBase> events);

160

161

/**

162

* Initialize all configured providers

163

* @param props Properties configuration properties

164

*/

165

public void init(Properties props);

166

167

/**

168

* Start all configured providers

169

*/

170

public void start();

171

172

/**

173

* Stop all configured providers

174

*/

175

public void stop();

176

177

/**

178

* Flush all configured providers

179

*/

180

public void flush();

181

}

182

```

183

184

### Audit Index Record

185

186

Represents audit file index records for spool file management and tracking.

187

188

```java { .api }

189

/**

190

* Represents audit file index records for spool file management

191

*/

192

public class AuditIndexRecord {

193

/**

194

* Get unique record identifier

195

* @return String record ID

196

*/

197

public String getId();

198

199

/**

200

* Set unique record identifier

201

* @param id String record ID

202

*/

203

public void setId(String id);

204

205

/**

206

* Get file path for this record

207

* @return String file path

208

*/

209

public String getFilePath();

210

211

/**

212

* Set file path for this record

213

* @param filePath String file path

214

*/

215

public void setFilePath(String filePath);

216

217

/**

218

* Get line position in file

219

* @return long line position

220

*/

221

public long getLinePosition();

222

223

/**

224

* Set line position in file

225

* @param linePosition long line position

226

*/

227

public void setLinePosition(long linePosition);

228

229

/**

230

* Get current status of this record

231

* @return SPOOL_FILE_STATUS current status

232

*/

233

public SPOOL_FILE_STATUS getStatus();

234

235

/**

236

* Set status of this record

237

* @param status SPOOL_FILE_STATUS new status

238

*/

239

public void setStatus(SPOOL_FILE_STATUS status);

240

241

/**

242

* Get creation timestamp

243

* @return Date creation time

244

*/

245

public Date getCreatedTime();

246

247

/**

248

* Set creation timestamp

249

* @param createdTime Date creation time

250

*/

251

public void setCreatedTime(Date createdTime);

252

253

/**

254

* Get last attempt timestamp

255

* @return Date last attempt time

256

*/

257

public Date getLastAttempt();

258

259

/**

260

* Set last attempt timestamp

261

* @param lastAttempt Date last attempt time

262

*/

263

public void setLastAttempt(Date lastAttempt);

264

265

/**

266

* Get retry count

267

* @return int number of retries

268

*/

269

public int getRetryCount();

270

271

/**

272

* Set retry count

273

* @param retryCount int number of retries

274

*/

275

public void setRetryCount(int retryCount);

276

}

277

```

278

279

**Usage Examples:**

280

281

```java

282

import org.apache.ranger.audit.provider.AsyncAuditProvider;

283

import org.apache.ranger.audit.provider.MultiDestAuditProvider;

284

import org.apache.ranger.audit.destination.*;

285

286

// Configure asynchronous audit provider

287

AsyncAuditProvider asyncProvider = new AsyncAuditProvider("async-hdfs", 10000, 5000);

288

Properties asyncProps = new Properties();

289

asyncProps.setProperty("xasecure.audit.async.queue.batch.size", "100");

290

asyncProps.setProperty("xasecure.audit.async.queue.flush.interval", "30000");

291

asyncProvider.init(asyncProps);

292

asyncProvider.start();

293

294

// Configure multi-destination provider

295

MultiDestAuditProvider multiProvider = new MultiDestAuditProvider();

296

297

// Add HDFS destination

298

HDFSAuditDestination hdfsDestination = new HDFSAuditDestination();

299

Properties hdfsProps = new Properties();

300

hdfsProps.setProperty("xasecure.audit.hdfs.is.enabled", "true");

301

hdfsProps.setProperty("xasecure.audit.hdfs.destination.directory", "/ranger/audit");

302

hdfsDestination.init(hdfsProps, "xasecure.audit.hdfs");

303

304

// Add Solr destination

305

SolrAuditDestination solrDestination = new SolrAuditDestination();

306

Properties solrProps = new Properties();

307

solrProps.setProperty("xasecure.audit.solr.is.enabled", "true");

308

solrProps.setProperty("xasecure.audit.solr.urls", "http://solr:8983/solr");

309

solrDestination.init(solrProps, "xasecure.audit.solr");

310

311

// Add destinations to multi-provider

312

multiProvider.addAuditProvider(hdfsDestination);

313

multiProvider.addAuditProvider(solrDestination);

314

multiProvider.init(new Properties());

315

multiProvider.start();

316

317

// Log events - will go to both HDFS and Solr

318

AuthzAuditEvent event = new AuthzAuditEvent();

319

// ... configure event ...

320

multiProvider.log(event);

321

322

// Async logging (non-blocking)

323

asyncProvider.log(event);

324

325

// Batch processing

326

List<AuditEventBase> events = Arrays.asList(event1, event2, event3);

327

multiProvider.log(events);

328

329

// Graceful shutdown

330

multiProvider.flush(); // Ensure all events are processed

331

multiProvider.stop();

332

asyncProvider.waitToComplete(); // Wait for async processing to finish

333

asyncProvider.stop();

334

```

335

336

### Async Queue Implementation

337

338

Non-blocking asynchronous queue with unbounded capacity, backed internally by a `LinkedBlockingQueue`.

339

340

```java { .api }

341

/**

342

* Non-blocking asynchronous queue with background processing

343

*/

344

public class AuditAsyncQueue extends AuditQueue implements Runnable {

345

/**

346

* Create async queue with consumer handler

347

* @param consumer AuditHandler consumer to process events

348

*/

349

public AuditAsyncQueue(AuditHandler consumer);

350

351

/**

352

* Log audit event asynchronously (non-blocking)

353

* @param event AuditEventBase event to queue

354

* @return boolean true if queued successfully

355

*/

356

public boolean log(AuditEventBase event);

357

358

/**

359

* Log collection of audit events

360

* @param events Collection<AuditEventBase> events to queue

361

* @return boolean true if all events queued

362

*/

363

public boolean log(Collection<AuditEventBase> events);

364

365

/**

366

* Start the queue and consumer thread

367

*/

368

public void start();

369

370

/**

371

* Stop the queue and drain remaining events

372

*/

373

public void stop();

374

375

/**

376

* Get current queue size

377

* @return int number of events in queue

378

*/

379

public int size();

380

}

381

```

382

383

### Batch Queue Implementation

384

385

Blocking queue that batches audit events before sending to consumer with file spooling support.

386

387

```java { .api }

388

/**

389

* Blocking queue with batching and file spooling capabilities

390

*/

391

public class AuditBatchQueue extends AuditQueue implements Runnable {

392

/**

393

* Create batch queue with consumer handler

394

* @param consumer AuditHandler consumer to process batched events

395

*/

396

public AuditBatchQueue(AuditHandler consumer);

397

398

/**

399

* Initialize with configuration properties

400

* @param prop Properties configuration properties

401

* @param basePropertyName String base property name

402

*/

403

public void init(Properties prop, String basePropertyName);

404

405

/**

406

* Log audit event (blocking if queue full)

407

* @param event AuditEventBase event to queue

408

* @return boolean true if queued successfully

409

*/

410

public boolean log(AuditEventBase event);

411

412

/**

413

* Start the queue, consumer and file spooler

414

*/

415

public synchronized void start();

416

417

/**

418

* Wait for completion with timeout

419

* @param timeout long timeout in milliseconds

420

*/

421

public void waitToComplete(long timeout);

422

423

/**

424

* Flush pending events to consumer

425

*/

426

public void flush();

427

}

428

```

429

430

### File Queue Implementation

431

432

File-based queue providing persistence and failover through local filesystem spooling.

433

434

```java { .api }

435

/**

436

* File-based queue with persistence and failover capabilities

437

*/

438

public class AuditFileQueue extends BaseAuditHandler {

439

/**

440

* Create file queue with consumer handler

441

* @param consumer AuditHandler consumer to process events from files

442

*/

443

public AuditFileQueue(AuditHandler consumer);

444

445

/**

446

* Initialize with configuration properties

447

* @param prop Properties configuration properties

448

* @param basePropertyName String base property name

449

*/

450

public void init(Properties prop, String basePropertyName);

451

452

/**

453

* Log audit event to file spool

454

* @param event AuditEventBase event to spool to file

455

* @return boolean true if spooled successfully

456

*/

457

public boolean log(AuditEventBase event);

458

459

/**

460

* Start the consumer and file spooler

461

*/

462

public void start();

463

464

/**

465

* Wait for completion with timeout

466

* @param timeout long timeout in milliseconds

467

*/

468

public void waitToComplete(long timeout);

469

}

470

```

471

472

### Summary Queue Implementation

473

474

Queue that aggregates and summarizes similar audit events to reduce volume.

475

476

```java { .api }

477

/**

478

* Queue that summarizes similar audit events before sending to consumer

479

*/

480

public class AuditSummaryQueue extends AuditQueue implements Runnable {

481

/**

482

* Create summary queue with consumer handler

483

* @param consumer AuditHandler consumer to process summarized events

484

*/

485

public AuditSummaryQueue(AuditHandler consumer);

486

487

/**

488

* Initialize with summary-specific properties

489

* @param props Properties configuration properties

490

* @param propPrefix String property prefix

491

*/

492

public void init(Properties props, String propPrefix);

493

494

/**

495

* Log audit event (adds to summary aggregation)

496

* @param event AuditEventBase event to add to summary

497

* @return boolean true if processed successfully

498

*/

499

public boolean log(AuditEventBase event);

500

501

/**

502

* Start the queue and consumer thread

503

*/

504

public void start();

505

506

/**

507

* Stop the queue and send remaining summaries

508

*/

509

public void stop();

510

}

511

```

512

513

### Configuration Properties

514

515

Key configuration properties for queue and async processing:

516

517

**Async Provider Configuration:**

518

- `xasecure.audit.async.queue.batch.size`: Batch size for async processing

519

- `xasecure.audit.async.queue.flush.interval`: Flush interval in milliseconds

520

- `xasecure.audit.async.queue.max.size`: Maximum queue size

521

522

**Common Queue Configuration:**

523

- `batch.size`: Events per batch (default: 1000)

524

- `queue.size`: Maximum queue capacity (default: 1024*1024)

525

- `batch.interval.ms`: Batch processing interval (default: 3000ms)

526

527

**Summary Queue Configuration:**

528

- `summary.interval.ms`: Summary aggregation interval (default: 5000ms)

529

530

**File Spooling Configuration:**

531

- `filespool.enable`: Enable file spooling for failover

532

- `filespool.drain.threshold.percent`: Threshold for draining spool files

533

- `filespool.drain.full.wait.ms`: Wait time for full drain completion

534

- `xasecure.audit.spool.local.dir`: Local spool directory

535

- `xasecure.audit.spool.local.filename`: Spool filename pattern