or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

attribute-extraction.md · crud-handlers.md · crud-operations.md · custom-resources.md · index.md · junit-integration.md · mock-server-management.md · websocket-operations.md

docs/websocket-operations.md

0

# WebSocket Operations

1

2

WebSocket-based functionality for watch operations and exec/attach commands with proper stream handling and message formatting.

3

4

## Capabilities

5

6

### Watch Events

7

8

Real-time resource change notifications through WebSocket connections.

9

10

```java { .api }

11

/**

12

* Handles WebSocket watch events for resource monitoring

13

* Manages WebSocket connections and event distribution

14

*/

15

public class WatchEventsListener {

16

17

/**

18

* Create a new WatchEventsListener

19

* @param context Server context for processing

20

* @param attributeSet Filter attributes for this watch

21

* @param watchEventListenerList Set of all active listeners

22

* @param logger Logger instance for this listener

23

* @param onOpenAction Action to execute when WebSocket opens

24

*/

25

public WatchEventsListener(Context context, AttributeSet attributeSet,

26

Set<WatchEventsListener> watchEventListenerList,

27

Logger logger, Consumer<WatchEventsListener> onOpenAction);

28

29

/**

30

* Send a WebSocket response for a resource event

31

* @param resource JSON representation of the resource

32

* @param action Type of action (ADDED, MODIFIED, DELETED)

33

*/

34

public void sendWebSocketResponse(String resource, Watcher.Action action);

35

36

/**

37

* Check if attributes match this watch listener's criteria

38

* @param attributes AttributeSet to check against watch filters

39

* @return true if attributes match the watch criteria

40

*/

41

public boolean attributeMatches(AttributeSet attributes);

42

43

// WebSocket lifecycle callbacks

44

45

/**

46

* Called when WebSocket connection is opened

47

* @param webSocket The opened WebSocket

48

* @param response The HTTP response that initiated the WebSocket

49

*/

50

public void onOpen(WebSocket webSocket, Response response);

51

52

/**

53

* Called when WebSocket connection is closing

54

* @param webSocket The closing WebSocket

55

* @param code Close status code

56

* @param reason Close reason message

57

*/

58

public void onClosing(WebSocket webSocket, int code, String reason);

59

60

/**

61

* Called when WebSocket connection is closed

62

* @param webSocket The closed WebSocket

63

* @param code Close status code

64

* @param reason Close reason message

65

*/

66

public void onClosed(WebSocket webSocket, int code, String reason);

67

68

/**

69

* Called when WebSocket connection fails

70

* @param webSocket The failed WebSocket

71

* @param t Exception that caused the failure

72

* @param response The HTTP response (may be null)

73

*/

74

public void onFailure(WebSocket webSocket, Throwable t, Response response);

75

}

76

```

77

78

**Watch Actions:**

79

80

```java { .api }

81

// Watch event actions from Kubernetes client

82

import io.fabric8.kubernetes.client.Watcher.Action;

83

84

// Available actions:

85

// - Action.ADDED: Resource was created

86

// - Action.MODIFIED: Resource was updated

87

// - Action.DELETED: Resource was deleted

88

// - Action.ERROR: Watch error occurred

89

// - Action.BOOKMARK: Bookmark event for resuming watches

90

```

91

92

**Usage Examples:**

93

94

```java

95

@EnableKubernetesMockClient(crud = true)

96

class WatchOperationsTest {

97

KubernetesClient client;

98

99

@Test

100

void testPodWatch() throws InterruptedException {

101

CountDownLatch latch = new CountDownLatch(3);

102

List<String> events = new ArrayList<>();

103

104

// Start watching pods in default namespace

105

Watch watch = client.pods().inNamespace("default").watch(new Watcher<Pod>() {

106

@Override

107

public void eventReceived(Action action, Pod resource) {

108

events.add(action + ":" + resource.getMetadata().getName());

109

latch.countDown();

110

}

111

112

@Override

113

public void onClose(WatcherException cause) {

114

if (cause != null) {

115

cause.printStackTrace();

116

}

117

}

118

});

119

120

try {

121

// Create a pod - triggers ADDED

122

Pod pod = new PodBuilder()

123

.withNewMetadata().withName("watched-pod").endMetadata()

124

.build();

125

client.pods().inNamespace("default").resource(pod).create();

126

127

// Update the pod - triggers MODIFIED

128

pod.getMetadata().setLabels(Map.of("updated", "true"));

129

client.pods().inNamespace("default").resource(pod).update();

130

131

// Delete the pod - triggers DELETED

132

client.pods().inNamespace("default").withName("watched-pod").delete();

133

134

// Wait for all events

135

assertTrue(latch.await(10, TimeUnit.SECONDS));

136

assertEquals(3, events.size());

137

assertEquals("ADDED:watched-pod", events.get(0));

138

assertEquals("MODIFIED:watched-pod", events.get(1));

139

assertEquals("DELETED:watched-pod", events.get(2));

140

} finally {

141

watch.close();

142

}

143

}

144

145

@Test

146

void testWatchWithLabelSelector() throws InterruptedException {

147

CountDownLatch latch = new CountDownLatch(1);

148

List<Pod> matchedPods = new ArrayList<>();

149

150

// Watch only pods with specific label

151

Watch watch = client.pods().inNamespace("default")

152

.withLabel("app", "web")

153

.watch(new Watcher<Pod>() {

154

@Override

155

public void eventReceived(Action action, Pod resource) {

156

matchedPods.add(resource);

157

latch.countDown();

158

}

159

160

@Override

161

public void onClose(WatcherException cause) {}

162

});

163

164

try {

165

// Create pod without label - should not trigger watch

166

Pod podNoLabel = new PodBuilder()

167

.withNewMetadata().withName("no-label").endMetadata()

168

.build();

169

client.pods().inNamespace("default").resource(podNoLabel).create();

170

171

// Create pod with matching label - should trigger watch

172

Pod podWithLabel = new PodBuilder()

173

.withNewMetadata()

174

.withName("with-label")

175

.addToLabels("app", "web")

176

.endMetadata()

177

.build();

178

client.pods().inNamespace("default").resource(podWithLabel).create();

179

180

assertTrue(latch.await(5, TimeUnit.SECONDS));

181

assertEquals(1, matchedPods.size());

182

assertEquals("with-label", matchedPods.get(0).getMetadata().getName());

183

} finally {

184

watch.close();

185

}

186

}

187

}

188

```

189

190

### Stream Messages

191

192

WebSocket message types for exec and attach operations with proper stream multiplexing.

193

194

```java { .api }

195

/**

196

* WebSocket message for standard output stream data

197

* Used in exec and attach operations

198

*/

199

public class OutputStreamMessage extends WebSocketMessage {

200

/**

201

* Create output stream message

202

* @param body String content for stdout

203

*/

204

public OutputStreamMessage(String body);

205

}

206

207

/**

208

* WebSocket message for error stream data

209

* Used in exec and attach operations

210

*/

211

public class ErrorStreamMessage extends WebSocketMessage {

212

/**

213

* Create error stream message

214

* @param body String content for stderr

215

*/

216

public ErrorStreamMessage(String body);

217

}

218

219

/**

220

* WebSocket message for status information

221

* Used to indicate command completion status

222

*/

223

public class StatusStreamMessage extends WebSocketMessage {

224

/**

225

* Create status stream message

226

* @param body String content for status information

227

*/

228

public StatusStreamMessage(String body);

229

}

230

231

/**

232

* Status message for exec/attach operations

233

* Contains exit code and termination reason

234

*/

235

public class StatusMessage {

236

/**

237

* Get the exit status of the command

238

* @return Exit code (0 for success, non-zero for failure)

239

*/

240

public int getStatus();

241

242

/**

243

* Get the termination reason

244

* @return String describing why the command terminated

245

*/

246

public String getReason();

247

}

248

```

249

250

**Message Format:**

251

252

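The WebSocket messages follow the Kubernetes channel-multiplexed streaming format used for exec and attach: the first byte of each frame identifies the stream, and the remaining bytes carry that stream's payload: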

253

- Stream 0: Standard input (client to container; not emitted by the mock server's message classes)

254

- Stream 1: Standard output

255

- Stream 2: Standard error

256

- Stream 3: Status/control messages

257

258

**Usage Examples:**

259

260

```java

261

@Test

262

void testExecOperation() {

263

// Create a pod first

264

Pod pod = new PodBuilder()

265

.withNewMetadata().withName("exec-pod").endMetadata()

266

.withNewSpec()

267

.addNewContainer()

268

.withName("main")

269

.withImage("busybox")

270

.withCommand("sleep", "3600")

271

.endContainer()

272

.endSpec()

273

.build();

274

client.pods().inNamespace("default").resource(pod).create();

275

276

// Set up expectations for exec operation

277

server.expect().get()

278

.withPath("/api/v1/namespaces/default/pods/exec-pod/exec")

279

.andUpgradeToWebSocket()

280

.open()

281

.waitFor(1000)

282

.andEmit(new OutputStreamMessage("Hello from container\n"))

283

.andEmit(new StatusStreamMessage("{\"status\":0,\"reason\":\"Completed\"}"))

284

.done()

285

.once();

286

287

// Execute command

288

ByteArrayOutputStream stdout = new ByteArrayOutputStream();

289

ByteArrayOutputStream stderr = new ByteArrayOutputStream();

290

291

ExecWatch execWatch = client.pods()

292

.inNamespace("default")

293

.withName("exec-pod")

294

.writingOutput(stdout)

295

.writingError(stderr)

296

.exec("echo", "Hello from container");

297

298

// Wait for completion

299

execWatch.exitCode().join();

300

301

assertEquals("Hello from container\n", stdout.toString());

302

assertEquals(0, execWatch.exitCode().getNow(-1));

303

}

304

305

@Test

306

void testAttachOperation() {

307

Pod pod = new PodBuilder()

308

.withNewMetadata().withName("attach-pod").endMetadata()

309

.withNewSpec()

310

.addNewContainer()

311

.withName("main")

312

.withImage("nginx")

313

.endContainer()

314

.endSpec()

315

.build();

316

client.pods().inNamespace("default").resource(pod).create();

317

318

// Set up expectations for attach

319

server.expect().get()

320

.withPath("/api/v1/namespaces/default/pods/attach-pod/attach")

321

.andUpgradeToWebSocket()

322

.open()

323

.waitFor(500)

324

.andEmit(new OutputStreamMessage("Container output\n"))

325

.andEmit(new ErrorStreamMessage("Container error\n"))

326

.done()

327

.once();

328

329

ByteArrayOutputStream stdout = new ByteArrayOutputStream();

330

ByteArrayOutputStream stderr = new ByteArrayOutputStream();

331

332

// Attach to running container

333

ExecWatch attachWatch = client.pods()

334

.inNamespace("default")

335

.withName("attach-pod")

336

.writingOutput(stdout)

337

.writingError(stderr)

338

.attach();

339

340

// Wait for some output

341

Thread.sleep(1000);

342

attachWatch.close();

343

344

assertTrue(stdout.toString().contains("Container output"));

345

assertTrue(stderr.toString().contains("Container error"));

346

}

347

```

348

349

### Log Streaming

350

351

WebSocket-based log streaming with follow support.

352

353

```java

354

@Test

355

void testLogStreaming() throws InterruptedException {

356

Pod pod = new PodBuilder()

357

.withNewMetadata().withName("log-pod").endMetadata()

358

.build();

359

client.pods().inNamespace("default").resource(pod).create();

360

361

// Set up log streaming expectation

362

server.expect().get()

363

.withPath("/api/v1/namespaces/default/pods/log-pod/log?follow=true")

364

.andUpgradeToWebSocket()

365

.open()

366

.waitFor(100)

367

.andEmit(new OutputStreamMessage("Log line 1\n"))

368

.waitFor(100)

369

.andEmit(new OutputStreamMessage("Log line 2\n"))

370

.waitFor(100)

371

.andEmit(new OutputStreamMessage("Log line 3\n"))

372

.done()

373

.once();

374

375

CountDownLatch latch = new CountDownLatch(3);

376

List<String> logLines = new ArrayList<>();

377

378

// Follow logs

379

LogWatch logWatch = client.pods()

380

.inNamespace("default")

381

.withName("log-pod")

382

.watchLog(new InputStream() {

383

// Custom input stream that captures log lines

384

private final List<String> lines = Arrays.asList("Log line 1\n", "Log line 2\n", "Log line 3\n");

385

private int index = 0;

386

387

@Override

388

public int read() {

389

if (index >= lines.size()) return -1;

390

String line = lines.get(index++);

391

logLines.add(line);

392

latch.countDown();

393

return line.charAt(0);

394

}

395

});

396

397

try {

398

assertTrue(latch.await(5, TimeUnit.SECONDS));

399

assertEquals(3, logLines.size());

400

} finally {

401

logWatch.close();

402

}

403

}

404

```

405

406

### Port Forwarding

407

408

WebSocket-based port forwarding functionality.

409

410

```java

411

@Test

412

void testPortForwarding() {

413

Pod pod = new PodBuilder()

414

.withNewMetadata().withName("port-forward-pod").endMetadata()

415

.withNewSpec()

416

.addNewContainer()

417

.withName("web")

418

.withImage("nginx")

419

.addNewPort()

420

.withContainerPort(80)

421

.endPort()

422

.endContainer()

423

.endSpec()

424

.build();

425

client.pods().inNamespace("default").resource(pod).create();

426

427

// Set up port forward expectation

428

server.expect().get()

429

.withPath("/api/v1/namespaces/default/pods/port-forward-pod/portforward")

430

.andUpgradeToWebSocket()

431

.open()

432

// Port forwarding uses binary WebSocket frames

433

// Mock server handles the port forwarding protocol

434

.done()

435

.once();

436

437

// Start port forwarding

438

LocalPortForward portForward = client.pods()

439

.inNamespace("default")

440

.withName("port-forward-pod")

441

.portForward(80, 8080);

442

443

try {

444

// Port forward is now active on local port 8080

445

assertTrue(portForward.isAlive());

446

assertEquals(8080, portForward.getLocalPort());

447

448

// In a real scenario, you could now connect to localhost:8080

449

// to reach the pod's port 80

450

} finally {

451

portForward.close();

452

}

453

}

454

```

455

456

### WebSocket Connection Management

457

458

The mock server automatically handles WebSocket protocol upgrades and connection lifecycle.

459

460

**Connection Features:**

461

- Automatic protocol upgrade from HTTP to WebSocket

462

- Proper handshake handling

463

- Binary and text frame support

464

- Connection keep-alive and cleanup

465

- Multiple concurrent connections

466

- Stream multiplexing for exec/attach operations

467

468

**Error Handling:**

469

470

```java

471

@Test

472

void testWebSocketErrors() throws InterruptedException {

473

CountDownLatch errorLatch = new CountDownLatch(1);

474

WatcherException[] exception = new WatcherException[1];

475

476

// Watch for a pod that will cause an error

477

Watch watch = client.pods().inNamespace("nonexistent").watch(new Watcher<Pod>() {

478

@Override

479

public void eventReceived(Action action, Pod resource) {

480

// Should not be called

481

}

482

483

@Override

484

public void onClose(WatcherException cause) {

485

exception[0] = cause;

486

errorLatch.countDown();

487

}

488

});

489

490

// Simulate server error by not setting up expectation

491

// This will cause the watch to fail

492

493

assertTrue(errorLatch.await(5, TimeUnit.SECONDS));

494

assertNotNull(exception[0]);

495

watch.close();

496

}

497

```