or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

builtin-plugins.mdcaching.mdcookies.mdengine-configuration.mdforms.mdhttp-client.mdindex.mdplugin-system.mdrequest-building.mdresponse-handling.mdresponse-observation.mdutilities.mdwebsockets.md

websockets.mddocs/

0

# WebSocket Support

1

2

The Ktor HTTP Client Core provides comprehensive WebSocket client functionality through the `WebSockets` plugin. This enables establishing and managing WebSocket connections with session management, ping/pong handling, message serialization, and full integration with Ktor's coroutine-based architecture.

3

4

## Core WebSocket API

5

6

### WebSockets Plugin

7

8

The main plugin for WebSocket client functionality that handles connection establishment and session management.

9

10

```kotlin { .api }

11

object WebSockets : HttpClientPlugin<WebSockets.Config, WebSockets> {

12

class Config {

13

var pingInterval: Duration? = null

14

var maxFrameSize: Long = Long.MAX_VALUE

15

var masking: Boolean = true

16

var extensions: MutableList<WebSocketExtension<*>> = mutableListOf()

17

18

fun pingInterval(duration: Duration)

19

fun maxFrameSize(size: Long)

20

fun masking(enabled: Boolean)

21

fun extensions(vararg extensions: WebSocketExtension<*>)

22

}

23

}

24

```

25

26

### ClientWebSocketSession

27

28

Interface representing a client-side WebSocket session with full duplex communication capabilities.

29

30

```kotlin { .api }

31

interface ClientWebSocketSession : WebSocketSession {

32

val call: HttpClientCall

33

34

// Inherited from WebSocketSession

35

val incoming: ReceiveChannel<Frame>

36

val outgoing: SendChannel<Frame>

37

val closeReason: Deferred<CloseReason?>

38

val extensions: List<WebSocketExtension<*>>

39

40

// Send text message

41

suspend fun send(data: String)

42

43

// Send binary message

44

suspend fun send(data: ByteArray)

45

46

// Send frame

47

suspend fun send(frame: Frame)

48

49

// Close connection

50

suspend fun close(reason: CloseReason? = null)

51

}

52

```

53

54

### DefaultClientWebSocketSession

55

56

Default implementation of `ClientWebSocketSession` with standard WebSocket protocol handling.

57

58

```kotlin { .api }

59

class DefaultClientWebSocketSession(

60

private val call: HttpClientCall,

61

delegate: WebSocketSession

62

) : ClientWebSocketSession, WebSocketSession by delegate {

63

override val call: HttpClientCall get() = call

64

}

65

```

66

67

## WebSocket Connection Builders

68

69

### Basic WebSocket Connection

70

71

Extension functions for establishing WebSocket connections with various configuration options.

72

73

```kotlin { .api }

74

// Basic WebSocket connection

75

suspend fun HttpClient.webSocket(

76

method: HttpMethod = HttpMethod.Get,

77

host: String? = null,

78

port: Int? = null,

79

path: String? = null,

80

request: HttpRequestBuilder.() -> Unit = {},

81

block: suspend ClientWebSocketSession.() -> Unit

82

)

83

84

suspend fun HttpClient.webSocket(

85

url: String,

86

request: HttpRequestBuilder.() -> Unit = {},

87

block: suspend ClientWebSocketSession.() -> Unit

88

)

89

90

suspend fun HttpClient.webSocket(

91

url: Url,

92

request: HttpRequestBuilder.() -> Unit = {},

93

block: suspend ClientWebSocketSession.() -> Unit

94

)

95

96

// Secure WebSocket connection (WSS)

97

suspend fun HttpClient.wss(

98

method: HttpMethod = HttpMethod.Get,

99

host: String? = null,

100

port: Int? = null,

101

path: String? = null,

102

request: HttpRequestBuilder.() -> Unit = {},

103

block: suspend ClientWebSocketSession.() -> Unit

104

)

105

106

// WebSocket session without automatic connection handling

107

suspend fun HttpClient.webSocketSession(

108

method: HttpMethod = HttpMethod.Get,

109

host: String? = null,

110

port: Int? = null,

111

path: String? = null,

112

request: HttpRequestBuilder.() -> Unit = {}

113

): ClientWebSocketSession

114

115

suspend fun HttpClient.webSocketSession(

116

url: String,

117

request: HttpRequestBuilder.() -> Unit = {}

118

): ClientWebSocketSession

119

120

suspend fun HttpClient.ws(

121

method: HttpMethod = HttpMethod.Get,

122

host: String? = null,

123

port: Int? = null,

124

path: String? = null,

125

request: HttpRequestBuilder.() -> Unit = {},

126

block: suspend ClientWebSocketSession.() -> Unit

127

)

128

```

