or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

buffer-management.mdclient-operations.mdconfiguration-utilities.mdindex.mdmessage-protocol.mdsasl-authentication.mdserver-operations.mdtransport-setup.md

configuration-utilities.mddocs/

0

# Configuration and Utilities

1

2

Configuration system and utility classes for transport settings, Netty integration, and Java operations. The configuration framework provides centralized management of transport behavior with multiple provider implementations.

3

4

## Capabilities

5

6

### TransportConf

7

8

Central configuration class managing all transport layer settings and behavior.

9

10

```java { .api }

11

/**

12

* TransportConf manages configuration settings for the Spark transport layer.

13

* It provides typed access to all networking configuration options with defaults.

14

*/

15

public class TransportConf {

16

/**

17

* Creates a transport configuration.

18

*

19

* @param module The configuration module name (used as key prefix)

20

* @param conf The configuration provider for loading settings

21

*/

22

public TransportConf(String module, ConfigProvider conf);

23

24

/**

25

* Gets the I/O mode for network operations.

26

*

27

* @return "NIO" or "EPOLL" (Linux only)

28

*/

29

public String ioMode();

30

31

/**

32

* Whether to prefer direct ByteBuffers for better performance.

33

*

34

* @return true to prefer direct buffers

35

*/

36

public boolean preferDirectBufs();

37

38

/**

39

* Connection timeout in milliseconds.

40

*

41

* @return Timeout for establishing connections

42

*/

43

public int connectionTimeoutMs();

44

45

/**

46

* Maximum number of connections per remote peer.

47

*

48

* @return Number of connections to maintain per peer

49

*/

50

public int numConnectionsPerPeer();

51

52

/**

53

* Server socket backlog size.

54

*

55

* @return Number of pending connections to queue

56

*/

57

public int backLog();

58

59

/**

60

* Number of server worker threads.

61

*

62

* @return Thread count for server-side operations

63

*/

64

public int serverThreads();

65

66

/**

67

* Number of client worker threads.

68

*

69

* @return Thread count for client-side operations

70

*/

71

public int clientThreads();

72

73

/**

74

* Socket receive buffer size in bytes.

75

*

76

* @return Size of socket receive buffer

77

*/

78

public int receiveBuf();

79

80

/**

81

* Socket send buffer size in bytes.

82

*

83

* @return Size of socket send buffer

84

*/

85

public int sendBuf();

86

87

/**

88

* SASL roundtrip timeout in milliseconds.

89

*

90

* @return Timeout for SASL authentication

91

*/

92

public int saslRTTimeoutMs();

93

94

/**

95

* Maximum number of I/O retry attempts.

96

*

97

* @return Number of times to retry failed I/O operations

98

*/

99

public int maxIORetries();

100

101

/**

102

* Wait time between I/O retry attempts in milliseconds.

103

*

104

* @return Delay between retry attempts

105

*/

106

public int ioRetryWaitTimeMs();

107

108

/**

109

* Threshold for memory mapping files instead of reading into heap.

110

*

111

* @return Size threshold in bytes for memory mapping

112

*/

113

public int memoryMapBytes();

114

115

/**

116

* Whether to use lazy file descriptor allocation.

117

*

118

* @return true to defer file opening until needed

119

*/

120

public boolean lazyFileDescriptor();

121

122

/**

123

* Maximum number of port binding retry attempts.

124

*

125

* @return Number of times to retry port binding

126

*/

127

public int portMaxRetries();

128

129

/**

130

* Maximum size of encrypted blocks when using SASL encryption.

131

*

132

* @return Maximum encrypted block size in bytes

133

*/

134

public int maxSaslEncryptedBlockSize();

135

136

/**

137

* Whether the server should always encrypt data (when SASL is enabled).

138

*

139

* @return true to require encryption on server side

140

*/

141

public boolean saslServerAlwaysEncrypt();

142

143

/**

144

* Gets the configuration module name.

145

*

146

* @return The module name used for configuration keys

147

*/

148

public String getModule();

149

150

/**

151

* Gets a raw configuration value.

152

*

153

* @param name The configuration key name

154

* @return The configuration value or null if not set

155

*/

156

public String get(String name);

157

158

/**

159

* Gets a raw configuration value with default.

160

*

161

* @param name The configuration key name

162

* @param defaultValue Default value if key is not set

163

* @return The configuration value or default

164

*/

165

public String get(String name, String defaultValue);

166

}

167

```

