0
# Direct Buffer Streaming
1
2
High-performance streaming API for direct ByteBuffers with minimal memory copying. These classes provide the highest performance option for applications that can work with direct ByteBuffers and implement custom buffer management strategies.
3
4
## Capabilities
5
6
### ZstdDirectBufferCompressingStream - High-Performance Compression
7
8
Stream-based compression using direct ByteBuffers for maximum performance.
9
10
```java { .api }
11
/**
12
* Creates a compressing stream for direct ByteBuffers (protected constructor)
13
* @param target initial target buffer for compressed output (must be direct)
14
* @param level compression level (1-22, higher = better compression)
15
* @throws IOException if initialization fails
16
*/
17
protected ZstdDirectBufferCompressingStream(ByteBuffer target, int level) throws IOException;
18
19
/**
20
* Gets recommended output buffer size for optimal performance
21
* @return recommended buffer size in bytes
22
*/
23
public static int recommendedOutputBufferSize();
24
25
/**
26
* Compresses data from source buffer
27
* @param source source buffer containing data to compress (must be direct)
28
* @throws IOException if compression fails
29
*/
30
public void compress(ByteBuffer source) throws IOException;
31
32
/**
33
* Flushes any pending compressed data
34
* @throws IOException if flush fails
35
*/
36
public void flush() throws IOException;
37
38
/**
39
* Closes the stream and finishes compression
40
* @throws IOException if close fails
41
*/
42
public void close() throws IOException;
43
44
/**
45
* Buffer management callback - override to handle buffer flushing
46
* @param toFlush buffer that needs to be flushed (flip() first to read data)
47
* @return buffer to continue using (typically same buffer after clear())
48
* @throws IOException if buffer handling fails
49
*/
50
protected ByteBuffer flushBuffer(ByteBuffer toFlush) throws IOException;
51
```
52
53
**Usage Examples:**
54
55
```java
56
import com.github.luben.zstd.ZstdDirectBufferCompressingStream;
57
import java.nio.ByteBuffer;
58
import java.io.*;
59
60
// Extend the class to implement buffer management
61
class MyCompressingStream extends ZstdDirectBufferCompressingStream {
62
private final OutputStream output;
63
64
public MyCompressingStream(OutputStream output, int level) throws IOException {
65
super(ByteBuffer.allocateDirect(recommendedOutputBufferSize()), level);
66
this.output = output;
67
}
68
69
@Override
70
protected ByteBuffer flushBuffer(ByteBuffer toFlush) throws IOException {
71
toFlush.flip(); // Prepare for reading
72
73
// Write compressed data to output stream
74
byte[] buffer = new byte[toFlush.remaining()];
75
toFlush.get(buffer);
76
output.write(buffer);
77
78
toFlush.clear(); // Prepare for writing
79
return toFlush; // Reuse same buffer
80
}
81
}
82
83
// Use the custom compressing stream
84
try (MyCompressingStream compressor = new MyCompressingStream(outputStream, 6)) {
85
ByteBuffer sourceData = ByteBuffer.allocateDirect(8192);
86
sourceData.put("Data to compress".getBytes());
87
sourceData.flip();
88
89
compressor.compress(sourceData);
90
compressor.flush();
91
}
92
```
93
94
### ZstdDirectBufferDecompressingStream - High-Performance Decompression
95
96
Stream-based decompression using direct ByteBuffers for maximum performance.
97
98
```java { .api }
99
/**
100
* Creates a decompressing stream for direct ByteBuffers
101
* @param source initial source buffer containing compressed data (must be direct)
102
*/
103
public ZstdDirectBufferDecompressingStream(ByteBuffer source);
104
105
/**
106
* Gets recommended target buffer size for optimal performance
107
* @return recommended buffer size in bytes
108
*/
109
public static int recommendedTargetBufferSize();
110
111
/**
112
* Checks if more data is available for decompression
113
* @return true if more data can be read
114
*/
115
public boolean hasRemaining();
116
117
/**
118
* Reads decompressed data into target buffer
119
* @param target target buffer for decompressed data (must be direct)
120
* @return number of bytes written to target buffer
121
* @throws IOException if decompression fails
122
*/
123
public int read(ByteBuffer target) throws IOException;
124
125
/**
126
* Closes the stream and releases resources
127
* @throws IOException if close fails
128
*/
129
public void close() throws IOException;
130
131
/**
132
* Buffer management callback - override to refill source buffer
133
* @param toRefill current source buffer (may need more data)
134
* @return buffer to continue using (typically same buffer refilled and flipped)
135
*/
136
protected ByteBuffer refill(ByteBuffer toRefill);
137
```
138
139
**Usage Examples:**
140
141
```java
142
import com.github.luben.zstd.ZstdDirectBufferDecompressingStream;
143
import java.nio.ByteBuffer;
144
import java.io.*;
145
146
// Extend the class to implement buffer management
147
class MyDecompressingStream extends ZstdDirectBufferDecompressingStream {
148
private final InputStream input;
149
150
public MyDecompressingStream(InputStream input) throws IOException {
151
super(readInitialData(input));
152
this.input = input;
153
}
154
155
private static ByteBuffer readInitialData(InputStream input) throws IOException {
156
ByteBuffer buffer = ByteBuffer.allocateDirect(8192);
157
byte[] temp = new byte[8192];
158
int bytesRead = input.read(temp);
159
if (bytesRead > 0) {
160
buffer.put(temp, 0, bytesRead);
161
}
162
buffer.flip();
163
return buffer;
164
}
165
166
@Override
167
protected ByteBuffer refill(ByteBuffer toRefill) {
168
try {
169
toRefill.compact(); // Move unread data to beginning
170
171
// Read more data from input stream
172
byte[] temp = new byte[toRefill.remaining()];
173
int bytesRead = input.read(temp);
174
if (bytesRead > 0) {
175
toRefill.put(temp, 0, bytesRead);
176
}
177
178
toRefill.flip(); // Prepare for reading
179
return toRefill;
180
} catch (IOException e) {
181
throw new RuntimeException(e);
182
}
183
}
184
}
185
186
// Use the custom decompressing stream
187
try (MyDecompressingStream decompressor = new MyDecompressingStream(inputStream)) {
188
ByteBuffer targetBuffer = ByteBuffer.allocateDirect(
189
ZstdDirectBufferDecompressingStream.recommendedTargetBufferSize());
190
191
while (decompressor.hasRemaining()) {
192
int bytesRead = decompressor.read(targetBuffer);
193
if (bytesRead > 0) {
194
targetBuffer.flip(); // Prepare for reading
195
196
// Process decompressed data
197
byte[] data = new byte[targetBuffer.remaining()];
198
targetBuffer.get(data);
199
processData(data);
200
201
targetBuffer.clear(); // Prepare for next read
202
}
203
}
204
}
205
```
206
207
### Advanced Buffer Management
208
209
**Multiple Buffer Strategy:**
210
211
```java
212
class MultiBufferCompressor extends ZstdDirectBufferCompressingStream {
213
private final Queue<ByteBuffer> bufferPool;
214
private final OutputStream output;
215
216
public MultiBufferCompressor(OutputStream output, int level) throws IOException {
217
super(ByteBuffer.allocateDirect(recommendedOutputBufferSize()), level);
218
this.output = output;
219
this.bufferPool = new LinkedList<>();
220
221
// Pre-allocate buffer pool
222
for (int i = 0; i < 4; i++) {
223
bufferPool.offer(ByteBuffer.allocateDirect(recommendedOutputBufferSize()));
224
}
225
}
226
227
@Override
228
protected ByteBuffer flushBuffer(ByteBuffer toFlush) throws IOException {
229
// Asynchronously write buffer to output
230
writeBufferAsync(toFlush);
231
232
// Get next buffer from pool
233
ByteBuffer nextBuffer = bufferPool.poll();
234
if (nextBuffer == null) {
235
nextBuffer = ByteBuffer.allocateDirect(recommendedOutputBufferSize());
236
}
237
return nextBuffer;
238
}
239
240
private void writeBufferAsync(ByteBuffer buffer) {
241
// Submit to thread pool for async I/O
242
executor.submit(() -> {
243
try {
244
buffer.flip();
245
writeBufferToStream(buffer, output);
246
buffer.clear();
247
bufferPool.offer(buffer); // Return to pool
248
} catch (IOException e) {
249
// Handle error
250
}
251
});
252
}
253
}
254
```
255
256
**Memory-Mapped File Integration:**
257
258
```java
259
class MMapDecompressor extends ZstdDirectBufferDecompressingStream {
260
private final MappedByteBuffer mappedFile;
261
private int position;
262
263
public MMapDecompressor(Path compressedFile) throws IOException {
264
super(createInitialBuffer(compressedFile));
265
try (RandomAccessFile raf = new RandomAccessFile(compressedFile.toFile(), "r");
266
FileChannel channel = raf.getChannel()) {
267
this.mappedFile = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
268
}
269
}
270
271
@Override
272
protected ByteBuffer refill(ByteBuffer toRefill) {
273
toRefill.clear();
274
275
// Copy data from memory-mapped file
276
int remaining = Math.min(toRefill.remaining(), mappedFile.remaining());
277
if (remaining > 0) {
278
ByteBuffer slice = mappedFile.slice();
279
slice.limit(remaining);
280
toRefill.put(slice);
281
mappedFile.position(mappedFile.position() + remaining);
282
}
283
284
toRefill.flip();
285
return toRefill;
286
}
287
}
288
```
289
290
## Performance Optimization
291
292
### Buffer Sizing
293
294
```java
295
// Use recommended buffer sizes for optimal performance
296
int outputSize = ZstdDirectBufferCompressingStream.recommendedOutputBufferSize();
297
int inputSize = ZstdDirectBufferDecompressingStream.recommendedTargetBufferSize();
298
299
ByteBuffer outputBuffer = ByteBuffer.allocateDirect(outputSize);
300
ByteBuffer inputBuffer = ByteBuffer.allocateDirect(inputSize);
301
```
302
303
### Memory Management
304
305
- **Direct buffers**: Always use direct ByteBuffers for best performance
306
- **Buffer reuse**: Reuse buffers to minimize allocation overhead
307
- **Pool management**: Use buffer pools for high-throughput applications
308
- **Native memory**: Direct buffers use native memory - monitor usage
309
310
### Threading Considerations
311
312
- **Thread safety**: These classes are not thread-safe
313
- **Async I/O**: Use separate threads for I/O operations to overlap computation
314
- **Buffer handoff**: Carefully manage buffer ownership between threads
315
316
## Error Handling
317
318
Direct buffer streaming methods throw IOException on errors:
319
320
```java
321
try (MyCompressingStream compressor = new MyCompressingStream(output, 6)) {
322
compressor.compress(sourceBuffer);
323
} catch (IOException e) {
324
// Handle compression errors
325
if (e.getMessage().contains("Target buffer has no more space")) {
326
// Buffer management issue
327
} else {
328
// Other compression error
329
}
330
}
331
```
332
333
Common error conditions:
334
- **Buffer not direct**: ArgumentException if non-direct buffers are used
335
- **Buffer overflow**: IOException if target buffer has insufficient space
336
- **Compression errors**: IOException with Zstd error details