129

130

## Frame Types

131

132

### WebSocket Frame Hierarchy

133

134

```kotlin { .api }

135

sealed class Frame {

136

abstract val fin: Boolean

137

abstract val data: ByteArray

138

139

data class Text(val data: ByteArray, override val fin: Boolean = true) : Frame() {

140

constructor(text: String) : this(text.toByteArray(Charsets.UTF_8))

141

fun readText(): String = data.toString(Charsets.UTF_8)

142

}

143

144

data class Binary(override val data: ByteArray, override val fin: Boolean = true) : Frame()

145

146

data class Close(val data: ByteArray) : Frame() {

147

constructor(reason: CloseReason) : this(reason.toByteArray())

148

fun readReason(): CloseReason? = CloseReason.parse(data)

149

}

150

151

data class Ping(override val data: ByteArray) : Frame()

152

data class Pong(override val data: ByteArray) : Frame()

153

}

154

155

data class CloseReason(val code: Short, val message: String) {

156

companion object {

157

val NORMAL = CloseReason(1000, "Normal Closure")

158

val GOING_AWAY = CloseReason(1001, "Going Away")

159

val PROTOCOL_ERROR = CloseReason(1002, "Protocol Error")

160

val CANNOT_ACCEPT = CloseReason(1003, "Cannot Accept")

161

val NOT_CONSISTENT = CloseReason(1007, "Not Consistent")

162

val VIOLATED_POLICY = CloseReason(1008, "Violated Policy")

163

val TOO_BIG = CloseReason(1009, "Too Big")

164

val NO_EXTENSION = CloseReason(1010, "No Extension")

165

val INTERNAL_ERROR = CloseReason(1011, "Internal Error")

166

val SERVICE_RESTART = CloseReason(1012, "Service Restart")

167

val TRY_AGAIN_LATER = CloseReason(1013, "Try Again Later")

168

}

169

}

170

```

171

172

## Basic Usage

173

174

### Simple WebSocket Communication

175

176

```kotlin

177

val client = HttpClient {

178

install(WebSockets)

179

}

180

181

client.webSocket("wss://echo.websocket.org") {

182

// Send a message

183

send("Hello WebSocket!")

184

185

// Receive messages

186

for (frame in incoming) {

187

when (frame) {

188

is Frame.Text -> {

189

val receivedText = frame.readText()

190

println("Received: $receivedText")

191

192

if (receivedText == "Hello WebSocket!") {

193

send("Thanks for the echo!")

194

}

195

}

196

is Frame.Close -> {

197

println("Connection closed: ${frame.readReason()}")

198

break

199

}

200

else -> {

201

// Handle other frame types

202

}

203

}

204

}

205

}

206

207

client.close()

208

```

209

210

### Bidirectional Communication

211

212