168

169

### Configuration Provider Framework

170

171

#### ConfigProvider

172

173

```java { .api }

174

/**

175

* Abstract base class for configuration providers.

176

* Implementations load configuration from different sources (properties, maps, etc.).

177

*/

178

public abstract class ConfigProvider {

179

/**

180

* Gets a configuration value by name.

181

*

182

* @param name The configuration key name

183

* @return The configuration value or null if not found

184

*/

185

public abstract String get(String name);

186

187

/**

188

* Gets a configuration value with a default.

189

*

190

* @param name The configuration key name

191

* @param defaultValue Default value if key is not found

192

* @return The configuration value or default

193

*/

194

public String get(String name, String defaultValue) {

195

String value = get(name);

196

return value != null ? value : defaultValue;

197

}

198

199

/**

200

* Gets an integer configuration value with default.

201

*

202

* @param name The configuration key name

203

* @param defaultValue Default value if key is not found or invalid

204

* @return The integer value or default

205

*/

206

public int getInt(String name, int defaultValue) {

207

String value = get(name);

208

if (value != null) {

209

try {

210

return Integer.parseInt(value);

211

} catch (NumberFormatException e) {

212

// Fall through to default

213

}

214

}

215

return defaultValue;

216

}

217

218

/**

219

* Gets a long configuration value with default.

220

*

221

* @param name The configuration key name

222

* @param defaultValue Default value if key is not found or invalid

223

* @return The long value or default

224

*/

225

public long getLong(String name, long defaultValue) {

226

String value = get(name);

227

if (value != null) {

228

try {

229

return Long.parseLong(value);

230

} catch (NumberFormatException e) {

231

// Fall through to default

232

}

233

}

234

return defaultValue;

235

}

236

237

/**

238

* Gets a double configuration value with default.

239

*

240

* @param name The configuration key name

241

* @param defaultValue Default value if key is not found or invalid

242

* @return The double value or default

243

*/

244

public double getDouble(String name, double defaultValue) {

245

String value = get(name);

246

if (value != null) {

247

try {

248

return Double.parseDouble(value);

249

} catch (NumberFormatException e) {

250

// Fall through to default

251

}

252

}

253

return defaultValue;

254

}

255

256

/**

257

* Gets a boolean configuration value with default.

258

*

259

* @param name The configuration key name

260

* @param defaultValue Default value if key is not found or invalid

261

* @return The boolean value or default

262

*/

263

public boolean getBoolean(String name, boolean defaultValue) {

264

String value = get(name);

265

if (value != null) {

266

return Boolean.parseBoolean(value);

267

}

268

return defaultValue;

269

}

270

}

271

```

272

273

#### MapConfigProvider

274

275

```java { .api }

276

/**

277

* Configuration provider backed by a Map.

278

* Useful for programmatic configuration or testing.

279

*/

280

public class MapConfigProvider extends ConfigProvider {

281

/**

282

* Creates a map-based configuration provider.

283

*

284

* @param config Map containing configuration key-value pairs

285

*/

286

public MapConfigProvider(Map<String, String> config);

287

288

@Override

289

public String get(String name) {

290

return config.get(name);

291

}

292

}

293

```

294

295

#### SystemPropertyConfigProvider

296

297

```java { .api }

298

/**

299

* Configuration provider that loads values from Java system properties.

300

* Useful for configuration via command-line properties.

301

*/

302

public class SystemPropertyConfigProvider extends ConfigProvider {

303

/**

304

* Creates a system property configuration provider.

305

*/

306

public SystemPropertyConfigProvider();

307

308

@Override

309

public String get(String name) {

310

return System.getProperty(name);

311

}

312

}

313

```

314

315

### Utility Enumerations

316

317

#### ByteUnit

318

319

