or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client-configuration.md cookie-management.md forms-and-uploads.md http-caching.md http-requests.md index.md plugin-system.md response-handling.md server-sent-events.md websockets.md

docs/server-sent-events.md

0

# Server-Sent Events (SSE)

1

2

Server-Sent Events client implementation for real-time event streaming with automatic reconnection and event parsing.

3

4

## Capabilities

5

6

### SSE Connection Functions

7

8

Establish Server-Sent Events connections for real-time data streaming.

9

10

```kotlin { .api }

11

/**

12

* Establish SSE connection with URL string

13

* @param urlString SSE endpoint URL

14

* @param block SSE session handling block

15

*/

16

suspend fun HttpClient.sse(

17

urlString: String,

18

block: suspend ClientSSESession.() -> Unit

19

)

20

21

/**

22

* Establish SSE connection with detailed configuration

23

* @param host SSE server host

24

* @param port SSE server port (default: 80 for HTTP, 443 for HTTPS)

25

* @param path SSE endpoint path

26

* @param block SSE session handling block

27

*/

28

suspend fun HttpClient.sse(

29

host: String,

30

port: Int = DEFAULT_PORT,

31

path: String = "/",

32

block: suspend ClientSSESession.() -> Unit

33

)

34

35

/**

36

* Get SSE session without automatic event handling

37

* @param urlString SSE endpoint URL

38

* @return SSE session for manual management

39

*/

40

suspend fun HttpClient.sseSession(urlString: String): ClientSSESession

41

42

/**

43

* Get SSE session with detailed configuration

44

* @param host SSE server host

45

* @param port SSE server port

46

* @param path SSE endpoint path

47

* @return SSE session for manual management

48

*/

49

suspend fun HttpClient.sseSession(

50

host: String,

51

port: Int,

52

path: String

53

): ClientSSESession

54

```

55

56

**Usage Examples:**

57

58

```kotlin

59

val client = HttpClient {

60

install(SSE)

61

}

62

63

// Basic SSE connection

64

client.sse("https://api.example.com/events") {

65

// Process incoming server-sent events

66

for (event in incoming) {

67

when (event.event) {

68

"message" -> {

69

println("New message: ${event.data}")

70

}

71

"notification" -> {

72

println("Notification: ${event.data}")

73

// Parse JSON data if needed

74

val notification = Json.decodeFromString<Notification>(event.data ?: "")

75

handleNotification(notification)

76

}

77

"heartbeat" -> {

78

println("Server heartbeat received")

79

}

80

null -> {

81

// Default event type

82

println("Default event: ${event.data}")

83

}

84

}

85

}

86

}

87

88

// SSE with custom host and port

89

client.sse("localhost", 8080, "/stream") {

90

incoming.consumeEach { event ->

91

println("Event ID: ${event.id}")

92

println("Event Type: ${event.event}")

93

println("Event Data: ${event.data}")

94

println("Retry: ${event.retry}")

95

println("---")

96

}

97

}

98

99

// Manual SSE session management

100

val session = client.sseSession("https://api.example.com/live-updates")

101

try {

102

while (true) {

103

val event = session.incoming.receive()

104

processServerSentEvent(event)

105

}

106

} finally {

107

session.close()

108

}

109

```

110

111

### ClientSSESession Interface

112

113

Core SSE session interface for event handling and connection management.

114

115

```kotlin { .api }

116

/**

117

* Client SSE session interface

118

*/

119

interface ClientSSESession : CoroutineScope {

120

/**

121

* Incoming server-sent events channel

122

*/

123

val incoming: ReceiveChannel<ServerSentEvent>

124

125

/**

126

* HTTP call associated with this SSE session

127

*/

128

val call: HttpClientCall

129

130

/**

131

* Close the SSE connection

132

*/

133

suspend fun close()

134

}

135

136

/**

137

* Default SSE session implementation

138

*/

139

class DefaultClientSSESession(

140

override val call: HttpClientCall,

141

override val coroutineContext: CoroutineContext,

142

override val incoming: ReceiveChannel<ServerSentEvent>

143

) : ClientSSESession

144

```

145

146

### ServerSentEvent Data Class

147

148

Representation of individual server-sent events with all SSE fields.

149

150

```kotlin { .api }

151

/**

152

* Server-sent event representation

153

*/

154

data class ServerSentEvent(

155

/** Event data payload */

156

val data: String?,

157

158

/** Event type identifier */

159

val event: String?,

160

161

/** Event ID for last-event-id tracking */

162

val id: String?,

163

164

/** Retry timeout in milliseconds */

165

val retry: Long?,

166

167

/** Comments from the event stream */

168

val comments: String?

169

) {

170

companion object {

171

/**

172

* Create empty SSE event

173

* @return Empty ServerSentEvent

174

*/

175

fun empty(): ServerSentEvent = ServerSentEvent(null, null, null, null, null)

176

}

177

}

178

```

