or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client-apis.mdconfiguration.mddynamic-client.mdindex.mdleader-election.mdstreaming.mdutils.mdwatch.md

streaming.mddocs/

0

# Streaming

1

2

WebSocket-based streaming for exec sessions, port forwarding, and attach operations. Provides real-time bidirectional communication with containers running in Kubernetes pods, enabling interactive terminal sessions and data streaming.

3

4

## Capabilities

5

6

### WebSocket API Client

7

8

Enhanced API client with WebSocket support for streaming operations.

9

10

```python { .api }

11

class WsApiClient(ApiClient):

12

def __init__(self, configuration=None, header_name=None, header_value=None,

13

cookie=None, pool_threads=1, heartbeat=None):

14

"""

15

WebSocket-enabled API client for streaming operations.

16

17

Parameters:

18

- configuration: Configuration, client configuration

19

- header_name: str, custom header name for authentication

20

- header_value: str, custom header value for authentication

21

- cookie: str, cookie string for authentication

22

- pool_threads: int, number of threads for connection pooling

23

- heartbeat: int, WebSocket heartbeat interval in seconds

24

"""

25

26

async def request(self, method, url, query_params=None, headers=None, body=None,

27

post_params=None, _preload_content=True, _request_timeout=None):

28

"""

29

Make WebSocket request for streaming operations.

30

31

Parameters:

32

- method: str, HTTP method (GET for WebSocket upgrade)

33

- url: str, WebSocket URL

34

- query_params: dict, query parameters for WebSocket connection

35

- headers: dict, additional headers

36

- body: bytes, initial data to send

37

- post_params: dict, POST parameters (unused for WebSocket)

38

- _preload_content: bool, whether to preload response content

39

- _request_timeout: int, request timeout in seconds

40

41

Returns:

42

- WsResponse: WebSocket response wrapper

43

"""

44

45

def parse_error_data(self, data):

46

"""

47

Parse error channel data from WebSocket stream.

48

49

Parameters:

50

- data: bytes, raw error data from ERROR_CHANNEL

51

52

Returns:

53

- dict: Parsed error information

54

"""

55

```

56

57

### WebSocket Response Handling

58

59

Response wrapper for WebSocket streaming operations.

60

61

```python { .api }

62

class WsResponse:

63

def __init__(self, websocket):

64

"""

65

WebSocket response wrapper.

66

67

Parameters:

68

- websocket: WebSocket connection object

69

"""

70

71

async def read_channel(self, timeout=None):

72

"""

73

Read data from WebSocket with channel information.

74

75

Parameters:

76

- timeout: int, read timeout in seconds

77

78

Returns:

79

- tuple: (channel_number, data) where channel indicates data type

80

"""

81

82

async def write_channel(self, channel, data):

83

"""

84

Write data to specific WebSocket channel.

85

86

Parameters:

87

- channel: int, target channel number

88

- data: bytes, data to write

89

"""

90

91

async def close(self):

92

"""Close WebSocket connection."""

93

```

94

95

### Channel Constants

96

97

WebSocket channel identifiers for different data streams in exec/attach operations.

98

99

```python { .api }

100

# Standard I/O channels

101

STDIN_CHANNEL: int = 0 # Standard input to container

102

STDOUT_CHANNEL: int = 1 # Standard output from container

103

STDERR_CHANNEL: int = 2 # Standard error from container

104

ERROR_CHANNEL: int = 3 # Error information and status

105

RESIZE_CHANNEL: int = 4 # Terminal resize events

106

```

107

108

### Utility Functions

109

110

Helper functions for WebSocket URL handling and connection setup.

111

112

```python { .api }

113

def get_websocket_url(url):

114

"""

115

Convert HTTP/HTTPS URL to WebSocket URL.

116

117

Parameters:

118

- url: str, HTTP or HTTPS URL

119

120

Returns:

121

- str: Corresponding WebSocket URL (ws:// or wss://)

122

"""

123

```

124

125

## Usage Examples

126

127

### Container Exec Session

128

129

