or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.mdlog-buffer.mdlogging-context.mdlogging-service.mdmetrics-collection.mdmetrics-processing.mdmetrics-query.md

log-buffer.mddocs/

0

# Log Buffer System

1

2

High-throughput log buffering infrastructure for temporary log storage, pipeline processing, automatic recovery, and cleanup operations. The Log Buffer system provides resilient log processing capabilities for CDAP platform components.

3

4

## Capabilities

5

6

### LogBufferService

7

8

Main service managing log buffer pipelines, recovery, cleanup, and HTTP endpoints for high-throughput log processing scenarios.

9

10

```java { .api }

11

/**

12

* Manages log buffer pipelines, recovery, cleanup, and HTTP endpoint

13

* Responsible for loading, starting and stopping log buffer pipelines, creating concurrent writers,

14

* starting cleaner service, and recovering logs from buffer

15

*/

16

public class LogBufferService extends AbstractIdleService {

17

/**

18

* Start log buffer service including pipeline loading and recovery

19

* @throws Exception if service startup fails

20

*/

21

protected void startUp() throws Exception;

22

23

/**

24

* Stop log buffer service and cleanup resources

25

* @throws Exception if service shutdown fails

26

*/

27

protected void shutDown() throws Exception;

28

}

29

```

30

31

### LogBufferWriter

32

33

File-based log writer that appends logs to rotating buffer files with automatic file rotation when maximum size is reached.

34

35

```java { .api }

36

/**

37

* Appends logs to log buffer file with automatic rotation

38

* File format: <length><log_event> where length is Avro encoded int32

39

* Files are named with monotonically increasing numbers: (max_file_id + 1).buf

40

*/

41

public class LogBufferWriter implements Flushable, Closeable {

42

/**

43

* Create log buffer writer with specified configuration

44

* @param logEventSerializer Serializer for log events

45

* @param locationFactory Factory for creating file locations

46

* @param maxFileSizeInBytes Maximum file size before rotation

47

* @param cleaner Cleanup runnable for old files

48

*/

49

public LogBufferWriter(LoggingEventSerializer logEventSerializer, LocationFactory locationFactory,

50

long maxFileSizeInBytes, Runnable cleaner);

51

52

/**

53

* Append log event to buffer file

54

* @param logEvent Log event to append

55

* @throws IOException if write operation fails

56

*/

57

public void append(LogBufferEvent logEvent) throws IOException;

58

59

/**

60

* Flush buffered data to file system

61

* @throws IOException if flush operation fails

62

*/

63

public void flush() throws IOException;

64

65

/**

66

* Close writer and cleanup resources

67

* @throws IOException if close operation fails

68

*/

69

public void close() throws IOException;

70

}

71

```

72

73

### ConcurrentLogBufferWriter

74

75

Thread-safe wrapper around LogBufferWriter providing concurrent access for multiple threads writing to the same buffer.

76

77

```java { .api }

78

/**

79

* Thread-safe wrapper for LogBufferWriter supporting concurrent writes

80

* Provides synchronization for multiple threads writing to the same log buffer

81

*/

82

public class ConcurrentLogBufferWriter implements Flushable, Closeable {

83

/**

84

* Create concurrent log buffer writer

85

* @param logBufferWriter Underlying writer to wrap

86

*/

87

public ConcurrentLogBufferWriter(LogBufferWriter logBufferWriter);

88

89

/**

90

* Thread-safe append operation

91

* @param logEvent Log event to append

92

* @throws IOException if write operation fails

93

*/

94

public synchronized void append(LogBufferEvent logEvent) throws IOException;

95

96

/**

97

* Thread-safe flush operation

98

* @throws IOException if flush operation fails

99

*/

100

public synchronized void flush() throws IOException;

101

102

/**

103

* Thread-safe close operation

104

* @throws IOException if close operation fails

105

*/

106

public synchronized void close() throws IOException;

107

}

108

```

109

