0
# I/O Framework
1
2
The I/O framework provides abstractions for binary encoding and decoding, location management, and seekable streams, together with utilities for file and stream processing.
3
4
## Capabilities
5
6
### Binary Encoding and Decoding
7
8
Utilities for binary data encoding and decoding operations.
9
10
```java { .api }
11
/**
12
* Binary encoder for writing structured data
13
*/
14
public class BinaryEncoder {
15
public BinaryEncoder(OutputStream output);
16
17
/**
18
* Write boolean value
19
*/
20
public void writeBool(boolean value) throws IOException;
21
22
/**
23
* Write byte value
24
*/
25
public void writeByte(byte value) throws IOException;
26
27
/**
28
* Write integer value
29
*/
30
public void writeInt(int value) throws IOException;
31
32
/**
33
* Write long value
34
*/
35
public void writeLong(long value) throws IOException;
36
37
/**
38
* Write float value
39
*/
40
public void writeFloat(float value) throws IOException;
41
42
/**
43
* Write double value
44
*/
45
public void writeDouble(double value) throws IOException;
46
47
/**
48
* Write string value
49
*/
50
public void writeString(String value) throws IOException;
51
52
/**
53
* Write byte array
54
*/
55
public void writeBytes(byte[] bytes) throws IOException;
56
57
/**
58
* Flush the encoder
59
*/
60
public void flush() throws IOException;
61
}
62
63
/**
64
* Binary decoder for reading structured data
65
*/
66
public class BinaryDecoder {
67
public BinaryDecoder(InputStream input);
68
69
/**
70
* Read boolean value
71
*/
72
public boolean readBool() throws IOException;
73
74
/**
75
* Read byte value
76
*/
77
public byte readByte() throws IOException;
78
79
/**
80
* Read integer value
81
*/
82
public int readInt() throws IOException;
83
84
/**
85
* Read long value
86
*/
87
public long readLong() throws IOException;
88
89
/**
90
* Read float value
91
*/
92
public float readFloat() throws IOException;
93
94
/**
95
* Read double value
96
*/
97
public double readDouble() throws IOException;
98
99
/**
100
* Read string value
101
*/
102
public String readString() throws IOException;
103
104
/**
105
* Read byte array
106
*/
107
public byte[] readBytes() throws IOException;
108
}
109
```
110
111
### Codec Abstraction
112
113
Generic encoding/decoding interface for object serialization.
114
115
```java { .api }
116
/**
117
* Generic codec interface for encoding/decoding objects
118
*/
119
public interface Codec<T> {
120
/**
121
* Encode object to byte array
122
*/
123
byte[] encode(T object) throws IOException;
124
125
/**
126
* Decode byte array to object
127
*/
128
T decode(byte[] data) throws IOException;
129
130
/**
131
* Encode object to output stream
132
*/
133
default void encode(T object, OutputStream output) throws IOException;
134
135
/**
136
* Decode object from input stream
137
*/
138
default T decode(InputStream input) throws IOException;
139
}
140
```
141
142
### Location Abstractions
143
144
Abstractions for locating and managing file system resources: a caching location factory, link-capable locations, and a location status enumeration.
145
146
```java { .api }
147
/**
148
* Factory for creating location instances with caching
149
*/
150
public class CachingLocationFactory implements LocationFactory {
151
public CachingLocationFactory(LocationFactory delegate, int cacheSize);
152
153
@Override
154
public Location create(String path);
155
156
@Override
157
public Location create(URI uri);
158
159
/**
160
* Clear location cache
161
*/
162
public void clearCache();
163
164
/**
165
* Get cache statistics
166
*/
167
public CacheStats getCacheStats();
168
}
169
170
/**
171
* Interface for locations that support linking
172
*/
173
public interface LinkableLocation extends Location {
174
/**
175
* Create a symbolic link to this location
176
*/
177
void createSymbolicLink(Location linkLocation) throws IOException;
178
179
/**
180
* Create a hard link to this location
181
*/
182
void createHardLink(Location linkLocation) throws IOException;
183
184
/**
185
* Check if this location is a symbolic link
186
*/
187
boolean isSymbolicLink();
188
189
/**
190
* Get the target of a symbolic link
191
*/
192
Location getLinkTarget() throws IOException;
193
}
194
195
/**
196
* Enumeration of location status types
197
*/
198
public enum LocationStatus {
199
FILE,
200
DIRECTORY,
201
SYMBOLIC_LINK,
202
DOES_NOT_EXIST;
203
204
/**
205
* Check if status indicates existence
206
*/
207
public boolean exists();
208
209
/**
210
* Check if status indicates a file
211
*/
212
public boolean isFile();
213
214
/**
215
* Check if status indicates a directory
216
*/
217
public boolean isDirectory();
218
}
219
```
220
221
### Stream Processing
222
223
Abstractions for seekable input streams, including a file-backed implementation.
224
225
```java { .api }
226
/**
227
* Abstract seekable input stream
228
*/
229
public abstract class SeekableInputStream extends InputStream {
230
/**
231
* Seek to specific position in stream
232
*/
233
public abstract void seek(long pos) throws IOException;
234
235
/**
236
* Get current position in stream
237
*/
238
public abstract long getPos() throws IOException;
239
240
/**
241
* Get total length of stream if known
242
*/
243
public abstract long length() throws IOException;
244
245
/**
246
* Check if stream supports seeking
247
*/
248
public boolean seekable();
249
}
250
251
/**
252
* File-based seekable input stream implementation
253
*/
254
public class FileSeekableInputStream extends SeekableInputStream {
255
public FileSeekableInputStream(File file) throws FileNotFoundException;
256
public FileSeekableInputStream(RandomAccessFile raf);
257
258
@Override
259
public int read() throws IOException;
260
261
@Override
262
public int read(byte[] b, int off, int len) throws IOException;
263
264
@Override
265
public void seek(long pos) throws IOException;
266
267
@Override
268
public long getPos() throws IOException;
269
270
@Override
271
public long length() throws IOException;
272
273
@Override
274
public void close() throws IOException;
275
}
276
```
277
278
### I/O Utilities
279
280
Utility classes for common I/O operations and stream handling.
281
282
```java { .api }
283
/**
284
* Location utility functions
285
*/
286
public class Locations {
287
/**
288
* Copy from one location to another
289
*/
290
public static void copy(Location source, Location destination) throws IOException;
291
292
/**
293
* Move from one location to another
294
*/
295
public static void move(Location source, Location destination) throws IOException;
296
297
/**
298
* Delete location recursively
299
*/
300
public static void deleteRecursively(Location location) throws IOException;
301
302
/**
303
* Create parent directories if they don't exist
304
*/
305
public static void mkdirsIfNotExists(Location location) throws IOException;
306
307
/**
308
* Get relative path between two locations
309
*/
310
public static String getRelativePath(Location base, Location target);
311
312
/**
313
* Check if location is under another location
314
*/
315
public static boolean isChild(Location parent, Location child);
316
}
317
318
/**
319
* ByteBuffer utility functions
320
*/
321
public class ByteBuffers {
322
/**
323
* Convert byte array to ByteBuffer
324
*/
325
public static ByteBuffer wrap(byte[] bytes);
326
327
/**
328
* Convert ByteBuffer to byte array
329
*/
330
public static byte[] getBytes(ByteBuffer buffer);
331
332
/**
333
* Copy ByteBuffer contents
334
*/
335
public static ByteBuffer copy(ByteBuffer source);
336
337
/**
338
* Merge multiple ByteBuffers
339
*/
340
public static ByteBuffer merge(ByteBuffer... buffers);
341
342
/**
343
* Compare two ByteBuffers
344
*/
345
public static int compare(ByteBuffer a, ByteBuffer b);
346
}
347
348
/**
349
* URL connection utilities
350
*/
351
public class URLConnections {
352
/**
353
* Set common properties for HTTP connections
354
*/
355
public static void setDefaultProperties(URLConnection connection);
356
357
/**
358
* Set timeout properties
359
*/
360
public static void setTimeout(URLConnection connection, int timeoutMs);
361
362
/**
363
* Set authentication headers
364
*/
365
public static void setAuthentication(URLConnection connection, String username, String password);
366
367
/**
368
* Get response as string
369
*/
370
public static String getResponse(URLConnection connection) throws IOException;
371
372
/**
373
* Download to file
374
*/
375
public static void downloadToFile(URLConnection connection, File destination) throws IOException;
376
}
377
```
378
379
**Usage Examples:**
380
381
```java
382
import io.cdap.cdap.common.io.*;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.URL;
import java.net.URLConnection;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
384
385
// Binary encoding/decoding
386
public class DataSerializer {
387
public byte[] serializeUserData(String username, int age, boolean active) throws IOException {
388
ByteArrayOutputStream baos = new ByteArrayOutputStream();
389
BinaryEncoder encoder = new BinaryEncoder(baos);
390
391
encoder.writeString(username);
392
encoder.writeInt(age);
393
encoder.writeBool(active);
394
encoder.flush();
395
396
return baos.toByteArray();
397
}
398
399
public UserData deserializeUserData(byte[] data) throws IOException {
400
ByteArrayInputStream bais = new ByteArrayInputStream(data);
401
BinaryDecoder decoder = new BinaryDecoder(bais);
402
403
String username = decoder.readString();
404
int age = decoder.readInt();
405
boolean active = decoder.readBool();
406
407
return new UserData(username, age, active);
408
}
409
}
410
411
// Custom codec implementation
412
public class JsonCodec<T> implements Codec<T> {
413
private final Class<T> type;
414
private final ObjectMapper objectMapper;
415
416
public JsonCodec(Class<T> type) {
417
this.type = type;
418
this.objectMapper = new ObjectMapper();
419
}
420
421
@Override
422
public byte[] encode(T object) throws IOException {
423
return objectMapper.writeValueAsBytes(object);
424
}
425
426
@Override
427
public T decode(byte[] data) throws IOException {
428
return objectMapper.readValue(data, type);
429
}
430
}
431
432
// Location management with caching
433
public class FileManager {
434
private final CachingLocationFactory locationFactory;
435
436
public FileManager(LocationFactory baseFactory) {
437
this.locationFactory = new CachingLocationFactory(baseFactory, 1000);
438
}
439
440
public void processFiles(String basePath, List<String> filePaths) throws IOException {
441
Location baseLocation = locationFactory.create(basePath);
442
Locations.mkdirsIfNotExists(baseLocation);
443
444
for (String filePath : filePaths) {
445
Location fileLocation = locationFactory.create(
446
baseLocation.toURI().resolve(filePath).toString()
447
);
448
449
if (fileLocation.exists()) {
450
processFile(fileLocation);
451
} else {
452
System.out.println("File not found: " + filePath);
453
}
454
}
455
456
// Print cache statistics
457
System.out.println("Cache stats: " + locationFactory.getCacheStats());
458
}
459
460
public void createSymbolicLinks(Location sourceDir, Location linkDir) throws IOException {
461
if (sourceDir instanceof LinkableLocation && linkDir instanceof LinkableLocation) {
462
LinkableLocation linkableSource = (LinkableLocation) sourceDir;
463
LinkableLocation linkableTarget = (LinkableLocation) linkDir;
464
465
for (Location file : sourceDir.list()) {
466
if (file.isFile()) {
467
Location linkFile = linkDir.append(file.getName());
468
((LinkableLocation) file).createSymbolicLink(linkFile);
469
}
470
}
471
}
472
}
473
474
private void processFile(Location location) throws IOException {
475
// File processing logic
476
System.out.println("Processing: " + location.getName() +
477
" (size: " + location.length() + " bytes)");
478
}
479
}
480
481
// Seekable stream processing
482
public class LogFileProcessor {
483
public void processLogFile(File logFile, long startOffset, long endOffset) throws IOException {
484
try (FileSeekableInputStream stream = new FileSeekableInputStream(logFile)) {
485
// Seek to start position
486
stream.seek(startOffset);
487
488
byte[] buffer = new byte[8192];
489
long currentPos = stream.getPos();
490
491
while (currentPos < endOffset) {
492
int bytesToRead = (int) Math.min(buffer.length, endOffset - currentPos);
493
int bytesRead = stream.read(buffer, 0, bytesToRead);
494
495
if (bytesRead == -1) {
496
break; // End of file
497
}
498
499
// Process the read data
500
processLogData(buffer, 0, bytesRead);
501
502
currentPos = stream.getPos();
503
}
504
}
505
}
506
507
public void indexLogFile(File logFile) throws IOException {
508
try (FileSeekableInputStream stream = new FileSeekableInputStream(logFile)) {
509
Map<String, Long> lineIndex = new HashMap<>();
510
long position = 0;
511
String line;
512
513
BufferedReader reader = new BufferedReader(
514
new InputStreamReader(stream, StandardCharsets.UTF_8)
515
);
516
517
while ((line = reader.readLine()) != null) {
518
// Extract timestamp or identifier from line
519
String key = extractLineKey(line);
520
if (key != null) {
521
lineIndex.put(key, position);
522
}
523
position = stream.getPos();
524
}
525
526
System.out.println("Indexed " + lineIndex.size() + " lines");
527
}
528
}
529
530
private void processLogData(byte[] data, int offset, int length) {
531
// Process log data
532
String logData = new String(data, offset, length, StandardCharsets.UTF_8);
533
System.out.println("Processing log data: " + logData.length() + " chars");
534
}
535
536
private String extractLineKey(String line) {
537
// Extract timestamp, request ID, etc.
538
return line.length() > 20 ? line.substring(0, 20) : null;
539
}
540
}
541
542
// Advanced I/O utilities
543
public class DataTransferService {
544
public void transferLargeFile(Location source, Location destination) throws IOException {
545
System.out.println("Transferring: " + source.getName() + " -> " + destination.getName());
546
547
// Copy with progress tracking
548
long totalSize = source.length();
549
long transferred = 0;
550
551
try (InputStream input = source.getInputStream();
552
OutputStream output = destination.getOutputStream()) {
553
554
byte[] buffer = new byte[64 * 1024]; // 64KB buffer
555
int bytesRead;
556
557
while ((bytesRead = input.read(buffer)) != -1) {
558
output.write(buffer, 0, bytesRead);
559
transferred += bytesRead;
560
561
// Report progress
562
if (transferred % (1024 * 1024) == 0) { // Every MB
563
double progress = (double) transferred / totalSize * 100;
564
System.out.printf("Progress: %.1f%% (%d/%d bytes)%n",
565
progress, transferred, totalSize);
566
}
567
}
568
}
569
570
System.out.println("Transfer completed: " + transferred + " bytes");
571
}
572
573
public void downloadFromUrl(String url, File destination, int timeoutMs) throws IOException {
574
URLConnection connection = new URL(url).openConnection();
575
URLConnections.setTimeout(connection, timeoutMs);
576
URLConnections.setDefaultProperties(connection);
577
578
// Download to file
579
URLConnections.downloadToFile(connection, destination);
580
581
System.out.println("Downloaded: " + url + " -> " + destination.getAbsolutePath());
582
}
583
584
public void mergeFiles(List<Location> sourceFiles, Location destination) throws IOException {
585
try (OutputStream output = destination.getOutputStream()) {
586
for (Location sourceFile : sourceFiles) {
587
System.out.println("Merging: " + sourceFile.getName());
588
589
try (InputStream input = sourceFile.getInputStream()) {
590
byte[] buffer = new byte[32 * 1024];
591
int bytesRead;
592
593
while ((bytesRead = input.read(buffer)) != -1) {
594
output.write(buffer, 0, bytesRead);
595
}
596
}
597
}
598
}
599
600
System.out.println("Merged " + sourceFiles.size() + " files into: " +
601
destination.getName());
602
}
603
}
604
605
// ByteBuffer operations
606
public class BufferProcessor {
607
public void processDataBuffers(List<ByteBuffer> buffers) {
608
// Merge all buffers
609
ByteBuffer merged = ByteBuffers.merge(buffers.toArray(new ByteBuffer[0]));
610
611
System.out.println("Merged buffer size: " + merged.remaining() + " bytes");
612
613
// Process merged data
614
byte[] data = ByteBuffers.getBytes(merged);
615
processData(data);
616
}
617
618
public void compareBuffers(ByteBuffer buffer1, ByteBuffer buffer2) {
619
int comparison = ByteBuffers.compare(buffer1, buffer2);
620
621
if (comparison == 0) {
622
System.out.println("Buffers are identical");
623
} else if (comparison < 0) {
624
System.out.println("Buffer1 is lexicographically smaller");
625
} else {
626
System.out.println("Buffer1 is lexicographically larger");
627
}
628
}
629
630
private void processData(byte[] data) {
631
// Data processing logic
632
System.out.println("Processing " + data.length + " bytes");
633
}
634
}
635
```