```java { .api }

320

/**

321

* Enumeration for byte size units with conversion utilities.

322

* Provides convenient methods for converting between different byte units.

323

*/

324

public enum ByteUnit {

325

BYTE(1),

326

KiB(1024L),

327

MiB(1024L * 1024L),

328

GiB(1024L * 1024L * 1024L),

329

TiB(1024L * 1024L * 1024L * 1024L),

330

PiB(1024L * 1024L * 1024L * 1024L * 1024L);

331

332

private final long multiplier;

333

334

ByteUnit(long multiplier) {

335

this.multiplier = multiplier;

336

}

337

338

/**

339

* Converts a value from another unit to this unit.

340

*

341

* @param d The value to convert

342

* @param u The source unit

343

* @return The converted value in this unit

344

*/

345

public long convertFrom(long d, ByteUnit u) {

346

return (d * u.multiplier) / this.multiplier;

347

}

348

349

/**

350

* Converts a value from this unit to another unit.

351

*

352

* @param d The value to convert

353

* @param u The target unit

354

* @return The converted value in the target unit

355

*/

356

public long convertTo(long d, ByteUnit u) {

357

return (d * this.multiplier) / u.multiplier;

358

}

359

360

/**

361

* Converts a value in this unit to bytes.

362

*

363

* @param d The value in this unit

364

* @return The value in bytes as a double

365

*/

366

public double toBytes(long d) {

367

return d * multiplier;

368

}

369

370

/**

371

* Converts a value in this unit to KiB.

372

*

373

* @param d The value in this unit

374

* @return The value in KiB

375

*/

376

public long toKiB(long d) {

377

return convertTo(d, KiB);

378

}

379

380

/**

381

* Converts a value in this unit to MiB.

382

*

383

* @param d The value in this unit

384

* @return The value in MiB

385

*/

386

public long toMiB(long d) {

387

return convertTo(d, MiB);

388

}

389

390

/**

391

* Converts a value in this unit to GiB.

392

*

393

* @param d The value in this unit

394

* @return The value in GiB

395

*/

396

public long toGiB(long d) {

397

return convertTo(d, GiB);

398

}

399

400

/**

401

* Converts a value in this unit to TiB.

402

*

403

* @param d The value in this unit

404

* @return The value in TiB

405

*/

406

public long toTiB(long d) {

407

return convertTo(d, TiB);

408

}

409

410

/**

411

* Converts a value in this unit to PiB.

412

*

413

* @param d The value in this unit

414

* @return The value in PiB

415

*/

416

public long toPiB(long d) {

417

return convertTo(d, PiB);

418

}

419

}

420

```

421

422

#### IOMode

423

424

```java { .api }

425

/**

426

* Enumeration for I/O modes supported by the transport layer.

427

* Different modes provide different performance characteristics.

428

*/

429

public enum IOMode {

430

/**

431

* Standard Java NIO - works on all platforms but may have lower performance

432

*/

433

NIO,

434

435

/**

436

* Linux EPOLL - higher performance on Linux systems, requires native library

437

*/

438

EPOLL

439

}

440

```

441

442

### Java Utility Classes

443

444

#### JavaUtils

445

446

```java { .api }

447

/**

448

* General utility methods for Java operations used throughout the transport layer.

449

*/

450

public class JavaUtils {

451

/**

452

* Closes a Closeable resource quietly, ignoring any exceptions.

453

*

454

* @param closeable The resource to close (can be null)

455

*/

456

public static void closeQuietly(Closeable closeable) {

457

if (closeable != null) {

458

try {

459

closeable.close();

460

} catch (IOException e) {

461

// Ignore exception

462

}

463

}

464

}

465

466

/**

467

* Parses a time string (e.g., "30s", "5m", "2h") to seconds.

468

*

469

* @param str The time string to parse

470

* @return Time in seconds

471

* @throws NumberFormatException if string format is invalid

472

*/

473

public static long timeStringAsSec(String str) {

474

return parseTimeString(str, TimeUnit.SECONDS);

475

}

476

477

/**

478

* Parses a time string to the specified time unit.

479

*

480

* @param str The time string to parse

481

* @param unit The target time unit

482

* @return Time in the specified unit

483

*/

484

public static long timeStringAs(String str, TimeUnit unit) {

485

return parseTimeString(str, unit);

486

}

487

488

/**

489

* Parses a byte string (e.g., "100k", "64m", "1g") to bytes.

490

*

491

* @param str The byte string to parse

492

* @return Size in bytes

493

* @throws NumberFormatException if string format is invalid

494

*/

495

public static long byteStringAsBytes(String str) {

496

return parseByteString(str);

497

}

498

499

/**

500

* Formats bytes as a human-readable string.

501

*

502

* @param size Size in bytes

503

* @return Formatted string (e.g., "1.5 GB", "256 MB")

504

*/

505

public static String bytesToString(long size) {

506

return formatBytes(size);

507

}

508

509

/**

510

* Gets the system property or environment variable with the given name.

511

* Checks system properties first, then environment variables.

512

*

513

* @param name The property/variable name

514

* @param defaultValue Default value if not found

515

* @return The property value or default

516

*/

517

public static String getSystemProperty(String name, String defaultValue) {

518

String value = System.getProperty(name);

519

if (value == null) {

520

value = System.getenv(name);

521

}

522

return value != null ? value : defaultValue;

523

}

524

}

525

```

