0
# Configuration and Utilities
1
2
Configuration system and utility classes for transport settings, Netty integration, and Java operations. The configuration framework provides centralized management of transport behavior with multiple provider implementations.
3
4
## Capabilities
5
6
### TransportConf
7
8
Central configuration class managing all transport layer settings and behavior.
9
10
```java { .api }
11
/**
12
* TransportConf manages configuration settings for the Spark transport layer.
13
* It provides typed access to all networking configuration options with defaults.
14
*/
15
public class TransportConf {
16
/**
17
* Creates a transport configuration.
18
*
19
* @param module The configuration module name (used as key prefix)
20
* @param conf The configuration provider for loading settings
21
*/
22
public TransportConf(String module, ConfigProvider conf);
23
24
/**
25
* Gets the I/O mode for network operations.
26
*
27
* @return "NIO" or "EPOLL" (Linux only)
28
*/
29
public String ioMode();
30
31
/**
32
* Whether to prefer direct ByteBuffers for better performance.
33
*
34
* @return true to prefer direct buffers
35
*/
36
public boolean preferDirectBufs();
37
38
/**
39
* Connection timeout in milliseconds.
40
*
41
* @return Timeout for establishing connections
42
*/
43
public int connectionTimeoutMs();
44
45
/**
46
* Maximum number of connections per remote peer.
47
*
48
* @return Number of connections to maintain per peer
49
*/
50
public int numConnectionsPerPeer();
51
52
/**
53
* Server socket backlog size.
54
*
55
* @return Number of pending connections to queue
56
*/
57
public int backLog();
58
59
/**
60
* Number of server worker threads.
61
*
62
* @return Thread count for server-side operations
63
*/
64
public int serverThreads();
65
66
/**
67
* Number of client worker threads.
68
*
69
* @return Thread count for client-side operations
70
*/
71
public int clientThreads();
72
73
/**
74
* Socket receive buffer size in bytes.
75
*
76
* @return Size of socket receive buffer
77
*/
78
public int receiveBuf();
79
80
/**
81
* Socket send buffer size in bytes.
82
*
83
* @return Size of socket send buffer
84
*/
85
public int sendBuf();
86
87
/**
88
* SASL roundtrip timeout in milliseconds.
89
*
90
* @return Timeout for SASL authentication
91
*/
92
public int saslRTTimeoutMs();
93
94
/**
95
* Maximum number of I/O retry attempts.
96
*
97
* @return Number of times to retry failed I/O operations
98
*/
99
public int maxIORetries();
100
101
/**
102
* Wait time between I/O retry attempts in milliseconds.
103
*
104
* @return Delay between retry attempts
105
*/
106
public int ioRetryWaitTimeMs();
107
108
/**
109
* Threshold for memory mapping files instead of reading into heap.
110
*
111
* @return Size threshold in bytes for memory mapping
112
*/
113
public int memoryMapBytes();
114
115
/**
116
* Whether to use lazy file descriptor allocation.
117
*
118
* @return true to defer file opening until needed
119
*/
120
public boolean lazyFileDescriptor();
121
122
/**
123
* Maximum number of port binding retry attempts.
124
*
125
* @return Number of times to retry port binding
126
*/
127
public int portMaxRetries();
128
129
/**
130
* Maximum size of encrypted blocks when using SASL encryption.
131
*
132
* @return Maximum encrypted block size in bytes
133
*/
134
public int maxSaslEncryptedBlockSize();
135
136
/**
137
* Whether the server should always encrypt data (when SASL is enabled).
138
*
139
* @return true to require encryption on server side
140
*/
141
public boolean saslServerAlwaysEncrypt();
142
143
/**
144
* Gets the configuration module name.
145
*
146
* @return The module name used for configuration keys
147
*/
148
public String getModule();
149
150
/**
151
* Gets a raw configuration value.
152
*
153
* @param name The configuration key name
154
* @return The configuration value or null if not set
155
*/
156
public String get(String name);
157
158
/**
159
* Gets a raw configuration value with default.
160
*
161
* @param name The configuration key name
162
* @param defaultValue Default value if key is not set
163
* @return The configuration value or default
164
*/
165
public String get(String name, String defaultValue);
166
}
167
```
168
169
### Configuration Provider Framework
170
171
#### ConfigProvider
172
173
```java { .api }
174
/**
 * Abstract base class for configuration providers.
 * Implementations load configuration from different sources (properties, maps, etc.)
 * by implementing {@link #get(String)}; the typed accessors are built on top of it.
 */
public abstract class ConfigProvider {
  /**
   * Gets a configuration value by name.
   *
   * @param name The configuration key name
   * @return The configuration value or null if not found
   */
  public abstract String get(String name);

  /**
   * Gets a configuration value with a default.
   *
   * @param name The configuration key name
   * @param defaultValue Default value if key is not found
   * @return The configuration value or default
   */
  public String get(String name, String defaultValue) {
    String value = get(name);
    return value != null ? value : defaultValue;
  }

  /**
   * Gets an integer configuration value with default.
   * Surrounding whitespace is ignored, matching what {@link #getDouble} accepts.
   *
   * @param name The configuration key name
   * @param defaultValue Default value if key is not found or invalid
   * @return The integer value or default
   */
  public int getInt(String name, int defaultValue) {
    String value = get(name);
    if (value != null) {
      try {
        return Integer.parseInt(value.trim());
      } catch (NumberFormatException ignored) {
        // Not a valid int: the documented contract is to fall back to the default.
      }
    }
    return defaultValue;
  }

  /**
   * Gets a long configuration value with default.
   * Surrounding whitespace is ignored, matching what {@link #getDouble} accepts.
   *
   * @param name The configuration key name
   * @param defaultValue Default value if key is not found or invalid
   * @return The long value or default
   */
  public long getLong(String name, long defaultValue) {
    String value = get(name);
    if (value != null) {
      try {
        return Long.parseLong(value.trim());
      } catch (NumberFormatException ignored) {
        // Not a valid long: the documented contract is to fall back to the default.
      }
    }
    return defaultValue;
  }

  /**
   * Gets a double configuration value with default.
   *
   * @param name The configuration key name
   * @param defaultValue Default value if key is not found or invalid
   * @return The double value or default
   */
  public double getDouble(String name, double defaultValue) {
    String value = get(name);
    if (value != null) {
      try {
        return Double.parseDouble(value);
      } catch (NumberFormatException ignored) {
        // Not a valid double: the documented contract is to fall back to the default.
      }
    }
    return defaultValue;
  }

  /**
   * Gets a boolean configuration value with default.
   * Only "true" and "false" (case-insensitive, surrounding whitespace ignored)
   * are recognized; any other text is treated as invalid.
   *
   * @param name The configuration key name
   * @param defaultValue Default value if key is not found or invalid
   * @return The boolean value or default
   */
  public boolean getBoolean(String name, boolean defaultValue) {
    String value = get(name);
    if (value != null) {
      String normalized = value.trim();
      if ("true".equalsIgnoreCase(normalized)) {
        return true;
      }
      if ("false".equalsIgnoreCase(normalized)) {
        return false;
      }
      // Boolean.parseBoolean would silently map text such as "yes" to false;
      // fall through so invalid input yields the caller's default instead.
    }
    return defaultValue;
  }
}
271
```
272
273
#### MapConfigProvider
274
275
```java { .api }
276
/**
277
* Configuration provider backed by a Map.
278
* Useful for programmatic configuration or testing.
279
*/
280
public class MapConfigProvider extends ConfigProvider {
281
/**
282
* Creates a map-based configuration provider.
283
*
284
* @param config Map containing configuration key-value pairs
285
*/
286
public MapConfigProvider(Map<String, String> config);
287
288
@Override
289
public String get(String name) {
290
return config.get(name);
291
}
292
}
293
```
294
295
#### SystemPropertyConfigProvider
296
297
```java { .api }
298
/**
299
* Configuration provider that loads values from Java system properties.
300
* Useful for configuration via command-line properties.
301
*/
302
public class SystemPropertyConfigProvider extends ConfigProvider {
303
/**
304
* Creates a system property configuration provider.
305
*/
306
public SystemPropertyConfigProvider();
307
308
@Override
309
public String get(String name) {
310
return System.getProperty(name);
311
}
312
}
313
```
314
315
### Utility Enumerations
316
317
#### ByteUnit
318
319
```java { .api }
320
/**
 * Enumeration for byte size units with conversion utilities.
 * Each constant carries the number of bytes it represents; all multipliers are
 * powers of 1024, so the ratio between any two units is an exact integer.
 */
public enum ByteUnit {
  BYTE(1),
  KiB(1024L),
  MiB(1024L * 1024L),
  GiB(1024L * 1024L * 1024L),
  TiB(1024L * 1024L * 1024L * 1024L),
  PiB(1024L * 1024L * 1024L * 1024L * 1024L);

  private final long multiplier;

  ByteUnit(long multiplier) {
    this.multiplier = multiplier;
  }

  /**
   * Converts a value from another unit to this unit.
   *
   * @param d The value to convert
   * @param u The source unit
   * @return The converted value in this unit (fractions are truncated)
   * @throws IllegalArgumentException if the result does not fit in a long
   */
  public long convertFrom(long d, ByteUnit u) {
    return u.convertTo(d, this);
  }

  /**
   * Converts a value from this unit to another unit.
   *
   * @param d The value to convert
   * @param u The target unit
   * @return The converted value in the target unit (fractions are truncated)
   * @throws IllegalArgumentException if the result does not fit in a long
   */
  public long convertTo(long d, ByteUnit u) {
    if (multiplier > u.multiplier) {
      // Scale by the ratio of the two units rather than going through bytes:
      // d * multiplier can overflow even when the final result is small.
      long ratio = multiplier / u.multiplier;
      try {
        return Math.multiplyExact(d, ratio);
      } catch (ArithmeticException e) {
        throw new IllegalArgumentException(
            "Conversion of " + d + " " + name() + " to " + u.name() + " overflows a long", e);
      }
    }
    // Target unit is the same size or larger: a plain division cannot overflow.
    return d / (u.multiplier / multiplier);
  }

  /**
   * Converts a value in this unit to bytes.
   *
   * @param d The value in this unit
   * @return The value in bytes as a double
   */
  public double toBytes(long d) {
    // Widen before multiplying so large inputs do not wrap around in long arithmetic.
    return (double) d * multiplier;
  }

  /**
   * Converts a value in this unit to KiB.
   *
   * @param d The value in this unit
   * @return The value in KiB
   */
  public long toKiB(long d) {
    return convertTo(d, KiB);
  }

  /**
   * Converts a value in this unit to MiB.
   *
   * @param d The value in this unit
   * @return The value in MiB
   */
  public long toMiB(long d) {
    return convertTo(d, MiB);
  }

  /**
   * Converts a value in this unit to GiB.
   *
   * @param d The value in this unit
   * @return The value in GiB
   */
  public long toGiB(long d) {
    return convertTo(d, GiB);
  }

  /**
   * Converts a value in this unit to TiB.
   *
   * @param d The value in this unit
   * @return The value in TiB
   */
  public long toTiB(long d) {
    return convertTo(d, TiB);
  }

  /**
   * Converts a value in this unit to PiB.
   *
   * @param d The value in this unit
   * @return The value in PiB
   */
  public long toPiB(long d) {
    return convertTo(d, PiB);
  }
}
420
```
421
422
#### IOMode
423
424
```java { .api }
425
/**
 * The I/O modes the transport layer can run in.
 * Each mode has different performance characteristics.
 */
public enum IOMode {
  /** Standard Java NIO: works on all platforms but may have lower performance. */
  NIO,

  /** Linux EPOLL: higher performance on Linux systems, requires a native library. */
  EPOLL
}
440
```
441
442
### Java Utility Classes
443
444
#### JavaUtils
445
446
```java { .api }
447
/**
448
* General utility methods for Java operations used throughout the transport layer.
449
*/
450
public class JavaUtils {
451
/**
452
* Closes a Closeable resource quietly, ignoring any exceptions.
453
*
454
* @param closeable The resource to close (can be null)
455
*/
456
public static void closeQuietly(Closeable closeable) {
457
if (closeable != null) {
458
try {
459
closeable.close();
460
} catch (IOException e) {
461
// Ignore exception
462
}
463
}
464
}
465
466
/**
467
* Parses a time string (e.g., "30s", "5m", "2h") to seconds.
468
*
469
* @param str The time string to parse
470
* @return Time in seconds
471
* @throws NumberFormatException if string format is invalid
472
*/
473
public static long timeStringAsSec(String str) {
474
return parseTimeString(str, TimeUnit.SECONDS);
475
}
476
477
/**
478
* Parses a time string to the specified time unit.
479
*
480
* @param str The time string to parse
481
* @param unit The target time unit
482
* @return Time in the specified unit
483
*/
484
public static long timeStringAs(String str, TimeUnit unit) {
485
return parseTimeString(str, unit);
486
}
487
488
/**
489
* Parses a byte string (e.g., "100k", "64m", "1g") to bytes.
490
*
491
* @param str The byte string to parse
492
* @return Size in bytes
493
* @throws NumberFormatException if string format is invalid
494
*/
495
public static long byteStringAsBytes(String str) {
496
return parseByteString(str);
497
}
498
499
/**
500
* Formats bytes as a human-readable string.
501
*
502
* @param size Size in bytes
503
* @return Formatted string (e.g., "1.5 GB", "256 MB")
504
*/
505
public static String bytesToString(long size) {
506
return formatBytes(size);
507
}
508
509
/**
510
* Gets the system property or environment variable with the given name.
511
* Checks system properties first, then environment variables.
512
*
513
* @param name The property/variable name
514
* @param defaultValue Default value if not found
515
* @return The property value or default
516
*/
517
public static String getSystemProperty(String name, String defaultValue) {
518
String value = System.getProperty(name);
519
if (value == null) {
520
value = System.getenv(name);
521
}
522
return value != null ? value : defaultValue;
523
}
524
}
525
```
526
527
#### NettyUtils
528
529
```java { .api }
530
/**
531
* Utility methods for Netty integration and configuration.
532
* Provides helpers for creating Netty components with proper settings.
533
*/
534
public class NettyUtils {
535
/**
536
* Gets the remote address from a Netty channel as a string.
537
*
538
* @param channel The Netty channel
539
* @return String representation of remote address
540
*/
541
public static String getRemoteAddress(Channel channel) {
542
if (channel != null && channel.remoteAddress() != null) {
543
return channel.remoteAddress().toString();
544
}
545
return "unknown";
546
}
547
548
/**
549
* Creates an EventLoopGroup for the specified I/O mode.
550
*
551
* @param mode The I/O mode (NIO or EPOLL)
552
* @param numThreads Number of threads in the event loop
553
* @param threadPrefix Prefix for thread names
554
* @return Configured EventLoopGroup
555
*/
556
public static EventLoopGroup createEventLoop(IOMode mode, int numThreads, String threadPrefix) {
557
ThreadFactory threadFactory = createThreadFactory(threadPrefix);
558
559
switch (mode) {
560
case NIO:
561
return new NioEventLoopGroup(numThreads, threadFactory);
562
case EPOLL:
563
return new EpollEventLoopGroup(numThreads, threadFactory);
564
default:
565
throw new IllegalArgumentException("Unknown I/O mode: " + mode);
566
}
567
}
568
569
/**
570
* Gets the server channel class for the specified I/O mode.
571
*
572
* @param mode The I/O mode
573
* @return ServerChannel class for the mode
574
*/
575
public static Class<? extends ServerChannel> getServerChannelClass(IOMode mode) {
576
switch (mode) {
577
case NIO:
578
return NioServerSocketChannel.class;
579
case EPOLL:
580
return EpollServerSocketChannel.class;
581
default:
582
throw new IllegalArgumentException("Unknown I/O mode: " + mode);
583
}
584
}
585
586
/**
587
* Gets the client channel class for the specified I/O mode.
588
*
589
* @param mode The I/O mode
590
* @return Channel class for the mode
591
*/
592
public static Class<? extends Channel> getClientChannelClass(IOMode mode) {
593
switch (mode) {
594
case NIO:
595
return NioSocketChannel.class;
596
case EPOLL:
597
return EpollSocketChannel.class;
598
default:
599
throw new IllegalArgumentException("Unknown I/O mode: " + mode);
600
}
601
}
602
603
/**
604
* Creates a pooled ByteBuf allocator with specified settings.
605
*
606
* @param allowDirectBufs Whether to allow direct buffer allocation
607
* @param allowCache Whether to enable buffer caching
608
* @param numCores Number of CPU cores (affects pool sizing)
609
* @return Configured PooledByteBufAllocator
610
*/
611
public static PooledByteBufAllocator createPooledByteBufAllocator(
612
boolean allowDirectBufs, boolean allowCache, int numCores) {
613
614
int numDirectArenas = allowDirectBufs ? numCores : 0;
615
int numHeapArenas = numCores;
616
617
return new PooledByteBufAllocator(
618
allowDirectBufs, // preferDirect
619
numHeapArenas, // nHeapArena
620
numDirectArenas, // nDirectArena
621
8192, // pageSize
622
11, // maxOrder
623
64, // tinyCacheSize
624
32, // smallCacheSize
625
8, // normalCacheSize
626
allowCache // useCacheForAllThreads
627
);
628
}
629
630
/**
631
* Creates a frame decoder for the transport protocol.
632
*
633
* @return Configured TransportFrameDecoder
634
*/
635
public static TransportFrameDecoder createFrameDecoder() {
636
return new TransportFrameDecoder();
637
}
638
639
/**
640
* Configures common channel options for Spark transport.
641
*
642
* @param bootstrap The bootstrap to configure
643
* @param conf Transport configuration
644
*/
645
public static void configureChannelOptions(Bootstrap bootstrap, TransportConf conf) {
646
bootstrap.option(ChannelOption.ALLOCATOR, createPooledByteBufAllocator(
647
conf.preferDirectBufs(), true, Runtime.getRuntime().availableProcessors()));
648
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs());
649
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
650
bootstrap.option(ChannelOption.SO_REUSEADDR, true);
651
bootstrap.option(ChannelOption.SO_RCVBUF, conf.receiveBuf());
652
bootstrap.option(ChannelOption.SO_SNDBUF, conf.sendBuf());
653
}
654
655
/**
656
* Configures common channel options for server bootstrap.
657
*
658
* @param bootstrap The server bootstrap to configure
659
* @param conf Transport configuration
660
*/
661
public static void configureServerChannelOptions(ServerBootstrap bootstrap, TransportConf conf) {
662
bootstrap.option(ChannelOption.ALLOCATOR, createPooledByteBufAllocator(
663
conf.preferDirectBufs(), true, Runtime.getRuntime().availableProcessors()));
664
bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
665
bootstrap.option(ChannelOption.SO_REUSEADDR, true);
666
667
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
668
bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
669
bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
670
}
671
}
672
```
673
674
### Specialized Utility Classes
675
676
#### TransportFrameDecoder
677
678
```java { .api }
679
/**
680
* Netty decoder for transport protocol frames.
681
* Handles frame parsing and message boundary detection.
682
*/
683
public class TransportFrameDecoder extends ByteToMessageDecoder {
684
/** Maximum frame size to prevent memory exhaustion */
685
public static final int MAX_FRAME_SIZE = Integer.MAX_VALUE;
686
687
@Override
688
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
689
// Decodes length-prefixed frames from the transport protocol
690
// Implementation handles partial frames and validates frame sizes
691
}
692
693
/**
694
* Gets the maximum allowed frame size.
695
*
696
* @return Maximum frame size in bytes
697
*/
698
public int getMaxFrameSize() {
699
return MAX_FRAME_SIZE;
700
}
701
}
702
```
703
704
#### ByteArrayWritableChannel
705
706
```java { .api }
707
/**
 * WritableByteChannel implementation backed by a byte array.
 * Useful for collecting data written to a channel into memory.
 * Instances are not synchronized.
 */
public class ByteArrayWritableChannel implements WritableByteChannel {
  /** Largest array size requested when growing; leaves headroom below Integer.MAX_VALUE. */
  private static final int MAX_CAPACITY = Integer.MAX_VALUE - 8;

  /** Backing storage; only the first {@code position} bytes hold written data. */
  private byte[] buffer;
  /** Number of bytes written since construction or the last {@link #reset()}. */
  private int position;
  /** False once {@link #close()} has been called. */
  private boolean open = true;

  /**
   * Creates a byte array writable channel.
   *
   * @param initialCapacity Initial capacity of the internal buffer
   * @throws IllegalArgumentException if {@code initialCapacity} is negative
   */
  public ByteArrayWritableChannel(int initialCapacity) {
    if (initialCapacity < 0) {
      throw new IllegalArgumentException("Negative initial capacity: " + initialCapacity);
    }
    this.buffer = new byte[initialCapacity];
  }

  /**
   * Copies all remaining bytes of {@code src} into the internal array, growing
   * it as needed, and advances the position of {@code src} accordingly.
   *
   * @param src Buffer to drain
   * @return Number of bytes consumed from {@code src}
   * @throws java.nio.channels.ClosedChannelException if the channel has been closed
   * @throws IOException if the accumulated data would exceed the maximum array size
   */
  @Override
  public int write(ByteBuffer src) throws IOException {
    if (!open) {
      throw new java.nio.channels.ClosedChannelException();
    }
    int count = src.remaining();
    ensureCapacity(count);
    // get() advances src, so callers looping on src.hasRemaining() terminate.
    src.get(buffer, position, count);
    position += count;
    return count;
  }

  /** Grows the backing array (at least doubling it) so that {@code extra} more bytes fit. */
  private void ensureCapacity(int extra) throws IOException {
    long required = (long) position + extra;
    if (required > MAX_CAPACITY) {
      throw new IOException("Cannot buffer " + required + " bytes in a byte array");
    }
    if (required > buffer.length) {
      long grown = Math.max((long) buffer.length * 2, required);
      buffer = Arrays.copyOf(buffer, (int) Math.min(grown, MAX_CAPACITY));
    }
  }

  @Override
  public boolean isOpen() {
    return open;
  }

  @Override
  public void close() throws IOException {
    open = false;
  }

  /**
   * Gets the current data as a byte array.
   *
   * @return Copy of the accumulated data
   */
  public byte[] getData() {
    return Arrays.copyOf(buffer, position);
  }

  /**
   * Gets the number of bytes written.
   *
   * @return Number of bytes in the buffer
   */
  public int size() {
    return position;
  }

  /**
   * Resets the channel to empty state. The backing array is kept for reuse.
   */
  public void reset() {
    position = 0;
  }
}
762
```
763
764
#### LimitedInputStream
765
766
```java { .api }
767
/**
 * FilterInputStream that limits the number of bytes that can be read.
 * Useful for reading only a portion of a larger stream. Reads, skips and
 * {@code available()} all honor the limit, and {@code reset()} restores the
 * budget that was left when {@code mark()} was called.
 */
public class LimitedInputStream extends FilterInputStream {
  /** Bytes that may still be consumed from the underlying stream. */
  private long remaining;
  /** Value of {@code remaining} at the last mark, or -1 if no mark was set. */
  private long markedRemaining = -1;

  /**
   * Creates a limited input stream.
   *
   * @param in The underlying input stream (must not be null)
   * @param limit Maximum number of bytes to read
   * @throws IllegalArgumentException if {@code limit} is negative
   */
  public LimitedInputStream(InputStream in, long limit) {
    super(java.util.Objects.requireNonNull(in, "in"));
    if (limit < 0) {
      throw new IllegalArgumentException("limit must be non-negative: " + limit);
    }
    this.remaining = limit;
  }

  /**
   * Reads one byte unless the limit has been reached.
   *
   * @return The byte read, or -1 at the limit or at the end of the underlying stream
   */
  @Override
  public int read() throws IOException {
    if (remaining <= 0) {
      return -1; // EOF
    }

    int result = in.read();
    if (result != -1) {
      remaining--;
    }
    return result;
  }

  /**
   * Reads up to {@code len} bytes, never more than the remaining budget.
   *
   * @return Number of bytes read, or -1 at the limit or at the end of the underlying stream
   */
  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    if (remaining <= 0) {
      return -1; // EOF
    }

    len = (int) Math.min(len, remaining);
    int result = in.read(b, off, len);
    if (result > 0) {
      remaining -= result;
    }
    return result;
  }

  /**
   * Skips at most the remaining budget, so skipping cannot move past the limit.
   *
   * @param n Number of bytes the caller wants to skip
   * @return Number of bytes actually skipped
   */
  @Override
  public long skip(long n) throws IOException {
    long skipped = in.skip(Math.min(n, remaining));
    remaining -= skipped;
    return skipped;
  }

  /**
   * Reports the bytes available without blocking, capped at the remaining budget.
   */
  @Override
  public int available() throws IOException {
    return (int) Math.min(in.available(), remaining);
  }

  /** Marks the underlying stream and remembers the current budget. */
  @Override
  public synchronized void mark(int readLimit) {
    in.mark(readLimit);
    markedRemaining = remaining;
  }

  /**
   * Rewinds to the last mark and restores the budget recorded at that point.
   *
   * @throws IOException if the underlying stream does not support marks or no mark was set
   */
  @Override
  public synchronized void reset() throws IOException {
    if (!in.markSupported()) {
      throw new IOException("Mark not supported");
    }
    if (markedRemaining == -1) {
      throw new IOException("Mark not set");
    }
    in.reset();
    remaining = markedRemaining;
  }

  /**
   * Gets the number of bytes remaining to be read.
   *
   * @return Remaining byte count
   */
  public long getRemaining() {
    return remaining;
  }
}
816
```
817
818
## Usage Examples
819
820
### Configuration Management
821
822
```java
823
import org.apache.spark.network.util.*;
824
import java.util.HashMap;
825
import java.util.Map;
826
827
// Create configuration from multiple sources
828
public class ConfigurationExample {
829
public static TransportConf createConfiguration() {
830
// Start with system properties
831
ConfigProvider systemConfig = new SystemPropertyConfigProvider();
832
833
// Override with application-specific settings
834
Map<String, String> appConfig = new HashMap<>();
835
appConfig.put("spark.network.io.mode", "EPOLL");
836
appConfig.put("spark.network.io.numConnectionsPerPeer", "3");
837
appConfig.put("spark.network.io.serverThreads", "8");
838
appConfig.put("spark.network.io.clientThreads", "8");
839
appConfig.put("spark.network.io.preferDirectBufs", "true");
840
appConfig.put("spark.network.io.connectionTimeout", "60s");
841
842
// Create layered configuration
843
ConfigProvider layeredConfig = new LayeredConfigProvider(
844
new MapConfigProvider(appConfig), // Higher priority
845
systemConfig // Lower priority
846
);
847
848
return new TransportConf("spark.network", layeredConfig);
849
}
850
851
// Custom layered config provider
852
private static class LayeredConfigProvider extends ConfigProvider {
853
private final ConfigProvider[] providers;
854
855
public LayeredConfigProvider(ConfigProvider... providers) {
856
this.providers = providers;
857
}
858
859
@Override
860
public String get(String name) {
861
for (ConfigProvider provider : providers) {
862
String value = provider.get(name);
863
if (value != null) {
864
return value;
865
}
866
}
867
return null;
868
}
869
}
870
}
871
```
872
873
### Byte Unit Conversions
874
875
```java
876
import org.apache.spark.network.util.ByteUnit;
877
878
public class ByteUnitExample {
879
public static void demonstrateConversions() {
880
// Convert different units
881
long sizeInBytes = ByteUnit.GiB.toBytes(2); // 2 GB in bytes
882
long sizeInMB = ByteUnit.BYTE.toMiB(1024 * 1024 * 1024); // 1 GB in MB
883
884
// Convert between units
885
long kbToMb = ByteUnit.KiB.convertTo(1024, ByteUnit.MiB); // 1024 KB to MB
886
long mbToGb = ByteUnit.MiB.convertTo(2048, ByteUnit.GiB); // 2048 MB to GB
887
888
System.out.println("2 GiB = " + sizeInBytes + " bytes");
889
System.out.println("1 GiB = " + sizeInMB + " MiB");
890
System.out.println("1024 KiB = " + kbToMb + " MiB");
891
System.out.println("2048 MiB = " + mbToGb + " GiB");
892
}
893
894
public static long parseConfigSize(String sizeStr) {
895
// Parse configuration size strings
896
if (sizeStr.endsWith("k") || sizeStr.endsWith("K")) {
897
long value = Long.parseLong(sizeStr.substring(0, sizeStr.length() - 1));
898
return ByteUnit.KiB.toBytes(value);
899
} else if (sizeStr.endsWith("m") || sizeStr.endsWith("M")) {
900
long value = Long.parseLong(sizeStr.substring(0, sizeStr.length() - 1));
901
return ByteUnit.MiB.toBytes(value);
902
} else if (sizeStr.endsWith("g") || sizeStr.endsWith("G")) {
903
long value = Long.parseLong(sizeStr.substring(0, sizeStr.length() - 1));
904
return ByteUnit.GiB.toBytes(value);
905
} else {
906
return Long.parseLong(sizeStr); // Assume bytes
907
}
908
}
909
}
910
```
911
912
### Netty Integration
913
914
```java
915
import org.apache.spark.network.util.NettyUtils;
916
import io.netty.bootstrap.Bootstrap;
917
import io.netty.bootstrap.ServerBootstrap;
918
919
public class NettySetupExample {
920
public static void setupNettyClient(TransportConf conf) {
921
IOMode ioMode = IOMode.valueOf(conf.ioMode());
922
923
// Create event loop
924
EventLoopGroup workerGroup = NettyUtils.createEventLoop(
925
ioMode,
926
conf.clientThreads(),
927
"spark-client"
928
);
929
930
// Create bootstrap
931
Bootstrap bootstrap = new Bootstrap();
932
bootstrap.group(workerGroup)
933
.channel(NettyUtils.getClientChannelClass(ioMode));
934
935
// Configure options
936
NettyUtils.configureChannelOptions(bootstrap, conf);
937
938
// Set up pipeline
939
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
940
@Override
941
protected void initChannel(SocketChannel ch) {
942
ChannelPipeline pipeline = ch.pipeline();
943
944
// Add frame decoder
945
pipeline.addLast("frameDecoder", NettyUtils.createFrameDecoder());
946
947
// Add other handlers...
948
}
949
});
950
}
951
952
public static void setupNettyServer(TransportConf conf) {
953
IOMode ioMode = IOMode.valueOf(conf.ioMode());
954
955
// Create event loops
956
EventLoopGroup bossGroup = NettyUtils.createEventLoop(ioMode, 1, "spark-server-boss");
957
EventLoopGroup workerGroup = NettyUtils.createEventLoop(
958
ioMode,
959
conf.serverThreads(),
960
"spark-server-worker"
961
);
962
963
// Create server bootstrap
964
ServerBootstrap bootstrap = new ServerBootstrap();
965
bootstrap.group(bossGroup, workerGroup)
966
.channel(NettyUtils.getServerChannelClass(ioMode));
967
968
// Configure options
969
NettyUtils.configureServerChannelOptions(bootstrap, conf);
970
971
// Set up child handler
972
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
973
@Override
974
protected void initChannel(SocketChannel ch) {
975
// Set up server pipeline
976
}
977
});
978
}
979
}
980
```
981
982
### Utility Operations
983
984
```java
985
import org.apache.spark.network.util.JavaUtils;
986
987
public class UtilityExample {
988
public void demonstrateUtils() {
989
// Parse time strings
990
long timeoutSec = JavaUtils.timeStringAsSec("30s"); // 30 seconds
991
long timeoutMs = JavaUtils.timeStringAs("5m", TimeUnit.MILLISECONDS); // 5 minutes in ms
992
993
// Parse byte strings
994
long bufferSize = JavaUtils.byteStringAsBytes("64m"); // 64 MB in bytes
995
996
// Format bytes
997
String formatted = JavaUtils.bytesToString(1024 * 1024 * 1024); // "1 GB"
998
999
// Safe resource cleanup
1000
FileInputStream fis = null;
1001
try {
1002
fis = new FileInputStream("data.txt");
1003
// Use stream...
1004
} finally {
1005
JavaUtils.closeQuietly(fis); // Won't throw exception
1006
}
1007
1008
// Get configuration from system property or environment
1009
String dataDir = JavaUtils.getSystemProperty("SPARK_DATA_DIR", "/tmp/spark");
1010
}
1011
1012
public void configurationValidation(TransportConf conf) {
1013
// Validate configuration values
1014
if (conf.connectionTimeoutMs() <= 0) {
1015
throw new IllegalArgumentException("Connection timeout must be positive");
1016
}
1017
1018
if (conf.numConnectionsPerPeer() < 1) {
1019
throw new IllegalArgumentException("Must have at least 1 connection per peer");
1020
}
1021
1022
// Check I/O mode availability
1023
IOMode ioMode = IOMode.valueOf(conf.ioMode());
1024
if (ioMode == IOMode.EPOLL && !isLinux()) {
1025
System.err.println("EPOLL mode only available on Linux, falling back to NIO");
1026
// Override configuration...
1027
}
1028
}
1029
1030
private boolean isLinux() {
1031
return System.getProperty("os.name").toLowerCase().contains("linux");
1032
}
1033
}