```kotlin

213

val client = HttpClient {

214

install(WebSockets) {

215

pingInterval = 20.seconds

216

maxFrameSize = 1024 * 1024 // 1MB

217

}

218

}

219

220

client.webSocket("ws://localhost:8080/chat") {

221

// Launch coroutine for sending messages

222

launch {

223

repeat(10) { i ->

224

send("Message $i")

225

delay(1000)

226

}

227

close(CloseReason.NORMAL)

228

}

229

230

// Receive messages on main coroutine

231

for (frame in incoming) {

232

when (frame) {

233

is Frame.Text -> {

234

println("Server: ${frame.readText()}")

235

}

236

is Frame.Binary -> {

237

println("Received binary data: ${frame.data.size} bytes")

238

}

239

is Frame.Close -> {

240

println("Connection closed by server")

241

break

242

}

243

is Frame.Ping -> {

244

// Pong is sent automatically

245

println("Received ping")

246

}

247

is Frame.Pong -> {

248

println("Received pong")

249

}

250

}

251

}

252

}

253

```

254

255

### Secure WebSocket (WSS)

256

257

```kotlin

258

val client = HttpClient {

259

install(WebSockets)

260

}

261

262

client.wss(

263

host = "secure-chat.example.com",

264

port = 443,

265

path = "/websocket"

266

) {

267

// Configure headers for authentication

268

request {

269

header("Authorization", "Bearer $accessToken")

270

header("X-Client-Version", "1.0")

271

}

272

273

block = {

274

// WebSocket communication

275

send("Secure message")

276

277

for (frame in incoming) {

278

when (frame) {

279

is Frame.Text -> println("Secure: ${frame.readText()}")

280

is Frame.Close -> break

281

else -> { /* handle other frames */ }

282

}

283

}

284

}

285

}

286

```

287

288

## Advanced WebSocket Features

289

290

### Custom WebSocket Extensions

291

292

```kotlin

293

class CompressionExtension : WebSocketExtension<CompressionExtension.Config> {

294

class Config {

295

var compressionLevel: Int = 6

296

var windowBits: Int = 15

297

}

298

299

override val factory: WebSocketExtensionFactory<Config, CompressionExtension>

300

get() = TODO("Implement extension factory")

301

302

override val protocols: List<WebSocketExtensionHeader>

303

get() = listOf(WebSocketExtensionHeader("deflate-frame"))

304

}

305

306

val client = HttpClient {

307

install(WebSockets) {

308

extensions(CompressionExtension())

309

}

310

}

311

```

312

313

### Connection Management

314

315

```kotlin

316

class WebSocketManager {

317

private val client = HttpClient {

318

install(WebSockets) {

319

pingInterval = 30.seconds

320

}

321

}

322

323

private var session: ClientWebSocketSession? = null

324

private var reconnectJob: Job? = null

325

326

suspend fun connect(url: String): Boolean {

327

return try {

328

session = client.webSocketSession(url) {

329

header("Authorization", "Bearer $token")

330

}

331

332

// Start message handling

333

launch {

334

handleIncomingMessages()

335

}

336

337

true

338

} catch (e: Exception) {

339

println("Connection failed: ${e.message}")

340

scheduleReconnect(url)

341

false

342

}

343

}

344

345

private suspend fun handleIncomingMessages() {

346

try {

347

session?.let { session ->

348

for (frame in session.incoming) {

349

when (frame) {

350

is Frame.Text -> processMessage(frame.readText())

351

is Frame.Close -> {

352

println("Connection closed: ${frame.readReason()}")

353

break

354

}

355

else -> { /* handle other frames */ }

356

}

357

}

358

}

359

} catch (e: Exception) {

360

println("Message handling error: ${e.message}")

361

}

362

}

363

364

private fun scheduleReconnect(url: String) {

365

reconnectJob?.cancel()

366

reconnectJob = CoroutineScope(Dispatchers.IO).launch {

367

delay(5000) // Wait 5 seconds before reconnecting

368

connect(url)

369

}

370

}

371

372

suspend fun sendMessage(text: String): Boolean {

373

return try {

374

session?.send(text)

375

true

376

} catch (e: Exception) {

377

println("Send failed: ${e.message}")

378

false

379

}

380

}

381

382

suspend fun disconnect() {

383

reconnectJob?.cancel()

384

session?.close(CloseReason.NORMAL)

385

session = null

386

client.close()

387

}

388

389

private fun processMessage(message: String) {

390

// Process incoming message

391

println("Received: $message")

392

}

393

}

394

```