```python

130

import asyncio

131

from kubernetes_asyncio import client, config, stream

132

133

async def exec_in_pod():

134

await config.load_config()

135

136

# Create WebSocket-enabled client

137

ws_client = stream.WsApiClient()

138

v1 = client.CoreV1Api(ws_client)

139

140

try:

141

# Execute command in pod

142

exec_command = ['/bin/sh', '-c', 'echo "Hello from container"; ls -la']

143

144

response = await v1.connect_get_namespaced_pod_exec(

145

name="my-pod",

146

namespace="default",

147

command=exec_command,

148

stderr=True,

149

stdin=False,

150

stdout=True,

151

tty=False

152

)

153

154

# Read output from exec session

155

while True:

156

try:

157

channel, data = await response.read_channel(timeout=10)

158

159

if channel == stream.STDOUT_CHANNEL:

160

print(f"STDOUT: {data.decode('utf-8')}", end="")

161

elif channel == stream.STDERR_CHANNEL:

162

print(f"STDERR: {data.decode('utf-8')}", end="")

163

elif channel == stream.ERROR_CHANNEL:

164

error_info = ws_client.parse_error_data(data)

165

if error_info.get('status') == 'Success':

166

print("Command completed successfully")

167

break

168

else:

169

print(f"Error: {error_info}")

170

break

171

172

except asyncio.TimeoutError:

173

print("Exec session timed out")

174

break

175

176

finally:

177

await response.close()

178

await ws_client.close()

179

180

asyncio.run(exec_in_pod())

181

```

182

183

### Interactive Terminal Session

184

185

```python

186

import sys

187

import select

188

import termios

189

import tty

190

from kubernetes_asyncio import client, config, stream

191

192

async def interactive_shell():

193

await config.load_config()

194

195

ws_client = stream.WsApiClient()

196

v1 = client.CoreV1Api(ws_client)

197

198

try:

199

# Start interactive shell

200

response = await v1.connect_get_namespaced_pod_exec(

201

name="my-pod",

202

namespace="default",

203

command=['/bin/bash'],

204

stderr=True,

205

stdin=True,

206

stdout=True,

207

tty=True

208

)

209

210

# Set terminal to raw mode for interactive session

211

old_settings = termios.tcgetattr(sys.stdin)

212

tty.setraw(sys.stdin.fileno())

213

214

try:

215

# Handle bidirectional communication

216

async def read_output():

217

while True:

218

try:

219

channel, data = await response.read_channel(timeout=0.1)

220

221

if channel == stream.STDOUT_CHANNEL:

222

sys.stdout.write(data.decode('utf-8'))

223

sys.stdout.flush()

224

elif channel == stream.STDERR_CHANNEL:

225

sys.stderr.write(data.decode('utf-8'))

226

sys.stderr.flush()

227

elif channel == stream.ERROR_CHANNEL:

228

error_info = ws_client.parse_error_data(data)

229

if error_info.get('status') != 'Success':

230

print(f"\nError: {error_info}")

231

break

232

233

except asyncio.TimeoutError:

234

continue

235

236

async def send_input():

237

while True:

238

# Check for input without blocking

239

if select.select([sys.stdin], [], [], 0.1)[0]:

240

char = sys.stdin.read(1)

241

if char:

242

await response.write_channel(stream.STDIN_CHANNEL, char.encode('utf-8'))

243

await asyncio.sleep(0.01)

244

245

# Run both input and output handlers concurrently

246

await asyncio.gather(read_output(), send_input())

247

248

finally:

249

termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)

250

251

finally:

252

await response.close()

253

await ws_client.close()

254

255

# asyncio.run(interactive_shell()) # Uncomment to run

256

```

257

258

### Container Attach

259

260

```python

261

async def attach_to_pod():

262

await config.load_config()

263

264

ws_client = stream.WsApiClient()

265

v1 = client.CoreV1Api(ws_client)

266

267

try:

268

# Attach to running container

269

response = await v1.connect_get_namespaced_pod_attach(

270

name="my-pod",

271

namespace="default",

272

container="my-container", # Optional: specify container

273

stderr=True,

274

stdin=True,

275

stdout=True,

276

tty=True

277

)

278

279

# Send initial command

280

command = "/bin/bash\n"

281

await response.write_channel(stream.STDIN_CHANNEL, command.encode('utf-8'))

282

283

# Read responses

284

timeout_count = 0

285

while timeout_count < 5: # Exit after 5 timeouts

286

try:

287

channel, data = await response.read_channel(timeout=2)

288

289

if channel == stream.STDOUT_CHANNEL:

290

print(f"Output: {data.decode('utf-8')}", end="")

291

timeout_count = 0 # Reset timeout counter

292

elif channel == stream.STDERR_CHANNEL:

293

print(f"Error: {data.decode('utf-8')}", end="")

294

timeout_count = 0

295

elif channel == stream.ERROR_CHANNEL:

296

error_info = ws_client.parse_error_data(data)

297

print(f"Status: {error_info}")

298

break

299

300

except asyncio.TimeoutError:

301

timeout_count += 1

302

print("No output received...")

303

304

finally:

305

await response.close()

306

await ws_client.close()

307

308

asyncio.run(attach_to_pod())

309

```

310

311

### Port Forwarding

312

313