179

180

**Usage Examples:**

181

182

```kotlin

183

client.sse("https://api.example.com/events") {

184

for (event in incoming) {

185

// Access all event fields

186

val eventData = event.data ?: "No data"

187

val eventType = event.event ?: "message" // Default event type

188

val eventId = event.id

189

val retryTime = event.retry

190

val comments = event.comments

191

192

println("Event: $eventType")

193

println("Data: $eventData")

194

195

// Use event ID for tracking

196

if (eventId != null) {

197

saveLastEventId(eventId)

198

}

199

200

// Handle retry timing

201

if (retryTime != null) {

202

println("Server suggests retry after ${retryTime}ms")

203

}

204

205

// Process based on event type

206

when (eventType) {

207

"user-joined" -> {

208

val user = Json.decodeFromString<User>(eventData)

209

handleUserJoined(user)

210

}

211

"user-left" -> {

212

val user = Json.decodeFromString<User>(eventData)

213

handleUserLeft(user)

214

}

215

"chat-message" -> {

216

val message = Json.decodeFromString<ChatMessage>(eventData)

217

displayMessage(message)

218

}

219

}

220

}

221

}

222

```

223

224

### SSE Plugin Configuration

225

226

SSE plugin installation and configuration options.

227

228

```kotlin { .api }

229

/**

230

* SSE plugin object

231

*/

232

object SSE : HttpClientPlugin<SSEConfig, SSEConfig> {

233

override val key: AttributeKey<SSEConfig>

234

235

override fun prepare(block: SSEConfig.() -> Unit): SSEConfig

236

override fun install(plugin: SSEConfig, scope: HttpClient)

237

}

238

239

/**

240

* SSE configuration

241

*/

242

class SSEConfig {

243

/** Whether to show comment lines in events */

244

var showCommentLines: Boolean = false

245

246

/** Whether to show retry directive in events */

247

var showRetryDirective: Boolean = false

248

249

/** Custom reconnection strategy */

250

var reconnectionStrategy: SSEReconnectionStrategy? = null

251

}

252

253

/**

254

* SSE reconnection strategy interface

255

*/

256

interface SSEReconnectionStrategy {

257

/**

258

* Determine if reconnection should be attempted

259

* @param attempt Reconnection attempt number (starting from 1)

260

* @param lastError Last connection error

261

* @return True if should reconnect, false otherwise

262

*/

263

suspend fun shouldReconnect(attempt: Int, lastError: Throwable?): Boolean

264

265

/**

266

* Calculate delay before reconnection attempt

267

* @param attempt Reconnection attempt number

268

* @return Delay in milliseconds

269

*/

270

suspend fun delayBeforeReconnect(attempt: Int): Long

271

}

272

```

273

274

**Usage Examples:**

275

276

```kotlin

277

// Configure SSE plugin

278

val client = HttpClient {

279

install(SSE) {

280

showCommentLines = true

281

showRetryDirective = true

282

283

// Custom reconnection strategy

284

reconnectionStrategy = object : SSEReconnectionStrategy {

285

override suspend fun shouldReconnect(attempt: Int, lastError: Throwable?): Boolean {

286

return attempt <= 5 // Max 5 reconnection attempts

287

}

288

289

override suspend fun delayBeforeReconnect(attempt: Int): Long {

290

return (attempt * 1000).toLong() // Linear backoff: 1s, 2s, 3s, ...

291

}

292

}

293

}

294

}

295

296

// Use configured SSE client

297

client.sse("https://api.example.com/events") {

298

for (event in incoming) {

299

// Comments and retry directives included based on config

300

if (event.comments != null) {

301

println("Server comment: ${event.comments}")

302

}

303

304

if (event.retry != null) {

305

println("Retry directive: ${event.retry}ms")

306

}

307

308

// Process event data

309

event.data?.let { data ->

310

processEventData(data, event.event)

311

}

312

}

313

}

314

```

315

316

### SSE Request Configuration

317

318

Configure SSE requests with headers, authentication, and parameters.

319

320

```kotlin { .api }

321

/**

322

* Configure SSE request with custom headers and parameters

323

*/

324

suspend fun HttpClient.sse(

325

urlString: String,

326

block: suspend ClientSSESession.() -> Unit,

327

requestBuilder: HttpRequestBuilder.() -> Unit = {}

328

)

329

```

330

331

**Usage Examples:**

332

333