395

396

### Message Serialization

397

398

```kotlin

399

// JSON message serialization

400

suspend fun ClientWebSocketSession.sendJson(data: Any) {

401

val json = Json.encodeToString(data)

402

send(json)

403

}

404

405

suspend inline fun <reified T> ClientWebSocketSession.receiveJson(): T? {

406

for (frame in incoming) {

407

when (frame) {

408

is Frame.Text -> {

409

return Json.decodeFromString<T>(frame.readText())

410

}

411

is Frame.Close -> return null

412

else -> continue

413

}

414

}

415

return null

416

}

417

418

// Usage

419

data class ChatMessage(val user: String, val text: String, val timestamp: Long)

420

421

client.webSocket("ws://chat.example.com") {

422

// Send JSON message

423

sendJson(ChatMessage("Alice", "Hello!", System.currentTimeMillis()))

424

425

// Receive JSON message

426

val message: ChatMessage? = receiveJson()

427

message?.let { println("${it.user}: ${it.text}") }

428

}

429

```

430

431

### Binary Data Handling

432

433

```kotlin

434

client.webSocket("ws://binary-service.example.com") {

435

// Send binary data

436

val imageData = File("image.png").readBytes()

437

send(imageData)

438

439

// Receive binary data

440

for (frame in incoming) {

441

when (frame) {

442

is Frame.Binary -> {

443

val data = frame.data

444

File("received-${System.currentTimeMillis()}.bin").writeBytes(data)

445

println("Received ${data.size} bytes of binary data")

446

}

447

is Frame.Close -> break

448

else -> { /* handle other frames */ }

449

}

450

}

451

}

452

```

453

454

## Error Handling and Recovery

455

456

### Connection Error Handling

457

458

```kotlin

459

val client = HttpClient {

460

install(WebSockets) {

461

pingInterval = 10.seconds

462

}

463

}

464

465

suspend fun connectWithRetry(url: String, maxRetries: Int = 3) {

466

repeat(maxRetries) { attempt ->

467

try {

468

client.webSocket(url) {

469

println("Connected successfully on attempt ${attempt + 1}")

470

471

// Handle connection

472

for (frame in incoming) {

473

when (frame) {

474

is Frame.Text -> println(frame.readText())

475

is Frame.Close -> {

476

println("Connection closed normally")

477

return@webSocket

478

}

479

else -> { /* handle other frames */ }

480

}

481

}

482

}

483

return // Success, exit retry loop

484

} catch (e: ConnectTimeoutException) {

485

println("Connection timeout on attempt ${attempt + 1}")

486

if (attempt == maxRetries - 1) throw e

487

delay(2000 * (attempt + 1)) // Exponential backoff

488

} catch (e: Exception) {

489

println("Connection error on attempt ${attempt + 1}: ${e.message}")

490

if (attempt == maxRetries - 1) throw e

491

delay(1000)

492

}

493

}

494

}

495

```

496

497

### Graceful Disconnection

498

499

```kotlin

500

client.webSocket("ws://example.com/socket") {

501

try {

502

// WebSocket communication

503

for (frame in incoming) {

504

when (frame) {

505

is Frame.Text -> {

506

val message = frame.readText()

507

if (message == "shutdown") {

508

send("Acknowledged shutdown")

509

close(CloseReason.NORMAL)

510

break

511

}

512

// Process other messages

513

}

514

is Frame.Close -> {

515

println("Server closed connection: ${frame.readReason()}")

516

break

517

}

518

else -> { /* handle other frames */ }

519

}

520

}

521

} catch (e: Exception) {

522

println("WebSocket error: ${e.message}")

523

// Attempt graceful close

524

try {

525

close(CloseReason(1011, "Internal Error"))

526

} catch (closeException: Exception) {

527

println("Failed to close gracefully: ${closeException.message}")

528

}

529

} finally {

530

println("WebSocket session ended")

531

}

532

}

533

```