110

### LogBufferHandler

111

112

HTTP request handler for processing log buffer requests through REST endpoints.

113

114

```java { .api }

115

/**

116

* HTTP handler for log buffer requests

117

* Processes incoming log events through HTTP endpoints

118

*/

119

public class LogBufferHandler extends AbstractHttpHandler {

120

// Handles HTTP requests for log buffer operations

121

// POST /logBuffer - Process log events through buffer

122

}

123

```

124

125

### Recovery System

126

127

Components for recovering logs from buffer files after system restarts or failures.

128

129

```java { .api }

130

/**

131

* Service for recovering logs from buffer files

132

* Handles recovery operations after system restarts or failures

133

*/

134

public class LogBufferRecoveryService extends AbstractIdleService {

135

/**

136

* Start recovery service

137

* @throws Exception if recovery startup fails

138

*/

139

protected void startUp() throws Exception;

140

141

/**

142

* Stop recovery service

143

* @throws Exception if recovery shutdown fails

144

*/

145

protected void shutDown() throws Exception;

146

}

147

148

/**

149

* Reader for recovering log events from buffer files

150

* Provides sequential access to log events stored in buffer files

151

*/

152

public class LogBufferReader implements Closeable {

153

/**

154

* Create log buffer reader for specified file

155

* @param bufferFile File containing buffered log events

156

* @param serializer Serializer for deserializing log events

157

*/

158

public LogBufferReader(File bufferFile, LoggingEventSerializer serializer);

159

160

/**

161

* Read next log event from buffer

162

* @return Next log event, or null if end of file reached

163

* @throws IOException if read operation fails

164

*/

165

public LogBufferEvent readNext() throws IOException;

166

167

/**

168

* Close reader and cleanup resources

169

* @throws IOException if close operation fails

170

*/

171

public void close() throws IOException;

172

}

173

```

174

175

### Cleanup System

176

177

Automatic cleanup of processed log buffer files to manage disk space usage.

178

179

```java { .api }

180

/**

181

* Cleaner service for removing processed log buffer files

182

* Automatically removes old buffer files that have been processed

183

*/

184

public class LogBufferCleaner {

185

/**

186

* Create log buffer cleaner with configuration

187

* @param retentionPeriodMs Retention period for buffer files in milliseconds

188

* @param cleanupIntervalMs Interval between cleanup runs in milliseconds

189

*/

190

public LogBufferCleaner(long retentionPeriodMs, long cleanupIntervalMs);

191

192

/**

193

* Start cleanup operations

194

* Begins periodic cleanup of old buffer files

195

*/

196

public void start();

197

198

/**

199

* Stop cleanup operations

200

* Stops periodic cleanup and cleans up resources

201

*/

202

public void stop();

203

}

204

```

205

206

## Data Models

207

208

Data structures for log buffer operations and file management.

209

210

```java { .api }

211

/**

212

* Represents a log event in the buffer system

213

* Wrapper for log events with buffer-specific metadata

214

*/

215

public class LogBufferEvent {

216

/**

217

* Get the underlying log event

218

* @return ILoggingEvent containing the actual log data

219

*/

220

public ILoggingEvent getLoggingEvent();

221

222

/**

223

* Get the timestamp of the event

224

* @return Timestamp in milliseconds since epoch

225

*/

226

public long getTimestamp();

227

}

228

229

/**

230

* Request structure for log buffer operations

231

* Contains log events and metadata for buffer processing

232

*/

233

public class LogBufferRequest {

234

/**

235

* Get log events in this request

236

* @return List of log events to be buffered

237

*/

238

public List<LogBufferEvent> getLogEvents();

239

240

/**

241

* Get request metadata

242

* @return Map of metadata key-value pairs

243

*/

244

public Map<String, String> getMetadata();

245

}

246

247

/**

248

* Represents pending log buffer request awaiting processing

249

* Used for managing queued requests in the buffer system

250

*/

251

public class PendingLogBufferRequest {

252

/**

253

* Get the underlying request

254

* @return LogBufferRequest that is pending processing

255

*/

256

public LogBufferRequest getRequest();

257

258

/**

259

* Get the submission timestamp

260

* @return When this request was submitted for processing

261

*/

262

public long getSubmissionTime();

263

}

264

265

/**

266

* File offset information for log buffer files

267

* Tracks position information for reading/writing buffer files

268

*/

269

public class LogBufferFileOffset {

270

/**

271

* Get the file identifier

272

* @return Unique identifier for the buffer file

273

*/

274

public String getFileId();

275

276

/**

277

* Get the offset within the file

278

* @return Byte offset within the buffer file

279

*/

280

public long getOffset();

281

}

282

```