```kotlin

334

// SSE with authentication and custom headers

335

client.sse("https://api.example.com/private-events", {

336

// SSE session handling

337

for (event in incoming) {

338

handlePrivateEvent(event)

339

}

340

}) {

341

// Request configuration

342

bearerAuth("your-jwt-token")

343

header("X-Client-Version", "1.0")

344

parameter("channel", "notifications")

345

parameter("user_id", "12345")

346

347

// Set Last-Event-ID for resuming

348

val lastEventId = getStoredLastEventId()

349

if (lastEventId != null) {

350

header("Last-Event-ID", lastEventId)

351

}

352

}

353

354

// SSE with query parameters for filtering

355

client.sse("https://api.example.com/events", {

356

for (event in incoming) {

357

when (event.event) {

358

"price-update" -> handlePriceUpdate(event.data)

359

"trade-executed" -> handleTradeExecution(event.data)

360

}

361

}

362

}) {

363

parameter("symbols", "AAPL,GOOGL,MSFT")

364

parameter("events", "price-update,trade-executed")

365

header("Accept", "text/event-stream")

366

}

367

```

368

369

### Error Handling and Reconnection

370

371

Handle SSE connection errors and implement custom reconnection logic.

372

373

```kotlin { .api }

374

/**

375

* SSE connection exception

376

*/

377

class SSEConnectionException(

378

message: String,

379

cause: Throwable? = null

380

) : Exception(message, cause)

381

382

/**

383

* Handle SSE with error recovery

384

*/

385

suspend fun <T> HttpClient.sseWithRetry(

386

urlString: String,

387

maxRetries: Int = 3,

388

retryDelay: Long = 1000,

389

block: suspend ClientSSESession.() -> T

390

): T

391

```

392

393

**Usage Examples:**

394

395

```kotlin

396

// Error handling with manual retry

397

suspend fun connectToSSEWithRetry() {

398

var attempt = 0

399

val maxAttempts = 5

400

401

while (attempt < maxAttempts) {

402

try {

403

client.sse("https://api.example.com/events") {

404

println("Connected to SSE stream (attempt ${attempt + 1})")

405

406

for (event in incoming) {

407

processEvent(event)

408

}

409

}

410

break // Success, exit retry loop

411

} catch (e: SSEConnectionException) {

412

attempt++

413

println("SSE connection failed (attempt $attempt): ${e.message}")

414

415

if (attempt < maxAttempts) {

416

val delay = attempt * 2000L // Linear backoff: 2s, 4s, 6s, ...

417

println("Retrying in ${delay}ms...")

418

delay(delay)

419

} else {

420

println("Max retry attempts reached, giving up")

421

throw e

422

}

423

}

424

}

425

}

426

427

// Graceful error handling within SSE session

428

client.sse("https://api.example.com/events") {

429

try {

430

for (event in incoming) {

431

try {

432

processEvent(event)

433

} catch (e: Exception) {

434

println("Error processing event: ${e.message}")

435

// Continue processing other events

436

}

437

}

438

} catch (e: ChannelClosedException) {

439

println("SSE channel closed: ${e.message}")

440

} catch (e: Exception) {

441

println("SSE session error: ${e.message}")

442

throw e

443

}

444

}

445

```

446

447

## Types

448

449

### SSE Types

450

451

```kotlin { .api }

452

/**

453

* SSE content wrapper for pipeline processing

454

*/

455

class SSEClientContent : OutgoingContent.NoContent() {

456

override val contentType: ContentType = ContentType.Text.EventStream

457

}

458

459

/**

460

* Channel result for SSE operations

461

*/

462

sealed class ChannelResult<out T> {

463

data class Success<T>(val value: T) : ChannelResult<T>()

464

data class Failure(val exception: Throwable) : ChannelResult<Nothing>()

465

object Closed : ChannelResult<Nothing>()

466

467

val isSuccess: Boolean

468

val isFailure: Boolean

469

val isClosed: Boolean

470

471

fun getOrNull(): T?

472

fun exceptionOrNull(): Throwable?

473

}

474

475

/**

476

* Channel closed exception

477

*/

478

class ChannelClosedException(

479

message: String? = null,

480

cause: Throwable? = null

481

) : Exception(message, cause)

482

```

483

484

### Content Types

485

486

```kotlin { .api }

487

/**

488

* SSE-specific content types

489

*/

490

object ContentType {

491

object Text {

492

/** Content type for Server-Sent Events */

493

val EventStream: ContentType = ContentType("text", "event-stream")

494

}

495

}

496

497

/**

498

* SSE-specific HTTP headers

499

*/

500

object SSEHeaders {

501

const val LastEventId = "Last-Event-ID"

502

const val CacheControl = "Cache-Control"

503

const val Connection = "Connection"

504

const val Accept = "Accept"

505

}

506

```