534

535

## WebSocket Testing

536

537

### Mock WebSocket Server

538

539

```kotlin

540

class MockWebSocketServer {

541

private val responses = mutableListOf<String>()

542

543

fun addResponse(response: String) {

544

responses.add(response)

545

}

546

547

suspend fun simulate(session: ClientWebSocketSession) {

548

// Simulate server responses

549

responses.forEach { response ->

550

session.send(response)

551

delay(100)

552

}

553

session.close(CloseReason.NORMAL)

554

}

555

}

556

557

// Test WebSocket client

558

suspend fun testWebSocketClient() {

559

val client = HttpClient {

560

install(WebSockets)

561

}

562

563

// In actual tests, this would be a real test server

564

client.webSocket("ws://test-server:8080/test") {

565

send("ping")

566

567

val response = incoming.receive()

568

if (response is Frame.Text) {

569

assertEquals("pong", response.readText())

570

}

571

}

572

573

client.close()

574

}

575

```

576

577

## Performance Optimization

578

579

### Connection Pooling

580

581

```kotlin

582

class WebSocketPool(private val maxConnections: Int = 10) {

583

private val availableConnections = Channel<ClientWebSocketSession>(maxConnections)

584

private val client = HttpClient {

585

install(WebSockets) {

586

pingInterval = 30.seconds

587

}

588

}

589

590

suspend fun borrowConnection(url: String): ClientWebSocketSession {

591

return availableConnections.tryReceive().getOrNull()

592

?: client.webSocketSession(url)

593

}

594

595

suspend fun returnConnection(session: ClientWebSocketSession) {

596

if (!session.closeReason.isCompleted) {

597

availableConnections.trySend(session)

598

}

599

}

600

601

fun close() {

602

client.close()

603

}

604

}

605

```

606

607

### Batched Message Sending

608

609

```kotlin

610

class BatchedWebSocketSender(private val session: ClientWebSocketSession) {

611

private val messageQueue = Channel<String>(Channel.UNLIMITED)

612

private val batchSize = 10

613

private val flushInterval = 100L // milliseconds

614

615

init {

616

CoroutineScope(Dispatchers.IO).launch {

617

processBatches()

618

}

619

}

620

621

suspend fun sendMessage(message: String) {

622

messageQueue.send(message)

623

}

624

625

private suspend fun processBatches() {

626

val batch = mutableListOf<String>()

627

628

while (!messageQueue.isClosedForReceive) {

629

// Collect messages for batch

630

val timeout = withTimeoutOrNull(flushInterval) {

631

repeat(batchSize) {

632

batch.add(messageQueue.receive())

633

}

634

}

635

636

// Send batch if we have messages

637

if (batch.isNotEmpty()) {

638

val combinedMessage = batch.joinToString("\n")

639

session.send(combinedMessage)

640

batch.clear()

641

}

642

}

643

}

644

}

645

```

646

647

## Best Practices

648

649

1. **Handle all frame types**: Always process Text, Binary, Close, Ping, and Pong frames appropriately

650

2. **Implement reconnection logic**: Network connections can be unreliable, implement automatic reconnection

651

3. **Use heartbeats**: Configure ping intervals to detect connection issues early

652

4. **Graceful shutdown**: Always close connections properly with appropriate close reasons

653

5. **Error handling**: Wrap WebSocket operations in try-catch blocks for robust error handling

654

6. **Message size limits**: Be aware of frame size limits and implement chunking for large messages

655

7. **Authentication**: Include proper authentication headers when establishing connections

656

8. **Resource cleanup**: Always close clients and sessions to prevent resource leaks

657

9. **Concurrent access**: Be careful with concurrent access to WebSocket sessions, they are not thread-safe

658

10. **Protocol negotiation**: Use WebSocket subprotocols for structured communication protocols