# Log Buffer System

High-throughput log buffering infrastructure for temporary log storage, pipeline processing, automatic recovery, and cleanup operations. The Log Buffer system provides resilient log processing capabilities for CDAP platform components.

## Capabilities

### LogBufferService

Main service managing log buffer pipelines, recovery, cleanup, and HTTP endpoints for high-throughput log processing scenarios.

```java { .api }
/**
 * Manages log buffer pipelines, recovery, cleanup, and HTTP endpoint
 * Responsible for loading, starting and stopping log buffer pipelines, creating concurrent writers,
 * starting cleaner service, and recovering logs from buffer
 */
public class LogBufferService extends AbstractIdleService {
    /**
     * Start log buffer service including pipeline loading and recovery
     * @throws Exception if service startup fails
     */
    protected void startUp() throws Exception;

    /**
     * Stop log buffer service and cleanup resources
     * @throws Exception if service shutdown fails
     */
    protected void shutDown() throws Exception;
}
```

### LogBufferWriter

File-based log writer that appends logs to rotating buffer files with automatic file rotation when maximum size is reached.

```java { .api }
/**
 * Appends logs to log buffer file with automatic rotation
 * File format: <length><log_event> where length is Avro encoded int32
 * Files are named with monotonically increasing numbers: (max_file_id + 1).buf
 */
public class LogBufferWriter implements Flushable, Closeable {
    /**
     * Create log buffer writer with specified configuration
     * @param logEventSerializer Serializer for log events
     * @param locationFactory Factory for creating file locations
     * @param maxFileSizeInBytes Maximum file size before rotation
     * @param cleaner Cleanup runnable for old files
     */
    public LogBufferWriter(LoggingEventSerializer logEventSerializer, LocationFactory locationFactory,
                           long maxFileSizeInBytes, Runnable cleaner);

    /**
     * Append log event to buffer file
     * @param logEvent Log event to append
     * @throws IOException if write operation fails
     */
    public void append(LogBufferEvent logEvent) throws IOException;

    /**
     * Flush buffered data to file system
     * @throws IOException if flush operation fails
     */
    public void flush() throws IOException;

    /**
     * Close writer and cleanup resources
     * @throws IOException if close operation fails
     */
    public void close() throws IOException;
}
```
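The `<length><log_event>` framing and the `(max_file_id + 1).buf` naming rule described above can be illustrated with a small JDK-only sketch. It is not the CDAP implementation: `FramingSketch`, `writeAvroInt`, `appendRecord`, and `nextFileName` are hypothetical names, and the payload is an arbitrary byte array rather than a serialized log event. The length prefix follows Avro's binary encoding for `int` (zig-zag, then variable-length with 7 bits per byte).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Illustrative sketch only; all names here are hypothetical, not CDAP classes.
class FramingSketch {

    /** Writes n in Avro's int encoding: zig-zag, then 7 bits per byte, low bits first. */
    static void writeAvroInt(OutputStream out, int n) throws IOException {
        int v = (n << 1) ^ (n >> 31);        // zig-zag maps small magnitudes to small values
        while ((v & ~0x7F) != 0) {           // more than 7 bits left: set the continuation bit
            out.write((v & 0x7F) | 0x80);
            v >>>= 7;
        }
        out.write(v);
    }

    /** Appends one record as <length><payload>. */
    static void appendRecord(OutputStream out, byte[] payload) throws IOException {
        writeAvroInt(out, payload.length);
        out.write(payload);
    }

    /** Next file name under the (max_file_id + 1).buf rule. */
    static String nextFileName(long maxFileId) {
        return (maxFileId + 1) + ".buf";
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream file = new ByteArrayOutputStream();
        appendRecord(file, "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(file.size());       // 1 length byte + 5 payload bytes
        System.out.println(nextFileName(41));  // name of the file after 41.buf
    }
}
```

A variable-length prefix keeps the overhead at a single byte for payloads shorter than 64 bytes, and the prefix grows only as records get larger.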

### ConcurrentLogBufferWriter

Thread-safe wrapper around LogBufferWriter providing concurrent access for multiple threads writing to the same buffer.

```java { .api }
/**
 * Thread-safe wrapper for LogBufferWriter supporting concurrent writes
 * Provides synchronization for multiple threads writing to the same log buffer
 */
public class ConcurrentLogBufferWriter implements Flushable, Closeable {
    /**
     * Create concurrent log buffer writer
     * @param logBufferWriter Underlying writer to wrap
     */
    public ConcurrentLogBufferWriter(LogBufferWriter logBufferWriter);

    /**
     * Thread-safe append operation
     * @param logEvent Log event to append
     * @throws IOException if write operation fails
     */
    public synchronized void append(LogBufferEvent logEvent) throws IOException;

    /**
     * Thread-safe flush operation
     * @throws IOException if flush operation fails
     */
    public synchronized void flush() throws IOException;

    /**
     * Thread-safe close operation
     * @throws IOException if close operation fails
     */
    public synchronized void close() throws IOException;
}
```
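To show what the `synchronized` wrapper pattern buys, here is a minimal sketch in plain Java. `PlainWriter` and `SynchronizedWriter` are hypothetical stand-ins (they store strings in a list instead of writing log events to a file); the sketch only assumes the delegate-and-lock structure suggested by the signatures above.

```java
import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; these classes are hypothetical, not CDAP classes.
class SynchronizedWriterSketch {

    /** Stand-in for a writer that is not safe to share between threads. */
    static class PlainWriter implements Flushable, Closeable {
        final List<String> lines = new ArrayList<>();
        void append(String line) throws IOException { lines.add(line); }
        @Override public void flush() { }
        @Override public void close() { }
    }

    /** Wrapper that serializes every call on its own monitor. */
    static class SynchronizedWriter implements Flushable, Closeable {
        private final PlainWriter delegate;
        SynchronizedWriter(PlainWriter delegate) { this.delegate = delegate; }
        synchronized void append(String line) throws IOException { delegate.append(line); }
        @Override public synchronized void flush() throws IOException { delegate.flush(); }
        @Override public synchronized void close() throws IOException { delegate.close(); }
    }

    /** Starts several threads that append through the wrapper; returns the number of stored lines. */
    static int writeFromThreads(int threads, int perThread) throws Exception {
        PlainWriter plain = new PlainWriter();
        SynchronizedWriter writer = new SynchronizedWriter(plain);
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            int id = t;
            Thread worker = new Thread(() -> {
                try {
                    for (int i = 0; i < perThread; i++) {
                        writer.append("t" + id + "-" + i);
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            worker.join();   // join() makes the workers' writes visible to this thread
        }
        writer.close();
        return plain.lines.size();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(writeFromThreads(4, 1000));
    }
}
```

Because every append passes through the same monitor, no append is lost even though the underlying list is not thread-safe. The cost is that writers queue behind one another, so the wrapper trades parallelism for correctness.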
109
110
### LogBufferHandler
111
112
HTTP request handler for processing log buffer requests through REST endpoints.
113
114
```java { .api }
115
/**
116
* HTTP handler for log buffer requests
117
* Processes incoming log events through HTTP endpoints
118
*/
119
public class LogBufferHandler extends AbstractHttpHandler {
120
// Handles HTTP requests for log buffer operations
121
// POST /logBuffer - Process log events through buffer
122
}
123
```
124
125
### Recovery System
126
127
Components for recovering logs from buffer files after system restarts or failures.
128
129
```java { .api }
130
/**
131
* Service for recovering logs from buffer files
132
* Handles recovery operations after system restarts or failures
133
*/
134
public class LogBufferRecoveryService extends AbstractIdleService {
135
/**
136
* Start recovery service
137
* @throws Exception if recovery startup fails
138
*/
139
protected void startUp() throws Exception;
140
141
/**
142
* Stop recovery service
143
* @throws Exception if recovery shutdown fails
144
*/
145
protected void shutDown() throws Exception;
146
}
147
148
/**
149
* Reader for recovering log events from buffer files
150
* Provides sequential access to log events stored in buffer files
151
*/
152
public class LogBufferReader implements Closeable {
153
/**
154
* Create log buffer reader for specified file
155
* @param bufferFile File containing buffered log events
156
* @param serializer Serializer for deserializing log events
157
*/
158
public LogBufferReader(File bufferFile, LoggingEventSerializer serializer);
159
160
/**
161
* Read next log event from buffer
162
* @return Next log event, or null if end of file reached
163
* @throws IOException if read operation fails
164
*/
165
public LogBufferEvent readNext() throws IOException;
166
167
/**
168
* Close reader and cleanup resources
169
* @throws IOException if close operation fails
170
*/
171
public void close() throws IOException;
172
}
173
```
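The `readNext()` contract (return the next event, or `null` at end of file) pairs naturally with the `<length><log_event>` file format. The sketch below shows one way such a read loop could work, using only the JDK. `RecoveryReadSketch` and its methods are hypothetical, the payloads are treated as UTF-8 strings purely for illustration, and how CDAP's reader handles truncated files is not stated in this document, so the error handling here is an assumption.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; all names here are hypothetical, not CDAP classes.
class RecoveryReadSketch {

    /** Reads an Avro-encoded int; returns null on a clean end of file at a record boundary. */
    static Integer readAvroInt(DataInputStream in) throws IOException {
        int b = in.read();
        if (b < 0) {
            return null;
        }
        int v = b & 0x7F;
        int shift = 7;
        while ((b & 0x80) != 0) {             // continuation bit set: more bytes follow
            if (shift > 28) {
                throw new IOException("length prefix longer than 5 bytes");
            }
            b = in.read();
            if (b < 0) {
                throw new EOFException("truncated length prefix");
            }
            v |= (b & 0x7F) << shift;
            shift += 7;
        }
        return (v >>> 1) ^ -(v & 1);          // undo zig-zag
    }

    /** Reads one <length><payload> record, or returns null at end of file. */
    static byte[] readNext(DataInputStream in) throws IOException {
        Integer length = readAvroInt(in);
        if (length == null) {
            return null;
        }
        if (length < 0) {
            throw new IOException("negative record length");
        }
        byte[] payload = new byte[length];
        in.readFully(payload);                // throws EOFException if the record is cut short
        return payload;
    }

    /** Reads every record from the given file contents, in order. */
    static List<String> readAll(byte[] fileContents) throws IOException {
        List<String> events = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(fileContents))) {
            byte[] payload;
            while ((payload = readNext(in)) != null) {
                events.add(new String(payload, StandardCharsets.UTF_8));
            }
        }
        return events;
    }

    public static void main(String[] args) throws IOException {
        // Two records: length 2 (zig-zag byte 0x04) "hi", then length 3 (0x06) "log".
        byte[] file = {0x04, 'h', 'i', 0x06, 'l', 'o', 'g'};
        System.out.println(readAll(file));    // prints [hi, log]
    }
}
```

Distinguishing a clean end of file from a record cut off mid-write matters during recovery, since a crash can leave a partially written final record.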
174
175
### Cleanup System
176
177
Automatic cleanup of processed log buffer files to manage disk space usage.
178
179
```java { .api }
180
/**
181
* Cleaner service for removing processed log buffer files
182
* Automatically removes old buffer files that have been processed
183
*/
184
public class LogBufferCleaner {
185
/**
186
* Create log buffer cleaner with configuration
187
* @param retentionPeriodMs Retention period for buffer files in milliseconds
188
* @param cleanupIntervalMs Interval between cleanup runs in milliseconds
189
*/
190
public LogBufferCleaner(long retentionPeriodMs, long cleanupIntervalMs);
191
192
/**
193
* Start cleanup operations
194
* Begins periodic cleanup of old buffer files
195
*/
196
public void start();
197
198
/**
199
* Stop cleanup operations
200
* Stops periodic cleanup and cleans up resources
201
*/
202
public void stop();
203
}
204
```
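As a rough illustration of retention-based cleanup, the following sketch deletes `.buf` files whose last-modified time is older than a retention period. It is a simplification under stated assumptions: `RetentionCleanerSketch` and `cleanOnce` are hypothetical names, file age is used as the only criterion, and the check that a file has already been processed (which the cleaner described above performs) is omitted.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; not the CDAP LogBufferCleaner.
class RetentionCleanerSketch {

    /** Deletes *.buf files in dir that are older than retentionMs; returns how many were deleted. */
    static int cleanOnce(Path dir, long retentionMs, long nowMs) throws IOException {
        List<Path> expired = new ArrayList<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(dir, "*.buf")) {
            for (Path file : files) {
                long ageMs = nowMs - Files.getLastModifiedTime(file).toMillis();
                if (ageMs > retentionMs) {
                    expired.add(file);
                }
            }
        }
        // Delete after the directory listing is closed.
        for (Path file : expired) {
            Files.delete(file);
        }
        return expired.size();
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("logbuffer-sketch");
        Path old = Files.createFile(dir.resolve("1.buf"));
        Files.createFile(dir.resolve("2.buf"));
        long now = System.currentTimeMillis();
        // Pretend 1.buf was last written two hours ago.
        Files.setLastModifiedTime(old, FileTime.fromMillis(now - 2 * 60 * 60 * 1000L));
        System.out.println(cleanOnce(dir, 60 * 60 * 1000L, now));   // one-hour retention
    }
}
```

A periodic cleaner could call a method like `cleanOnce` from a `ScheduledExecutorService.scheduleAtFixedRate` task, with the cleanup interval as the period.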
205
206
## Data Models
207
208
Data structures for log buffer operations and file management.
209
210
```java { .api }
211
/**
212
* Represents a log event in the buffer system
213
* Wrapper for log events with buffer-specific metadata
214
*/
215
public class LogBufferEvent {
216
/**
217
* Get the underlying log event
218
* @return ILoggingEvent containing the actual log data
219
*/
220
public ILoggingEvent getLoggingEvent();
221
222
/**
223
* Get the timestamp of the event
224
* @return Timestamp in milliseconds since epoch
225
*/
226
public long getTimestamp();
227
}
228
229
/**
230
* Request structure for log buffer operations
231
* Contains log events and metadata for buffer processing
232
*/
233
public class LogBufferRequest {
234
/**
235
* Get log events in this request
236
* @return List of log events to be buffered
237
*/
238
public List<LogBufferEvent> getLogEvents();
239
240
/**
241
* Get request metadata
242
* @return Map of metadata key-value pairs
243
*/
244
public Map<String, String> getMetadata();
245
}
246
247
/**
248
* Represents pending log buffer request awaiting processing
249
* Used for managing queued requests in the buffer system
250
*/
251
public class PendingLogBufferRequest {
252
/**
253
* Get the underlying request
254
* @return LogBufferRequest that is pending processing
255
*/
256
public LogBufferRequest getRequest();
257
258
/**
259
* Get the submission timestamp
260
* @return When this request was submitted for processing
261
*/
262
public long getSubmissionTime();
263
}
264
265
/**
266
* File offset information for log buffer files
267
* Tracks position information for reading/writing buffer files
268
*/
269
public class LogBufferFileOffset {
270
/**
271
* Get the file identifier
272
* @return Unique identifier for the buffer file
273
*/
274
public String getFileId();
275
276
/**
277
* Get the offset within the file
278
* @return Byte offset within the buffer file
279
*/
280
public long getOffset();
281
}
282
```
283
284
### Pipeline Integration
285
286
Components for integrating log buffer with CDAP logging pipelines.
287
288
```java { .api }
289
/**
290
* Pipeline configuration for log buffer processing
291
* Defines how log buffer integrates with logging pipelines
292
*/
293
public class LogBufferPipelineConfig {
294
/**
295
* Get buffer directory path
296
* @return Directory path where buffer files are stored
297
*/
298
public String getBufferDir();
299
300
/**
301
* Get maximum file size for buffer files
302
* @return Maximum size in bytes before file rotation
303
*/
304
public long getMaxFileSize();
305
306
/**
307
* Get retention period for buffer files
308
* @return Retention period in milliseconds
309
*/
310
public long getRetentionPeriod();
311
}
312
313
/**
314
* Log processor pipeline specifically for buffer processing
315
* Integrates buffer operations with CDAP logging pipeline framework
316
*/
317
public class LogBufferProcessorPipeline extends LogProcessorPipelineContext {
318
/**
319
* Create buffer processor pipeline with configuration
320
* @param config Pipeline configuration
321
* @param checkpointManager Manager for tracking processing checkpoints
322
*/
323
public LogBufferProcessorPipeline(LogBufferPipelineConfig config, CheckpointManager<LogBufferFileOffset> checkpointManager);
324
325
/**
326
* Start pipeline processing
327
* @throws Exception if pipeline startup fails
328
*/
329
public void start() throws Exception;
330
331
/**
332
* Stop pipeline processing
333
* @throws Exception if pipeline shutdown fails
334
*/
335
public void stop() throws Exception;
336
}
337
```
338
339
**Usage Examples:**
340
341
```java
342
import io.cdap.cdap.logging.logbuffer.*;
343
import io.cdap.cdap.logging.serialize.LoggingEventSerializer;
344
import org.apache.twill.filesystem.LocalLocationFactory;
345
346
// Create log buffer writer
347
LoggingEventSerializer serializer = new LoggingEventSerializer();
348
LocalLocationFactory locationFactory = new LocalLocationFactory();
349
long maxFileSize = 64 * 1024 * 1024; // 64MB
350
351
LogBufferWriter writer = new LogBufferWriter(
352
serializer,
353
locationFactory,
354
maxFileSize,
355
() -> System.out.println("Cleanup triggered")
356
);
357
358
// Create concurrent wrapper for multi-threaded access
359
ConcurrentLogBufferWriter concurrentWriter = new ConcurrentLogBufferWriter(writer);
360
361
// Append log events
362
LogBufferEvent event = // ... create log event
363
concurrentWriter.append(event);
364
concurrentWriter.flush();
365
366
// Cleanup
367
concurrentWriter.close();
368
369
// Recovery example
370
LogBufferRecoveryService recoveryService = // ... obtain recovery service
371
recoveryService.startUp(); // Recover any pending logs
372
373
// Cleanup configuration
374
LogBufferCleaner cleaner = new LogBufferCleaner(
375
24 * 60 * 60 * 1000L, // 24 hour retention
376
60 * 60 * 1000L // 1 hour cleanup interval
377
);
378
cleaner.start();
379
```