526

527

#### NettyUtils

528

529

```java { .api }

530

/**

531

* Utility methods for Netty integration and configuration.

532

* Provides helpers for creating Netty components with proper settings.

533

*/

534

public class NettyUtils {

535

/**

536

* Gets the remote address from a Netty channel as a string.

537

*

538

* @param channel The Netty channel

539

* @return String representation of remote address

540

*/

541

public static String getRemoteAddress(Channel channel) {

542

if (channel != null && channel.remoteAddress() != null) {

543

return channel.remoteAddress().toString();

544

}

545

return "unknown";

546

}

547

548

/**

549

* Creates an EventLoopGroup for the specified I/O mode.

550

*

551

* @param mode The I/O mode (NIO or EPOLL)

552

* @param numThreads Number of threads in the event loop

553

* @param threadPrefix Prefix for thread names

554

* @return Configured EventLoopGroup

555

*/

556

public static EventLoopGroup createEventLoop(IOMode mode, int numThreads, String threadPrefix) {

557

ThreadFactory threadFactory = createThreadFactory(threadPrefix);

558

559

switch (mode) {

560

case NIO:

561

return new NioEventLoopGroup(numThreads, threadFactory);

562

case EPOLL:

563

return new EpollEventLoopGroup(numThreads, threadFactory);

564

default:

565

throw new IllegalArgumentException("Unknown I/O mode: " + mode);

566

}

567

}

568

569

/**

570

* Gets the server channel class for the specified I/O mode.

571

*

572

* @param mode The I/O mode

573

* @return ServerChannel class for the mode

574

*/

575

public static Class<? extends ServerChannel> getServerChannelClass(IOMode mode) {

576

switch (mode) {

577

case NIO:

578

return NioServerSocketChannel.class;

579

case EPOLL:

580

return EpollServerSocketChannel.class;

581

default:

582

throw new IllegalArgumentException("Unknown I/O mode: " + mode);

583

}

584

}

585

586

/**

587

* Gets the client channel class for the specified I/O mode.

588

*

589

* @param mode The I/O mode

590

* @return Channel class for the mode

591

*/

592

public static Class<? extends Channel> getClientChannelClass(IOMode mode) {

593

switch (mode) {

594

case NIO:

595

return NioSocketChannel.class;

596

case EPOLL:

597

return EpollSocketChannel.class;

598

default:

599

throw new IllegalArgumentException("Unknown I/O mode: " + mode);

600

}

601

}

602

603

/**

604

* Creates a pooled ByteBuf allocator with specified settings.

605

*

606

* @param allowDirectBufs Whether to allow direct buffer allocation

607

* @param allowCache Whether to enable buffer caching

608

* @param numCores Number of CPU cores (affects pool sizing)

609

* @return Configured PooledByteBufAllocator

610

*/

611

public static PooledByteBufAllocator createPooledByteBufAllocator(

612

boolean allowDirectBufs, boolean allowCache, int numCores) {

613

614

int numDirectArenas = allowDirectBufs ? numCores : 0;

615

int numHeapArenas = numCores;

616

617

return new PooledByteBufAllocator(

618

allowDirectBufs, // preferDirect

619

numHeapArenas, // nHeapArena

620

numDirectArenas, // nDirectArena

621

8192, // pageSize

622

11, // maxOrder

623

64, // tinyCacheSize

624

32, // smallCacheSize

625

8, // normalCacheSize

626

allowCache // useCacheForAllThreads

627

);

628

}

629

630

/**

631

* Creates a frame decoder for the transport protocol.

632

*

633

* @return Configured TransportFrameDecoder

634

*/

635

public static TransportFrameDecoder createFrameDecoder() {

636

return new TransportFrameDecoder();

637

}

638

639

/**

640

* Configures common channel options for Spark transport.

641

*

642

* @param bootstrap The bootstrap to configure

643

* @param conf Transport configuration

644

*/

645

public static void configureChannelOptions(Bootstrap bootstrap, TransportConf conf) {

646

bootstrap.option(ChannelOption.ALLOCATOR, createPooledByteBufAllocator(

647

conf.preferDirectBufs(), true, Runtime.getRuntime().availableProcessors()));

648

bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs());

649

bootstrap.option(ChannelOption.SO_KEEPALIVE, true);

650

bootstrap.option(ChannelOption.SO_REUSEADDR, true);

651

bootstrap.option(ChannelOption.SO_RCVBUF, conf.receiveBuf());

652

bootstrap.option(ChannelOption.SO_SNDBUF, conf.sendBuf());

653

}

654

655

/**

656

* Configures common channel options for server bootstrap.

657

*

658

* @param bootstrap The server bootstrap to configure

659

* @param conf Transport configuration

660

*/

661

public static void configureServerChannelOptions(ServerBootstrap bootstrap, TransportConf conf) {

662

bootstrap.option(ChannelOption.ALLOCATOR, createPooledByteBufAllocator(

663

conf.preferDirectBufs(), true, Runtime.getRuntime().availableProcessors()));

664

bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());

665

bootstrap.option(ChannelOption.SO_REUSEADDR, true);

666

667

bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);

668

bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());

669

bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());

670

}

671

}

672

```

