# Queue and Async Processing

Asynchronous audit processing with configurable queues, batching, and file spooling, intended to improve reliability and throughput in high-volume audit scenarios.

## Capabilities

### Asynchronous Audit Provider

Asynchronous audit provider that processes audit events in background threads with a configurable queue size and flush interval.

```java { .api }
/**
 * Asynchronous audit provider with background processing
 */
public class AsyncAuditProvider extends BaseAuditHandler {
    /**
     * Create asynchronous audit provider with queue configuration
     * @param name String provider name identifier
     * @param maxQueueSize int maximum queue size before blocking
     * @param maxFlushInterval int maximum flush interval in milliseconds
     */
    public AsyncAuditProvider(String name, int maxQueueSize, int maxFlushInterval);

    /**
     * Create asynchronous audit provider with queue configuration and audit handler
     * @param name String provider name identifier
     * @param maxQueueSize int maximum queue size before blocking
     * @param maxFlushInterval int maximum flush interval in milliseconds
     * @param provider AuditHandler audit handler to add to this provider
     */
    public AsyncAuditProvider(String name, int maxQueueSize, int maxFlushInterval, AuditHandler provider);

    /**
     * Initialize async provider with configuration properties
     * @param props Properties configuration properties
     */
    public void init(Properties props);

    /**
     * Log audit event asynchronously (non-blocking)
     * @param event AuditEventBase event to log
     */
    public void log(AuditEventBase event);

    /**
     * Log collection of audit events asynchronously
     * @param events Collection<AuditEventBase> events to log
     */
    public void log(Collection<AuditEventBase> events);

    /**
     * Start background processing threads
     */
    public void start();

    /**
     * Stop async provider and background threads
     */
    public void stop();

    /**
     * Wait for all pending events to be processed
     */
    public void waitToComplete();

    /**
     * Get current queue size
     * @return int number of events in queue
     */
    public int getQueueSize();

    /**
     * Check if async provider is running
     * @return boolean true if running
     */
    public boolean isRunning();
}
```

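The general pattern behind an asynchronous provider (a bounded queue drained by a background thread) can be sketched with plain JDK classes. This is a minimal illustration, not the Ranger implementation: `AsyncLoggerSketch` and its methods are hypothetical names, events are plain strings, and the choice to reject events when the queue is full is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for an async provider; not the Ranger class. */
public class AsyncLoggerSketch implements Runnable {
    private final BlockingQueue<String> queue;
    private final long flushIntervalMs;
    private final List<String> delivered = new ArrayList<>();
    private volatile boolean running;
    private Thread worker;

    public AsyncLoggerSketch(int maxQueueSize, long flushIntervalMs) {
        this.queue = new ArrayBlockingQueue<>(maxQueueSize);
        this.flushIntervalMs = flushIntervalMs;
    }

    /** Returns immediately; false means the queue was full (assumed policy). */
    public boolean log(String event) {
        return queue.offer(event);
    }

    public void start() {
        running = true;
        worker = new Thread(this, "async-audit-sketch");
        worker.start();
    }

    /** Signals the worker to finish, then waits until the queue is drained. */
    public void stop() {
        running = false;
        try {
            if (worker != null) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Snapshot of events the worker has handed to the (simulated) destination. */
    public synchronized List<String> delivered() {
        return new ArrayList<>(delivered);
    }

    @Override
    public void run() {
        try {
            // Keep going after stop() until every queued event is delivered.
            while (running || !queue.isEmpty()) {
                String event = queue.poll(flushIntervalMs, TimeUnit.MILLISECONDS);
                if (event != null) {
                    synchronized (this) {
                        delivered.add(event);
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The caller never waits on the destination: `log` only enqueues, and `stop` is the single point where the producer blocks until pending events are delivered.
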
### Audit Queue Base Class

Abstract base class for audit queues providing batching, file spooling, and drain management capabilities.

```java { .api }
/**
 * Base class for audit queues with batching and file spooling
 */
public abstract class AuditQueue extends AuditDestination {
    // Queue configuration methods
    public void setMaxBatchSize(int maxBatchSize);
    public int getMaxBatchSize();
    public void setMaxBatchInterval(long maxBatchInterval);
    public long getMaxBatchInterval();
    public void setMaxQueueSize(int maxQueueSize);
    public int getMaxQueueSize();

    // File spooling configuration
    public void setSpoolEnabled(boolean spoolEnabled);
    public boolean isSpoolEnabled();
    public void setSpoolDirectory(String spoolDirectory);
    public String getSpoolDirectory();
    public void setSpoolFileName(String spoolFileName);
    public String getSpoolFileName();

    // Drain management
    public void startDrainThread();
    public void stopDrainThread();
    public boolean isDrainInProgress();

    // Statistics
    public long getProcessedCount();
    public long getErrorCount();
    public long getDroppedCount();
}
```

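How a maximum batch size and a maximum batch interval typically interact can be shown with a small JDK-only sketch. It is an illustration under assumptions, not Ranger code: `BatchDrainSketch.nextBatch` is a hypothetical helper that returns as soon as the batch is full or the interval has elapsed, whichever comes first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper illustrating size- and time-bounded batching. */
public class BatchDrainSketch {
    /**
     * Collects up to maxBatchSize events from the queue, waiting at most
     * maxBatchIntervalMs in total. May return a partial or empty batch.
     */
    public static <T> List<T> nextBatch(BlockingQueue<T> queue, int maxBatchSize, long maxBatchIntervalMs) {
        List<T> batch = new ArrayList<>(maxBatchSize);
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxBatchIntervalMs);
        try {
            while (batch.size() < maxBatchSize) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    break; // interval elapsed: ship what we have
                }
                T event = queue.poll(remaining, TimeUnit.NANOSECONDS);
                if (event == null) {
                    break; // nothing arrived before the deadline
                }
                batch.add(event);
                // Grab anything else already queued without waiting.
                queue.drainTo(batch, maxBatchSize - batch.size());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return batch;
    }
}
```

With five queued events and a batch size of two, successive calls yield batches of two, two, and one; the last call waits out the interval before returning the partial batch.
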
### Multi-Destination Audit Provider

Audit provider that routes audit events to multiple destinations simultaneously, enabling parallel audit logging to different systems.

```java { .api }
/**
 * Routes audit events to multiple destinations
 */
public class MultiDestAuditProvider extends BaseAuditHandler {
    /**
     * Add single audit provider to the multi-destination list
     * @param provider AuditHandler provider to add
     */
    public void addAuditProvider(AuditHandler provider);

    /**
     * Add multiple audit providers to the multi-destination list
     * @param providers List<AuditHandler> providers to add
     */
    public void addAuditProviders(List<AuditHandler> providers);

    /**
     * Remove audit provider from the multi-destination list
     * @param provider AuditHandler provider to remove
     */
    public void removeAuditProvider(AuditHandler provider);

    /**
     * Get list of configured audit providers
     * @return List<AuditHandler> current providers
     */
    public List<AuditHandler> getAuditProviders();

    /**
     * Log event to all configured providers
     * @param event AuditEventBase event to log
     */
    public void log(AuditEventBase event);

    /**
     * Log events to all configured providers
     * @param events Collection<AuditEventBase> events to log
     */
    public void log(Collection<AuditEventBase> events);

    /**
     * Initialize all configured providers
     * @param props Properties configuration properties
     */
    public void init(Properties props);

    /**
     * Start all configured providers
     */
    public void start();

    /**
     * Stop all configured providers
     */
    public void stop();

    /**
     * Flush all configured providers
     */
    public void flush();
}
```

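The fan-out idea can be sketched without any Ranger dependency. In this hypothetical `FanOutSketch`, destinations are plain `Consumer<String>` callbacks and delivery is sequential; isolating a failing destination so the others still receive the event is an assumption of the example, not documented behavior of `MultiDestAuditProvider`.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical fan-out dispatcher; not the Ranger class. */
public class FanOutSketch {
    // Copy-on-write keeps iteration safe if destinations change while logging.
    private final List<Consumer<String>> destinations = new CopyOnWriteArrayList<>();

    public void addDestination(Consumer<String> destination) {
        destinations.add(destination);
    }

    public void removeDestination(Consumer<String> destination) {
        destinations.remove(destination);
    }

    /**
     * Delivers the event to every destination in registration order.
     * A destination that throws is skipped so the rest still get the event.
     * @return number of destinations that accepted the event
     */
    public int log(String event) {
        int accepted = 0;
        for (Consumer<String> destination : destinations) {
            try {
                destination.accept(event);
                accepted++;
            } catch (RuntimeException e) {
                // Assumed policy: isolate the failure and continue.
            }
        }
        return accepted;
    }
}
```
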
### Audit Index Record

Represents audit file index records for spool file management and tracking.

```java { .api }
/**
 * Represents audit file index records for spool file management
 */
public class AuditIndexRecord {
    /**
     * Get unique record identifier
     * @return String record ID
     */
    public String getId();

    /**
     * Set unique record identifier
     * @param id String record ID
     */
    public void setId(String id);

    /**
     * Get file path for this record
     * @return String file path
     */
    public String getFilePath();

    /**
     * Set file path for this record
     * @param filePath String file path
     */
    public void setFilePath(String filePath);

    /**
     * Get line position in file
     * @return long line position
     */
    public long getLinePosition();

    /**
     * Set line position in file
     * @param linePosition long line position
     */
    public void setLinePosition(long linePosition);

    /**
     * Get current status of this record
     * @return SPOOL_FILE_STATUS current status
     */
    public SPOOL_FILE_STATUS getStatus();

    /**
     * Set status of this record
     * @param status SPOOL_FILE_STATUS new status
     */
    public void setStatus(SPOOL_FILE_STATUS status);

    /**
     * Get creation timestamp
     * @return Date creation time
     */
    public Date getCreatedTime();

    /**
     * Set creation timestamp
     * @param createdTime Date creation time
     */
    public void setCreatedTime(Date createdTime);

    /**
     * Get last attempt timestamp
     * @return Date last attempt time
     */
    public Date getLastAttempt();

    /**
     * Set last attempt timestamp
     * @param lastAttempt Date last attempt time
     */
    public void setLastAttempt(Date lastAttempt);

    /**
     * Get retry count
     * @return int number of retries
     */
    public int getRetryCount();

    /**
     * Set retry count
     * @param retryCount int number of retries
     */
    public void setRetryCount(int retryCount);
}
```

**Usage Examples:**

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.ranger.audit.destination.*;
import org.apache.ranger.audit.model.AuditEventBase;
import org.apache.ranger.audit.model.AuthzAuditEvent;
import org.apache.ranger.audit.provider.AsyncAuditProvider;
import org.apache.ranger.audit.provider.MultiDestAuditProvider;

// Configure asynchronous audit provider
AsyncAuditProvider asyncProvider = new AsyncAuditProvider("async-hdfs", 10000, 5000);
Properties asyncProps = new Properties();
asyncProps.setProperty("xasecure.audit.async.queue.batch.size", "100");
asyncProps.setProperty("xasecure.audit.async.queue.flush.interval", "30000");
asyncProvider.init(asyncProps);
asyncProvider.start();

// Configure multi-destination provider
MultiDestAuditProvider multiProvider = new MultiDestAuditProvider();

// Add HDFS destination
HDFSAuditDestination hdfsDestination = new HDFSAuditDestination();
Properties hdfsProps = new Properties();
hdfsProps.setProperty("xasecure.audit.hdfs.is.enabled", "true");
hdfsProps.setProperty("xasecure.audit.hdfs.destination.directory", "/ranger/audit");
hdfsDestination.init(hdfsProps, "xasecure.audit.hdfs");

// Add Solr destination
SolrAuditDestination solrDestination = new SolrAuditDestination();
Properties solrProps = new Properties();
solrProps.setProperty("xasecure.audit.solr.is.enabled", "true");
solrProps.setProperty("xasecure.audit.solr.urls", "http://solr:8983/solr");
solrDestination.init(solrProps, "xasecure.audit.solr");

// Add destinations to multi-provider
multiProvider.addAuditProvider(hdfsDestination);
multiProvider.addAuditProvider(solrDestination);
multiProvider.init(new Properties());
multiProvider.start();

// Log events - will go to both HDFS and Solr
AuthzAuditEvent event = new AuthzAuditEvent();
// ... configure event ...
multiProvider.log(event);

// Async logging (non-blocking)
asyncProvider.log(event);

// Batch processing: log several events in one call
List<AuditEventBase> events = Arrays.asList(event, new AuthzAuditEvent(), new AuthzAuditEvent());
multiProvider.log(events);

// Graceful shutdown
multiProvider.flush(); // Ensure all events are processed
multiProvider.stop();
asyncProvider.waitToComplete(); // Wait for async processing to finish
asyncProvider.stop();
```

### Async Queue Implementation

Non-blocking asynchronous queue with unbounded capacity, backed internally by a `LinkedBlockingQueue`.

```java { .api }
/**
 * Non-blocking asynchronous queue with background processing
 */
public class AuditAsyncQueue extends AuditQueue implements Runnable {
    /**
     * Create async queue with consumer handler
     * @param consumer AuditHandler consumer to process events
     */
    public AuditAsyncQueue(AuditHandler consumer);

    /**
     * Log audit event asynchronously (non-blocking)
     * @param event AuditEventBase event to queue
     * @return boolean true if queued successfully
     */
    public boolean log(AuditEventBase event);

    /**
     * Log collection of audit events
     * @param events Collection<AuditEventBase> events to queue
     * @return boolean true if all events queued
     */
    public boolean log(Collection<AuditEventBase> events);

    /**
     * Start the queue and consumer thread
     */
    public void start();

    /**
     * Stop the queue and drain remaining events
     */
    public void stop();

    /**
     * Get current queue size
     * @return int number of events in queue
     */
    public int size();
}
```

### Batch Queue Implementation

Blocking queue that batches audit events before sending them to the consumer, with file spooling support.

```java { .api }
/**
 * Blocking queue with batching and file spooling capabilities
 */
public class AuditBatchQueue extends AuditQueue implements Runnable {
    /**
     * Create batch queue with consumer handler
     * @param consumer AuditHandler consumer to process batched events
     */
    public AuditBatchQueue(AuditHandler consumer);

    /**
     * Initialize with configuration properties
     * @param prop Properties configuration properties
     * @param basePropertyName String base property name
     */
    public void init(Properties prop, String basePropertyName);

    /**
     * Log audit event (blocking if queue full)
     * @param event AuditEventBase event to queue
     * @return boolean true if queued successfully
     */
    public boolean log(AuditEventBase event);

    /**
     * Start the queue, consumer and file spooler
     */
    public synchronized void start();

    /**
     * Wait for completion with timeout
     * @param timeout long timeout in milliseconds
     */
    public void waitToComplete(long timeout);

    /**
     * Flush pending events to consumer
     */
    public void flush();
}
```

### File Queue Implementation

File-based queue providing persistence and failover through local filesystem spooling.

```java { .api }
/**
 * File-based queue with persistence and failover capabilities
 */
public class AuditFileQueue extends BaseAuditHandler {
    /**
     * Create file queue with consumer handler
     * @param consumer AuditHandler consumer to process events from files
     */
    public AuditFileQueue(AuditHandler consumer);

    /**
     * Initialize with configuration properties
     * @param prop Properties configuration properties
     * @param basePropertyName String base property name
     */
    public void init(Properties prop, String basePropertyName);

    /**
     * Log audit event to file spool
     * @param event AuditEventBase event to spool to file
     * @return boolean true if spooled successfully
     */
    public boolean log(AuditEventBase event);

    /**
     * Start the consumer and file spooler
     */
    public void start();

    /**
     * Wait for completion with timeout
     * @param timeout long timeout in milliseconds
     */
    public void waitToComplete(long timeout);
}
```

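File spooling amounts to writing events to local disk first and replaying them to the consumer later. The sketch below shows that idea with `java.nio.file` only. `FileSpoolSketch` is a hypothetical class, the one-event-per-line format is an assumption, and it omits the index tracking, retries, and file rotation a real spooler would need.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical spool-and-replay helper; not the Ranger class. */
public class FileSpoolSketch {
    private final Path spoolFile;

    public FileSpoolSketch(Path spoolFile) {
        this.spoolFile = spoolFile;
    }

    /** Appends one event per line (events must not contain line breaks). */
    public void spool(String event) {
        try {
            Files.write(spoolFile, Collections.singletonList(event), StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Replays every spooled event to the consumer in write order,
     * then deletes the spool file.
     * @return number of events replayed
     */
    public int drain(Consumer<String> consumer) {
        try {
            if (!Files.exists(spoolFile)) {
                return 0;
            }
            List<String> lines = Files.readAllLines(spoolFile, StandardCharsets.UTF_8);
            lines.forEach(consumer);
            Files.delete(spoolFile);
            return lines.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because events survive on disk until `drain` succeeds, a destination outage delays delivery rather than losing events, which is the failover property the file queue is described as providing.
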
### Summary Queue Implementation

Queue that aggregates and summarizes similar audit events to reduce volume.

```java { .api }
/**
 * Queue that summarizes similar audit events before sending to consumer
 */
public class AuditSummaryQueue extends AuditQueue implements Runnable {
    /**
     * Create summary queue with consumer handler
     * @param consumer AuditHandler consumer to process summarized events
     */
    public AuditSummaryQueue(AuditHandler consumer);

    /**
     * Initialize with summary-specific properties
     * @param props Properties configuration properties
     * @param propPrefix String property prefix
     */
    public void init(Properties props, String propPrefix);

    /**
     * Log audit event (adds to summary aggregation)
     * @param event AuditEventBase event to add to summary
     * @return boolean true if processed successfully
     */
    public boolean log(AuditEventBase event);

    /**
     * Start the queue and consumer thread
     */
    public void start();

    /**
     * Stop the queue and send remaining summaries
     */
    public void stop();
}
```

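Summarization can be pictured as grouping events by a key and counting occurrences within an interval. The following sketch is illustrative only: `SummarySketch` is a hypothetical class, and treating events as "similar" when user, resource, and action match is an assumption, since the key actually used is not specified here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical event summarizer; not the Ranger class. */
public class SummarySketch {
    // Insertion order is kept so summaries come out in first-seen order.
    private final Map<String, Integer> counts = new LinkedHashMap<>();

    /** Folds one event into the running summary for its key. */
    public synchronized void log(String user, String resource, String action) {
        // Assumed similarity key: user + resource + action.
        String key = user + "|" + resource + "|" + action;
        counts.merge(key, 1, Integer::sum);
    }

    /**
     * Returns the summaries accumulated since the last flush and resets
     * the aggregation, as a periodic summary interval would.
     */
    public synchronized Map<String, Integer> flush() {
        Map<String, Integer> snapshot = new LinkedHashMap<>(counts);
        counts.clear();
        return snapshot;
    }
}
```

Four logged events with three sharing a key produce two summary entries, so the consumer receives two records instead of four.
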
### Configuration Properties

Key configuration properties for queue and async processing:

**Async Provider Configuration:**
- `xasecure.audit.async.queue.batch.size`: Batch size for async processing
- `xasecure.audit.async.queue.flush.interval`: Flush interval in milliseconds
- `xasecure.audit.async.queue.max.size`: Maximum queue size

**Common Queue Configuration:**
- `batch.size`: Events per batch (default: 1000)
- `queue.size`: Maximum queue capacity (default: 1024*1024)
- `batch.interval.ms`: Batch processing interval (default: 3000ms)

**Summary Queue Configuration:**
- `summary.interval.ms`: Summary aggregation interval (default: 5000ms)

**File Spooling Configuration:**
- `filespool.enable`: Enable file spooling for failover
- `filespool.drain.threshold.percent`: Threshold for draining spool files
- `filespool.drain.full.wait.ms`: Wait time for full drain completion
- `xasecure.audit.spool.local.dir`: Local spool directory
- `xasecure.audit.spool.local.filename`: Spool filename pattern
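
The common queue properties are listed without a prefix, and the `init(Properties, String)` methods above take a base property name. One plausible reading, assumed here rather than confirmed, is that each key is the base name joined to the suffix with a dot. The hypothetical `QueueConfigSketch` below resolves the three common settings that way and falls back to the defaults listed above.

```java
import java.util.Properties;

/** Hypothetical config reader; key composition is an assumption. */
public class QueueConfigSketch {
    public final int batchSize;
    public final int queueSize;
    public final long batchIntervalMs;

    public QueueConfigSketch(Properties props, String basePropertyName) {
        // Defaults mirror the values listed under Common Queue Configuration.
        this.batchSize = intProp(props, basePropertyName + ".batch.size", 1000);
        this.queueSize = intProp(props, basePropertyName + ".queue.size", 1024 * 1024);
        this.batchIntervalMs = intProp(props, basePropertyName + ".batch.interval.ms", 3000);
    }

    /** Returns the parsed value, or the default when missing or malformed. */
    private static int intProp(Properties props, String key, int defaultValue) {
        String raw = props.getProperty(key);
        if (raw == null) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }
}
```
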