```python

314

async def port_forward():

315

await config.load_config()

316

317

ws_client = stream.WsApiClient()

318

v1 = client.CoreV1Api(ws_client)

319

320

try:

321

# Set up port forwarding

322

response = await v1.connect_get_namespaced_pod_portforward(

323

name="my-pod",

324

namespace="default",

325

ports="8080" # Forward port 8080

326

)

327

328

print("Port forwarding established on port 8080")

329

330

# Handle port forwarding data

331

while True:

332

try:

333

channel, data = await response.read_channel(timeout=30)

334

335

if channel == stream.STDOUT_CHANNEL:

336

# Handle forwarded data from port 8080

337

print(f"Received {len(data)} bytes from port 8080")

338

# Process or forward data as needed

339

elif channel == stream.ERROR_CHANNEL:

340

error_info = ws_client.parse_error_data(data)

341

if error_info.get('status') != 'Success':

342

print(f"Port forward error: {error_info}")

343

break

344

345

except asyncio.TimeoutError:

346

print("Port forwarding timeout - connection may be idle")

347

continue

348

349

finally:

350

await response.close()

351

await ws_client.close()

352

353

# asyncio.run(port_forward()) # Uncomment to run

354

```

355

356

### Terminal Resize Handling

357

358

```python

359

import signal

360

import struct

361

import fcntl

362

import termios

363

364

async def exec_with_resize():

365

await config.load_config()

366

367

ws_client = stream.WsApiClient()

368

v1 = client.CoreV1Api(ws_client)

369

370

try:

371

response = await v1.connect_get_namespaced_pod_exec(

372

name="my-pod",

373

namespace="default",

374

command=['/bin/bash'],

375

stderr=True,

376

stdin=True,

377

stdout=True,

378

tty=True

379

)

380

381

# Get initial terminal size

382

def get_terminal_size():

383

s = struct.pack('HHHH', 0, 0, 0, 0)

384

x = fcntl.ioctl(sys.stdout.fileno(), termios.TIOCGWINSZ, s)

385

return struct.unpack('HHHH', x)[:2] # rows, cols

386

387

# Send initial terminal size

388

rows, cols = get_terminal_size()

389

resize_data = f'{{"Width":{cols},"Height":{rows}}}'

390

await response.write_channel(stream.RESIZE_CHANNEL, resize_data.encode('utf-8'))

391

392

# Handle terminal resize signals

393

def handle_resize(signum, frame):

394

rows, cols = get_terminal_size()

395

resize_data = f'{{"Width":{cols},"Height":{rows}}}'

396

397

# Note: In real implementation, you'd need to queue this

398

# for the async event loop to process

399

print(f"Terminal resized to {cols}x{rows}")

400

401

signal.signal(signal.SIGWINCH, handle_resize)

402

403

# Continue with normal exec session handling...

404

# (Similar to interactive_shell example above)

405

406

finally:

407

await response.close()

408

await ws_client.close()

409

410

# asyncio.run(exec_with_resize()) # Uncomment to run

411

```

412

413

### File Upload via Exec

414

415

```python

416

import base64

417

418

async def upload_file_to_pod():

419

await config.load_config()

420

421

ws_client = stream.WsApiClient()

422

v1 = client.CoreV1Api(ws_client)

423

424

try:

425

# Read local file

426

with open("/local/path/to/file.txt", "rb") as f:

427

file_content = f.read()

428

429

# Base64 encode for safe transmission

430

encoded_content = base64.b64encode(file_content).decode('utf-8')

431

432

# Create command to decode and write file in pod

433

command = [

434

'/bin/sh', '-c',

435

f'echo "{encoded_content}" | base64 -d > /remote/path/to/file.txt && echo "File uploaded successfully"'

436

]

437

438

response = await v1.connect_get_namespaced_pod_exec(

439

name="my-pod",

440

namespace="default",

441

command=command,

442

stderr=True,

443

stdin=False,

444

stdout=True,

445

tty=False

446

)

447

448

# Monitor upload progress

449

while True:

450

try:

451

channel, data = await response.read_channel(timeout=30)

452

453

if channel == stream.STDOUT_CHANNEL:

454

output = data.decode('utf-8')

455

print(f"Output: {output}")

456

elif channel == stream.STDERR_CHANNEL:

457

error = data.decode('utf-8')

458

print(f"Error: {error}")

459

elif channel == stream.ERROR_CHANNEL:

460

error_info = ws_client.parse_error_data(data)

461

if error_info.get('status') == 'Success':

462

print("File upload completed")

463

break

464

else:

465

print(f"Upload failed: {error_info}")

466

break

467

468

except asyncio.TimeoutError:

469

print("Upload timeout")

470

break

471

472

finally:

473

await response.close()

474

await ws_client.close()

475

476

asyncio.run(upload_file_to_pod())

477

```