673

674

### Specialized Utility Classes

675

676

#### TransportFrameDecoder

677

678

```java { .api }

679

/**

680

* Netty decoder for transport protocol frames.

681

* Handles frame parsing and message boundary detection.

682

*/

683

public class TransportFrameDecoder extends ByteToMessageDecoder {

684

/** Maximum frame size to prevent memory exhaustion */

685

public static final int MAX_FRAME_SIZE = Integer.MAX_VALUE;

686

687

@Override

688

protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

689

// Decodes length-prefixed frames from the transport protocol

690

// Implementation handles partial frames and validates frame sizes

691

}

692

693

/**

694

* Gets the maximum allowed frame size.

695

*

696

* @return Maximum frame size in bytes

697

*/

698

public int getMaxFrameSize() {

699

return MAX_FRAME_SIZE;

700

}

701

}

702

```

703

704

#### ByteArrayWritableChannel

705

706

```java { .api }

707

/**

708

* WritableByteChannel implementation backed by a byte array.

709

* Useful for collecting data written to a channel into memory.

710

*/

711

public class ByteArrayWritableChannel implements WritableByteChannel {

712

/**

713

* Creates a byte array writable channel.

714

*

715

* @param initialCapacity Initial capacity of the internal buffer

716

*/

717

public ByteArrayWritableChannel(int initialCapacity);

718

719

@Override

720

public int write(ByteBuffer src) throws IOException {

721

// Writes data from ByteBuffer to internal byte array

722

int remaining = src.remaining();

723

// Implementation copies data and grows array as needed

724

return remaining;

725

}

726

727

@Override

728

public boolean isOpen() {

729

return open;

730

}

731

732

@Override

733

public void close() throws IOException {

734

open = false;

735

}

736

737

/**

738

* Gets the current data as a byte array.

739

*

740

* @return Copy of the accumulated data

741

*/

742

public byte[] getData() {

743

return Arrays.copyOf(buffer, position);

744

}

745

746

/**

747

* Gets the number of bytes written.

748

*

749

* @return Number of bytes in the buffer

750

*/

751

public int size() {

752

return position;

753

}

754

755

/**

756

* Resets the channel to empty state.

757

*/

758

public void reset() {

759

position = 0;

760

}

761

}

762

```

763

764

#### LimitedInputStream

765

766

