0
# Configuration and Utilities
1
2
Comprehensive configuration system with performance tuning options for connection management, I/O settings, security parameters, and utility classes for networking operations.
3
4
## Capabilities
5
6
### Transport Configuration
7
8
Central configuration management for all transport settings with performance tuning options and security controls.
9
10
```java { .api }
11
/**
12
* Central location for all transport configuration settings
13
* Provides typed access to configuration values with defaults
14
*/
15
public class TransportConf {
16
/**
17
* Create a transport configuration
18
* @param module Module name for configuration key prefixes
19
* @param conf Configuration provider for retrieving values
20
*/
21
public TransportConf(String module, ConfigProvider conf);
22
23
/**
24
* Get integer configuration value with default
25
* @param name Configuration key name
26
* @param defaultValue Default value if key not found
27
* @return Integer configuration value
28
*/
29
public int getInt(String name, int defaultValue);
30
31
/**
32
* Get string configuration value with default
33
* @param name Configuration key name
34
* @param defaultValue Default value if key not found
35
* @return String configuration value
36
*/
37
public String get(String name, String defaultValue);
38
39
/**
40
* Get the module name for this configuration
41
* @return Module name string
42
*/
43
public String getModuleName();
44
45
/**
46
* Get I/O mode setting (nio or epoll)
47
* @return I/O mode string
48
*/
49
public String ioMode();
50
51
/**
52
* Whether to prefer direct (off-heap) byte buffers
53
* @return true to use direct buffers, false for heap buffers
54
*/
55
public boolean preferDirectBufs();
56
57
/**
58
* Connection timeout in milliseconds
59
* @return Timeout for establishing connections
60
*/
61
public int connectionTimeoutMs();
62
63
/**
64
* Number of concurrent connections per peer
65
* @return Maximum concurrent connections to same host
66
*/
67
public int numConnectionsPerPeer();
68
69
/**
70
* Server socket backlog size
71
* @return Max length of pending connection queue
72
*/
73
public int backLog();
74
75
/**
76
* Number of server threads for handling connections
77
* @return Server thread pool size
78
*/
79
public int serverThreads();
80
81
/**
82
* Number of client threads for handling connections
83
* @return Client thread pool size
84
*/
85
public int clientThreads();
86
87
/**
88
* Socket receive buffer size (SO_RCVBUF)
89
* @return Receive buffer size in bytes
90
*/
91
public int receiveBuf();
92
93
/**
94
* Socket send buffer size (SO_SNDBUF)
95
* @return Send buffer size in bytes
96
*/
97
public int sendBuf();
98
99
/**
100
* Authentication round trip timeout
101
* @return Auth timeout in milliseconds
102
*/
103
public int authRTTimeoutMs();
104
105
/**
106
* Maximum I/O retry attempts
107
* @return Max retry count for failed I/O operations
108
*/
109
public int maxIORetries();
110
111
/**
112
* Wait time between I/O retry attempts
113
* @return Retry wait time in milliseconds
114
*/
115
public int ioRetryWaitTimeMs();
116
117
/**
118
* Minimum size threshold for memory mapping files
119
* @return Minimum bytes to use memory mapping
120
*/
121
public int memoryMapBytes();
122
123
/**
124
* Whether to initialize file descriptors lazily
125
* @return true for lazy initialization, false for eager
126
*/
127
public boolean lazyFileDescriptor();
128
129
/**
130
* Whether to track detailed Netty metrics
131
* @return true to enable verbose metrics, false for basic
132
*/
133
public boolean verboseMetrics();
134
135
/**
136
* Maximum port binding retry attempts
137
* @return Max retries when binding to ports
138
*/
139
public int portMaxRetries();
140
141
/**
142
* Whether transport encryption is enabled
143
* @return true if encryption should be used
144
*/
145
public boolean encryptionEnabled();
146
147
/**
148
* Cipher transformation for encryption
149
* @return Cipher transformation string (e.g., "AES/CTR/NoPadding")
150
*/
151
public String cipherTransformation();
152
153
/**
154
* Maximum concurrent chunks being transferred on shuffle service
155
* @return Max concurrent chunk transfers
156
*/
157
public long maxChunksBeingTransferred();
158
}
159
```
160
161
**Usage Examples:**
162
163
```java
164
// Create configuration with custom settings
165
Map<String, String> configMap = new HashMap<>();
166
configMap.put("spark.network.timeout", "120s");
167
configMap.put("spark.network.io.mode", "NIO");
168
configMap.put("spark.network.io.numConnectionsPerPeer", "3");
169
configMap.put("spark.network.io.serverThreads", "8");
170
configMap.put("spark.network.io.clientThreads", "8");
171
configMap.put("spark.authenticate", "true");
172
configMap.put("spark.network.crypto.enabled", "true");
173
174
MapConfigProvider configProvider = new MapConfigProvider(configMap);
175
TransportConf conf = new TransportConf("myapp", configProvider);
176
177
// Use configuration values
178
System.out.println("Connection timeout: " + conf.connectionTimeoutMs() + "ms");
179
System.out.println("Connections per peer: " + conf.numConnectionsPerPeer());
180
System.out.println("Encryption enabled: " + conf.encryptionEnabled());
181
System.out.println("I/O mode: " + conf.ioMode());
182
183
// Custom configuration values
184
int customValue = conf.getInt("myapp.custom.setting", 100);
185
String customString = conf.get("myapp.custom.name", "default");
186
```
187
188
### Configuration Providers
189
190
Abstract configuration provider system with concrete implementations for different configuration sources.
191
192
```java { .api }
193
/**
194
* Abstract provider for configuration values
195
* Enables pluggable configuration sources
196
*/
197
public abstract class ConfigProvider {
198
/**
199
* Get configuration value by key
200
* @param name Configuration key
201
* @return Configuration value or null if not found
202
*/
203
public abstract String get(String name);
204
}
205
206
/**
207
* ConfigProvider backed by a Map for in-memory configuration
208
*/
209
public class MapConfigProvider extends ConfigProvider {
210
/**
211
* Create a map-based configuration provider
212
* @param props Map containing configuration key-value pairs
213
*/
214
public MapConfigProvider(Map<String, String> props);
215
216
/**
217
* Get configuration value from the map
218
* @param name Configuration key
219
* @return Configuration value or null if not found
220
*/
221
public String get(String name);
222
}
223
```
224
225
**Usage Examples:**
226
227
```java
228
// Map-based configuration
229
Map<String, String> config = new HashMap<>();
230
config.put("spark.network.timeout", "60s");
231
config.put("spark.network.io.mode", "EPOLL");
232
config.put("spark.authenticate", "true");
233
234
ConfigProvider provider = new MapConfigProvider(config);
235
TransportConf conf = new TransportConf("spark", provider);
236
237
// Properties-based configuration
238
public class PropertiesConfigProvider extends ConfigProvider {
239
private final Properties properties;
240
241
public PropertiesConfigProvider(Properties properties) {
242
this.properties = properties;
243
}
244
245
@Override
246
public String get(String name) {
247
return properties.getProperty(name);
248
}
249
}
250
251
// File-based configuration
252
Properties props = new Properties();
253
try (InputStream is = new FileInputStream("transport.properties")) {
254
props.load(is);
255
}
256
ConfigProvider fileProvider = new PropertiesConfigProvider(props);
257
TransportConf fileConf = new TransportConf("file-app", fileProvider);
258
```
259
260
### Java Utilities
261
262
General Java utility functions for networking operations including resource management and parsing functions.
263
264
```java { .api }
265
/**
266
* Java utility functions for networking operations
267
*/
268
public class JavaUtils {
269
/**
270
* Close a Closeable resource without throwing exceptions
271
* Logs any exceptions that occur during closing
272
* @param closeable Resource to close (may be null)
273
*/
274
public static void closeQuietly(Closeable closeable);
275
276
/**
277
* Parse a time string (e.g., "30s", "5m", "2h") as seconds
278
* @param str Time string with unit suffix
279
* @return Time value in seconds
280
* @throws NumberFormatException if string format is invalid
281
*/
282
public static long timeStringAsSec(String str);
283
284
/**
285
* Parse a byte string (e.g., "1k", "512m", "2g") as bytes
286
* @param str Byte string with unit suffix
287
* @return Byte value as long
288
* @throws NumberFormatException if string format is invalid
289
*/
290
public static long byteStringAsBytes(String str);
291
}
292
```
293
294
**Usage Examples:**
295
296
```java
297
// Safe resource cleanup
298
FileInputStream fis = null;
299
try {
300
fis = new FileInputStream("data.bin");
301
// Use stream
302
} catch (IOException e) {
303
System.err.println("Error: " + e.getMessage());
304
} finally {
305
JavaUtils.closeQuietly(fis); // Won't throw exception
306
}
307
308
// Parse time strings
309
long timeout = JavaUtils.timeStringAsSec("30s"); // 30
310
long maxWait = JavaUtils.timeStringAsSec("5m"); // 300
311
long deadline = JavaUtils.timeStringAsSec("2h"); // 7200
312
313
// Parse byte strings
314
long bufferSize = JavaUtils.byteStringAsBytes("64k"); // 65536
315
long maxMemory = JavaUtils.byteStringAsBytes("512m"); // 536870912
316
long diskSpace = JavaUtils.byteStringAsBytes("2g"); // 2147483648
317
318
// Use in configuration
319
Map<String, String> config = new HashMap<>();
320
config.put("network.timeout", "120s");
321
config.put("buffer.size", "1m");
322
323
TransportConf conf = new TransportConf("app", new MapConfigProvider(config));
324
long timeoutMs = JavaUtils.timeStringAsSec(conf.get("network.timeout", "60s")) * 1000;
325
long bufferBytes = JavaUtils.byteStringAsBytes(conf.get("buffer.size", "64k"));
326
```
327
328
### Netty Utilities
329
330
Netty-specific utility functions for channel operations and frame decoding with transport protocol support.
331
332
```java { .api }
333
/**
334
* Netty-specific utility functions for channel operations
335
*/
336
public class NettyUtils {
337
/**
338
* Get remote address of a channel as string
339
* @param channel Netty channel
340
* @return Remote address string (host:port format)
341
*/
342
public static String getRemoteAddress(Channel channel);
343
344
/**
345
* Create a frame decoder for the transport protocol
346
* @return TransportFrameDecoder instance
347
*/
348
public static TransportFrameDecoder createFrameDecoder();
349
}
350
```
351
352
**Usage Examples:**
353
354
```java
355
// Get remote address for logging
356
Channel channel = // ... obtained from somewhere
357
String remoteAddr = NettyUtils.getRemoteAddress(channel);
358
System.out.println("Connection from: " + remoteAddr);
359
360
// Add frame decoder to pipeline
361
ChannelPipeline pipeline = channel.pipeline();
362
pipeline.addLast("frameDecoder", NettyUtils.createFrameDecoder());
363
```
364
365
### Crypto Utilities
366
367
Cryptography utility functions for secure networking operations including key generation and cipher management.
368
369
```java { .api }
370
/**
371
* Cryptography utility functions for secure networking
372
*/
373
public class CryptoUtils {
374
/**
375
* Generate a secure random byte array
376
* @param length Number of bytes to generate
377
* @return Random byte array
378
*/
379
public static byte[] randomBytes(int length);
380
381
/**
382
* Create cipher instance with the specified transformation
383
* @param transformation Cipher transformation string
384
* @return Cipher instance
385
* @throws GeneralSecurityException if cipher creation fails
386
*/
387
public static Cipher createCipher(String transformation) throws GeneralSecurityException;
388
389
/**
390
* Derive key from password using PBKDF2
391
* @param password Password string
392
* @param salt Salt bytes
393
* @param iterations Number of iterations
394
* @param keyLength Key length in bits
395
* @return Derived key bytes
396
* @throws GeneralSecurityException if key derivation fails
397
*/
398
public static byte[] deriveKey(String password, byte[] salt, int iterations, int keyLength)
399
throws GeneralSecurityException;
400
}
401
```
402
403
**Usage Examples:**
404
405
```java
406
// Generate random data for keys and IVs
407
byte[] key = CryptoUtils.randomBytes(32); // 256-bit key
408
byte[] iv = CryptoUtils.randomBytes(16); // 128-bit IV
409
byte[] salt = CryptoUtils.randomBytes(16); // Salt for key derivation
410
411
// Create cipher for encryption
412
try {
413
Cipher cipher = CryptoUtils.createCipher("AES/CTR/NoPadding");
414
cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key, "AES"), new IvParameterSpec(iv));
415
// Use cipher for encryption
416
} catch (GeneralSecurityException e) {
417
System.err.println("Cipher creation failed: " + e.getMessage());
418
}
419
420
// Derive key from password
421
try {
422
String password = "user-password";
423
byte[] derivedKey = CryptoUtils.deriveKey(password, salt, 10000, 256);
424
// Use derived key for encryption
425
} catch (GeneralSecurityException e) {
426
System.err.println("Key derivation failed: " + e.getMessage());
427
}
428
```
429
430
### Enumerations
431
432
Type-safe enumerations for configuration options including I/O modes and byte units.
433
434
```java { .api }
435
/**
436
* Enumeration of I/O modes for transport operations
437
*/
438
public enum IOMode {
439
/** Java NIO-based I/O */
440
NIO,
441
442
/** Linux epoll-based I/O (higher performance on Linux) */
443
EPOLL;
444
}
445
446
/**
447
* Enumeration of byte units for configuration values
448
*/
449
public enum ByteUnit {
450
/** Single byte */
451
BYTE(1L),
452
453
/** Kibibyte (1024 bytes) */
454
KiB(1024L),
455
456
/** Mebibyte (1024^2 bytes) */
457
MiB(1024L * 1024L),
458
459
/** Gibibyte (1024^3 bytes) */
460
GiB(1024L * 1024L * 1024L),
461
462
/** Tebibyte (1024^4 bytes) */
463
TiB(1024L * 1024L * 1024L * 1024L),
464
465
/** Pebibyte (1024^5 bytes) */
466
PiB(1024L * 1024L * 1024L * 1024L * 1024L);
467
468
private final long bytes;
469
470
ByteUnit(long bytes) {
471
this.bytes = bytes;
472
}
473
474
/**
475
* Convert value in this unit to bytes
476
* @param value Value in this unit
477
* @return Value in bytes
478
*/
479
public long toBytes(long value);
480
481
/**
482
* Convert bytes to value in this unit
483
* @param bytes Value in bytes
484
* @return Value in this unit
485
*/
486
public long fromBytes(long bytes);
487
}
488
```
489
490
**Usage Examples:**
491
492
```java
493
// Using byte units
494
long bufferSize = ByteUnit.MiB.toBytes(64); // 64 MB in bytes
495
long diskSpace = ByteUnit.GiB.toBytes(10); // 10 GB in bytes
496
497
// Configuration with byte units
498
long configuredSize = conf.getInt("buffer.size.mb", 32);
499
long actualBytes = ByteUnit.MiB.toBytes(configuredSize);
500
501
// I/O mode configuration
502
String ioModeStr = conf.get("io.mode", "NIO");
503
IOMode ioMode = IOMode.valueOf(ioModeStr.toUpperCase());
504
505
switch (ioMode) {
506
case NIO:
507
System.out.println("Using Java NIO");
508
break;
509
case EPOLL:
510
System.out.println("Using Linux epoll (high performance)");
511
break;
512
}
513
```
514
515
### Frame Decoder
516
517
Transport frame decoder for handling message framing with optional stream interception capabilities.
518
519
```java { .api }
520
/**
521
* Netty frame decoder for transport protocol messages
522
* Handles message framing and optional stream interception
523
*/
524
public class TransportFrameDecoder extends LengthFieldBasedFrameDecoder {
525
/**
526
* Create a frame decoder with default settings
527
*/
528
public TransportFrameDecoder();
529
530
/**
531
* Create a frame decoder with stream interceptor
532
* @param interceptor Optional interceptor for stream frames
533
*/
534
public TransportFrameDecoder(Interceptor interceptor);
535
536
/**
537
* Interface for intercepting decoded frames before processing
538
*/
539
public interface Interceptor {
540
/**
541
* Intercept a decoded frame
542
* @param ctx Channel handler context
543
* @param msgHeader Message header bytes
544
* @param msgBody Message body as ManagedBuffer
545
* @return true to continue processing, false to consume the frame
546
* @throws Exception if interception fails
547
*/
548
boolean handle(ChannelHandlerContext ctx, ByteBuf msgHeader, ManagedBuffer msgBody)
549
throws Exception;
550
}
551
}
552
```
553
554
**Usage Examples:**
555
556
```java
557
// Basic frame decoder
558
TransportFrameDecoder decoder = new TransportFrameDecoder();
559
pipeline.addLast("frameDecoder", decoder);
560
561
// Frame decoder with interceptor
562
TransportFrameDecoder.Interceptor streamInterceptor =
563
new TransportFrameDecoder.Interceptor() {
564
@Override
565
public boolean handle(ChannelHandlerContext ctx, ByteBuf msgHeader, ManagedBuffer msgBody)
566
throws Exception {
567
// Check if this is a stream frame we want to intercept
568
if (isStreamFrame(msgHeader)) {
569
handleStreamFrame(msgBody);
570
return false; // Consume the frame
571
}
572
return true; // Continue normal processing
573
}
574
};
575
576
TransportFrameDecoder interceptingDecoder = new TransportFrameDecoder(streamInterceptor);
577
pipeline.addLast("frameDecoder", interceptingDecoder);
578
```
579
580
## Configuration Usage Patterns
581
582
### Production Configuration
583
584
```java
585
// High-performance production configuration
586
Map<String, String> prodConfig = new HashMap<>();
587
588
// Connection settings
589
prodConfig.put("spark.network.timeout", "300s");
590
prodConfig.put("spark.network.io.mode", "EPOLL");
591
prodConfig.put("spark.network.io.numConnectionsPerPeer", "5");
592
prodConfig.put("spark.network.io.serverThreads", "16");
593
prodConfig.put("spark.network.io.clientThreads", "16");
594
595
// Buffer settings
596
prodConfig.put("spark.network.io.preferDirectBufs", "true");
597
prodConfig.put("spark.network.io.receiveBuf", "1m");
598
prodConfig.put("spark.network.io.sendBuf", "1m");
599
600
// Security settings
601
prodConfig.put("spark.authenticate", "true");
602
prodConfig.put("spark.network.crypto.enabled", "true");
603
prodConfig.put("spark.network.crypto.keyLength", "256");
604
605
// Performance tuning
606
prodConfig.put("spark.network.io.memoryMapBytes", "2m");
607
prodConfig.put("spark.network.io.lazyFD", "true");
608
prodConfig.put("spark.network.maxChunksBeingTransferred", "1000");
609
610
TransportConf prodConf = new TransportConf("spark", new MapConfigProvider(prodConfig));
611
```
612
613
### Development Configuration
614
615
```java
616
// Development/testing configuration
617
Map<String, String> devConfig = new HashMap<>();
618
619
// Relaxed timeouts for debugging
620
devConfig.put("spark.network.timeout", "600s");
621
devConfig.put("spark.network.io.mode", "NIO");
622
devConfig.put("spark.network.io.numConnectionsPerPeer", "1");
623
624
// Smaller thread pools
625
devConfig.put("spark.network.io.serverThreads", "2");
626
devConfig.put("spark.network.io.clientThreads", "2");
627
628
// Verbose metrics for monitoring
629
devConfig.put("spark.network.verbose.metrics", "true");
630
631
// Disabled security for easier testing
632
devConfig.put("spark.authenticate", "false");
633
devConfig.put("spark.network.crypto.enabled", "false");
634
635
TransportConf devConf = new TransportConf("spark-dev", new MapConfigProvider(devConfig));
636
```
637
638
### Dynamic Configuration
639
640
```java
641
// Configuration that can be updated at runtime
642
public class DynamicConfigProvider extends ConfigProvider {
643
private volatile Map<String, String> config = new ConcurrentHashMap<>();
644
645
public void updateConfig(String key, String value) {
646
config.put(key, value);
647
}
648
649
public void removeConfig(String key) {
650
config.remove(key);
651
}
652
653
@Override
654
public String get(String name) {
655
return config.get(name);
656
}
657
}
658
659
DynamicConfigProvider dynamicProvider = new DynamicConfigProvider();
660
TransportConf dynamicConf = new TransportConf("dynamic", dynamicProvider);
661
662
// Update configuration at runtime
663
dynamicProvider.updateConfig("spark.network.timeout", "120s");
664
dynamicProvider.updateConfig("spark.network.io.numConnectionsPerPeer", "3");
665
```