283

284

### Pipeline Integration

285

286

Components for integrating log buffer with CDAP logging pipelines.

287

288

```java { .api }

289

/**

290

* Pipeline configuration for log buffer processing

291

* Defines how log buffer integrates with logging pipelines

292

*/

293

public class LogBufferPipelineConfig {

294

/**

295

* Get buffer directory path

296

* @return Directory path where buffer files are stored

297

*/

298

public String getBufferDir();

299

300

/**

301

* Get maximum file size for buffer files

302

* @return Maximum size in bytes before file rotation

303

*/

304

public long getMaxFileSize();

305

306

/**

307

* Get retention period for buffer files

308

* @return Retention period in milliseconds

309

*/

310

public long getRetentionPeriod();

311

}

312

313

/**

314

* Log processor pipeline specifically for buffer processing

315

* Integrates buffer operations with CDAP logging pipeline framework

316

*/

317

public class LogBufferProcessorPipeline extends LogProcessorPipelineContext {

318

/**

319

* Create buffer processor pipeline with configuration

320

* @param config Pipeline configuration

321

* @param checkpointManager Manager for tracking processing checkpoints

322

*/

323

public LogBufferProcessorPipeline(LogBufferPipelineConfig config, CheckpointManager<LogBufferFileOffset> checkpointManager);

324

325

/**

326

* Start pipeline processing

327

* @throws Exception if pipeline startup fails

328

*/

329

public void start() throws Exception;

330

331

/**

332

* Stop pipeline processing

333

* @throws Exception if pipeline shutdown fails

334

*/

335

public void stop() throws Exception;

336

}

337

```

338

339

**Usage Examples:**

340

341

```java

342

import io.cdap.cdap.logging.logbuffer.*;

343

import io.cdap.cdap.logging.serialize.LoggingEventSerializer;

344

import org.apache.twill.filesystem.LocalLocationFactory;

345

346

// Create log buffer writer

347

LoggingEventSerializer serializer = new LoggingEventSerializer();

348

LocalLocationFactory locationFactory = new LocalLocationFactory();

349

long maxFileSize = 64 * 1024 * 1024; // 64MB

350

351

LogBufferWriter writer = new LogBufferWriter(

352

serializer,

353

locationFactory,

354

maxFileSize,

355

() -> System.out.println("Cleanup triggered")

356

);

357

358

// Create concurrent wrapper for multi-threaded access

359

ConcurrentLogBufferWriter concurrentWriter = new ConcurrentLogBufferWriter(writer);

360

361

// Append log events

362

LogBufferEvent event = // ... create log event

363

concurrentWriter.append(event);

364

concurrentWriter.flush();

365

366

// Cleanup

367

concurrentWriter.close();

368

369

// Recovery example

370

LogBufferRecoveryService recoveryService = // ... obtain recovery service

371

recoveryService.startUp(); // Recover any pending logs

372

373

// Cleanup configuration

374

LogBufferCleaner cleaner = new LogBufferCleaner(

375

24 * 60 * 60 * 1000L, // 24 hour retention

376

60 * 60 * 1000L // 1 hour cleanup interval

377

);

378

cleaner.start();

379

```