```java { .api }

767

/**

768

* FilterInputStream that limits the number of bytes that can be read.

769

* Useful for reading only a portion of a larger stream.

770

*/

771

public class LimitedInputStream extends FilterInputStream {

772

/**

773

* Creates a limited input stream.

774

*

775

* @param in The underlying input stream

776

* @param limit Maximum number of bytes to read

777

*/

778

public LimitedInputStream(InputStream in, long limit);

779

780

@Override

781

public int read() throws IOException {

782

if (remaining <= 0) {

783

return -1; // EOF

784

}

785

786

int result = super.read();

787

if (result != -1) {

788

remaining--;

789

}

790

return result;

791

}

792

793

@Override

794

public int read(byte[] b, int off, int len) throws IOException {

795

if (remaining <= 0) {

796

return -1; // EOF

797

}

798

799

len = (int) Math.min(len, remaining);

800

int result = super.read(b, off, len);

801

if (result > 0) {

802

remaining -= result;

803

}

804

return result;

805

}

806

807

/**

808

* Gets the number of bytes remaining to be read.

809

*

810

* @return Remaining byte count

811

*/

812

public long getRemaining() {

813

return remaining;

814

}

815

}

816

```

817

818

## Usage Examples

819

820

### Configuration Management

821

822

```java

823

import org.apache.spark.network.util.*;

824

import java.util.HashMap;

825

import java.util.Map;

826

827

// Create configuration from multiple sources

828

public class ConfigurationExample {

829

public static TransportConf createConfiguration() {

830

// Start with system properties

831

ConfigProvider systemConfig = new SystemPropertyConfigProvider();

832

833

// Override with application-specific settings

834

Map<String, String> appConfig = new HashMap<>();

835

appConfig.put("spark.network.io.mode", "EPOLL");

836

appConfig.put("spark.network.io.numConnectionsPerPeer", "3");

837

appConfig.put("spark.network.io.serverThreads", "8");

838

appConfig.put("spark.network.io.clientThreads", "8");

839

appConfig.put("spark.network.io.preferDirectBufs", "true");

840

appConfig.put("spark.network.io.connectionTimeout", "60s");

841

842

// Create layered configuration

843

ConfigProvider layeredConfig = new LayeredConfigProvider(

844

new MapConfigProvider(appConfig), // Higher priority

845

systemConfig // Lower priority

846

);

847

848

return new TransportConf("spark.network", layeredConfig);

849

}

850

851

// Custom layered config provider

852

private static class LayeredConfigProvider extends ConfigProvider {

853

private final ConfigProvider[] providers;

854

855

public LayeredConfigProvider(ConfigProvider... providers) {

856

this.providers = providers;

857

}

858

859

@Override

860

public String get(String name) {

861

for (ConfigProvider provider : providers) {

862

String value = provider.get(name);

863

if (value != null) {

864

return value;

865

}

866

}

867

return null;

868

}

869

}

870

}

871

```

872

873

### Byte Unit Conversions

874

875

```java

876

import org.apache.spark.network.util.ByteUnit;

877

878

public class ByteUnitExample {

879

public static void demonstrateConversions() {

880

// Convert different units

881

long sizeInBytes = ByteUnit.GiB.toBytes(2); // 2 GB in bytes

882

long sizeInMB = ByteUnit.BYTE.toMiB(1024 * 1024 * 1024); // 1 GB in MB

883

884

// Convert between units

885

long kbToMb = ByteUnit.KiB.convertTo(1024, ByteUnit.MiB); // 1024 KB to MB

886

long mbToGb = ByteUnit.MiB.convertTo(2048, ByteUnit.GiB); // 2048 MB to GB

887

888

System.out.println("2 GiB = " + sizeInBytes + " bytes");

889

System.out.println("1 GiB = " + sizeInMB + " MiB");

890

System.out.println("1024 KiB = " + kbToMb + " MiB");

891

System.out.println("2048 MiB = " + mbToGb + " GiB");

892

}

893

894

public static long parseConfigSize(String sizeStr) {

895

// Parse configuration size strings

896

if (sizeStr.endsWith("k") || sizeStr.endsWith("K")) {

897

long value = Long.parseLong(sizeStr.substring(0, sizeStr.length() - 1));

898

return ByteUnit.KiB.toBytes(value);

899

} else if (sizeStr.endsWith("m") || sizeStr.endsWith("M")) {

900

long value = Long.parseLong(sizeStr.substring(0, sizeStr.length() - 1));

901

return ByteUnit.MiB.toBytes(value);

902

} else if (sizeStr.endsWith("g") || sizeStr.endsWith("G")) {

903

long value = Long.parseLong(sizeStr.substring(0, sizeStr.length() - 1));

904

return ByteUnit.GiB.toBytes(value);

905

} else {

906

return Long.parseLong(sizeStr); // Assume bytes

907

}

908

}

909

}

910

```

911

912

### Netty Integration

913

914

```java

915

import org.apache.spark.network.util.NettyUtils;

916

import io.netty.bootstrap.Bootstrap;

917

import io.netty.bootstrap.ServerBootstrap;

918

919

public class NettySetupExample {

920

public static void setupNettyClient(TransportConf conf) {

921

IOMode ioMode = IOMode.valueOf(conf.ioMode());

922

923

// Create event loop

924

EventLoopGroup workerGroup = NettyUtils.createEventLoop(

925

ioMode,

926

conf.clientThreads(),

927

"spark-client"

928

);

929

930

// Create bootstrap

931

Bootstrap bootstrap = new Bootstrap();

932

bootstrap.group(workerGroup)

933

.channel(NettyUtils.getClientChannelClass(ioMode));

934

935

// Configure options

936

NettyUtils.configureChannelOptions(bootstrap, conf);

937

938

// Set up pipeline

939

bootstrap.handler(new ChannelInitializer<SocketChannel>() {

940

@Override

941

protected void initChannel(SocketChannel ch) {

942

ChannelPipeline pipeline = ch.pipeline();

943

944

// Add frame decoder

945

pipeline.addLast("frameDecoder", NettyUtils.createFrameDecoder());

946

947

// Add other handlers...

948

}

949

});

950

}

951

952

public static void setupNettyServer(TransportConf conf) {

953

IOMode ioMode = IOMode.valueOf(conf.ioMode());

954

955

// Create event loops

956

EventLoopGroup bossGroup = NettyUtils.createEventLoop(ioMode, 1, "spark-server-boss");

957

EventLoopGroup workerGroup = NettyUtils.createEventLoop(

958

ioMode,

959

conf.serverThreads(),

960

"spark-server-worker"

961

);

962

963

// Create server bootstrap

964

ServerBootstrap bootstrap = new ServerBootstrap();

965

bootstrap.group(bossGroup, workerGroup)

966

.channel(NettyUtils.getServerChannelClass(ioMode));

967

968

// Configure options

969

NettyUtils.configureServerChannelOptions(bootstrap, conf);

970

971

// Set up child handler

972

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

973

@Override

974

protected void initChannel(SocketChannel ch) {

975

// Set up server pipeline

976

}

977

});

978

}

979

}

980

```

981

982

### Utility Operations

983

984

```java

985

import org.apache.spark.network.util.JavaUtils;

986

987

public class UtilityExample {

988

public void demonstrateUtils() {

989

// Parse time strings

990

long timeoutSec = JavaUtils.timeStringAsSec("30s"); // 30 seconds

991

long timeoutMs = JavaUtils.timeStringAs("5m", TimeUnit.MILLISECONDS); // 5 minutes in ms

992

993

// Parse byte strings

994

long bufferSize = JavaUtils.byteStringAsBytes("64m"); // 64 MB in bytes

995

996

// Format bytes

997

String formatted = JavaUtils.bytesToString(1024 * 1024 * 1024); // "1 GB"

998

999

// Safe resource cleanup

1000

FileInputStream fis = null;

1001

try {

1002

fis = new FileInputStream("data.txt");

1003

// Use stream...

1004

} finally {

1005

JavaUtils.closeQuietly(fis); // Won't throw exception

1006

}

1007

1008

// Get configuration from system property or environment

1009

String dataDir = JavaUtils.getSystemProperty("SPARK_DATA_DIR", "/tmp/spark");

1010

}

1011

1012

public void configurationValidation(TransportConf conf) {

1013

// Validate configuration values

1014

if (conf.connectionTimeoutMs() <= 0) {

1015

throw new IllegalArgumentException("Connection timeout must be positive");

1016

}

1017

1018

if (conf.numConnectionsPerPeer() < 1) {

1019

throw new IllegalArgumentException("Must have at least 1 connection per peer");

1020

}

1021

1022

// Check I/O mode availability

1023

IOMode ioMode = IOMode.valueOf(conf.ioMode());

1024

if (ioMode == IOMode.EPOLL && !isLinux()) {

1025

System.err.println("EPOLL mode only available on Linux, falling back to NIO");

1026

// Override configuration...

1027

}

1028

}

1029

1030

private boolean isLinux() {

1031

return System.getProperty("os.name").toLowerCase().contains("linux");

1032

}

1033

}