or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application-workloads.mdautoscaling.mdconfiguration.mdcore-resources.mdcustom-resources.mddynamic-client.mdindex.mdleader-election.mdnetworking.mdrbac-security.mdresource-watching.mdstorage.mdstreaming-operations.mdutilities.md

streaming-operations.mddocs/

0

# Streaming Operations

1

2

Execute commands in running pods, stream container logs, and establish port forwarding connections. Essential for debugging, log analysis, and establishing secure connections to pod services through the Kubernetes API server.

3

4

## Capabilities

5

6

### Command Execution

7

8

Execute commands inside running containers and stream the results with support for stdin, stdout, stderr, and TTY.

9

10

```python { .api }

11

def stream(

12

ws_client,

13

channel: str,

14

*args,

15

stdin: bool = False,

16

stdout: bool = True,

17

stderr: bool = True,

18

tty: bool = False,

19

**kwargs

20

):

21

"""

22

Stream command execution or log output from pods.

23

24

Parameters:

25

- ws_client: WebSocket client connection

26

- channel: Channel type (stdin, stdout, stderr, error, resize)

27

- *args: Additional arguments

28

- stdin: Enable stdin stream

29

- stdout: Enable stdout stream

30

- stderr: Enable stderr stream

31

- tty: Allocate TTY

32

33

Returns:

34

Stream object for reading/writing data

35

"""

36

```

37

38

### Port Forwarding

39

40

Establish port forwarding connections to pods for accessing services running inside containers.

41

42

```python { .api }

43

def portforward(

44

api_instance,

45

name: str,

46

namespace: str,

47

ports: list,

48

**kwargs

49

):

50

"""

51

Create port forwarding connection to a pod.

52

53

Parameters:

54

- api_instance: CoreV1Api instance

55

- name: Pod name

56

- namespace: Pod namespace

57

- ports: List of port mappings

58

59

Returns:

60

PortForward connection object

61

"""

62

```

63

64

## Usage Examples

65

66

### Executing Commands in Pods

67

68

```python

69

from kubernetes import client, config

70

from kubernetes.stream import stream

71

72

config.load_kube_config()

73

v1 = client.CoreV1Api()

74

75

# Execute a simple command

76

exec_command = ['/bin/sh', '-c', 'echo "Hello from pod"']

77

78

resp = stream(

79

v1.connect_get_namespaced_pod_exec,

80

name="my-pod",

81

namespace="default",

82

command=exec_command,

83

stderr=True,

84

stdin=False,

85

stdout=True,

86

tty=False

87

)

88

89

print("Command output:")

90

print(resp)

91

```

92

93

### Interactive Shell Session

94

95

```python

96

from kubernetes import client, config

97

from kubernetes.stream import stream

98

import sys

99

import select

100

import termios

101

import tty

102

103

config.load_kube_config()

104

v1 = client.CoreV1Api()

105

106

def interactive_shell(pod_name, namespace="default", container=None):

107

"""Create interactive shell session with pod."""

108

109

# Store original terminal settings

110

old_tty = termios.tcgetattr(sys.stdin)

111

112

try:

113

# Set terminal to raw mode

114

tty.setraw(sys.stdin.fileno())

115

116

# Start exec with TTY

117

exec_command = ['/bin/bash']

118

resp = stream(

119

v1.connect_get_namespaced_pod_exec,

120

name=pod_name,

121

namespace=namespace,

122

container=container,

123

command=exec_command,

124

stderr=True,

125

stdin=True,

126

stdout=True,

127

tty=True,

128

_preload_content=False

129

)

130

131

# Handle input/output

132

while resp.is_open():

133

resp.update(timeout=1)

134

135

# Check for input

136

if select.select([sys.stdin], [], [], 0) == ([sys.stdin], [], []):

137

input_char = sys.stdin.read(1)

138

if input_char:

139

resp.write_stdin(input_char)

140

141

# Read output

142

if resp.peek_stdout():

143

print(resp.read_stdout(), end='')

144

if resp.peek_stderr():

145

print(resp.read_stderr(), end='', file=sys.stderr)

146

147

resp.close()

148

149

finally:

150

# Restore terminal settings

151

termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)

152

153

# Start interactive shell

154

interactive_shell("my-pod", "default")

155

```

156

157

### Streaming Container Logs

158

159

```python

160

from kubernetes import client, config

161

from kubernetes.stream import stream

162

163

config.load_kube_config()

164

v1 = client.CoreV1Api()

165

166

# Stream logs from a container

167

def stream_logs(pod_name, namespace="default", container=None, follow=True):

168

"""Stream logs from a pod container."""

169

170

w = stream(

171

v1.read_namespaced_pod_log,

172

name=pod_name,

173

namespace=namespace,

174

container=container,

175

follow=follow,

176

_preload_content=False

177

)

178

179

try:

180

for line in w.stream():

181

print(line.decode('utf-8').rstrip())

182

except KeyboardInterrupt:

183

w.close()

184

print("\nLog streaming stopped")

185

186

# Stream logs

187

stream_logs("my-pod", "default", follow=True)

188

```

189

190

### Port Forwarding to Pod

191

192

```python

193

from kubernetes import client, config

194

from kubernetes.stream import portforward

195

import requests

196

import threading

197

import time

198

199

config.load_kube_config()

200

v1 = client.CoreV1Api()

201

202

def create_port_forward(pod_name, namespace, local_port, pod_port):

203

"""Create port forward connection."""

204

205

# Start port forwarding in background thread

206

def port_forward_thread():

207

try:

208

pf = portforward(

209

v1.connect_get_namespaced_pod_portforward,

210

name=pod_name,

211

namespace=namespace,

212

ports=f"{local_port}:{pod_port}"

213

)

214

215

print(f"Port forwarding active: localhost:{local_port} -> {pod_name}:{pod_port}")

216

217

# Keep connection alive

218

while True:

219

time.sleep(1)

220

221

except Exception as e:

222

print(f"Port forwarding error: {e}")

223

224

# Start port forwarding

225

pf_thread = threading.Thread(target=port_forward_thread, daemon=True)

226

pf_thread.start()

227

228

# Wait for connection to establish

229

time.sleep(2)

230

231

return pf_thread

232

233

# Example: Forward local port 8080 to pod port 80

234

pf_thread = create_port_forward("nginx-pod", "default", 8080, 80)

235

236

# Use the forwarded connection

237

try:

238

response = requests.get("http://localhost:8080")

239

print(f"Response: {response.status_code}")

240

print(response.text[:200])

241

except requests.RequestException as e:

242

print(f"Request failed: {e}")

243

```

244

245

### Copying Files To/From Pods

246

247

```python

248

from kubernetes import client, config

249

from kubernetes.stream import stream

250

import os

251

import tarfile

252

import tempfile

253

254

config.load_kube_config()

255

v1 = client.CoreV1Api()

256

257

def copy_file_to_pod(pod_name, namespace, local_file, pod_path, container=None):

258

"""Copy file from local system to pod."""

259

260

# Create tar archive of the file

261

with tempfile.NamedTemporaryFile() as tar_buffer:

262

with tarfile.open(fileobj=tar_buffer, mode='w') as tar:

263

tar.add(local_file, arcname=os.path.basename(local_file))

264

265

tar_buffer.seek(0)

266

tar_data = tar_buffer.read()

267

268

# Execute tar command to extract file in pod

269

exec_command = ['tar', 'xf', '-', '-C', os.path.dirname(pod_path)]

270

271

resp = stream(

272

v1.connect_get_namespaced_pod_exec,

273

name=pod_name,

274

namespace=namespace,

275

container=container,

276

command=exec_command,

277

stderr=True,

278

stdin=True,

279

stdout=True,

280

tty=False,

281

_preload_content=False

282

)

283

284

# Send tar data to pod

285

resp.write_stdin(tar_data)

286

resp.close()

287

288

print(f"File copied to {pod_name}:{pod_path}")

289

290

def copy_file_from_pod(pod_name, namespace, pod_path, local_file, container=None):

291

"""Copy file from pod to local system."""

292

293

# Create tar archive of the file in pod

294

exec_command = ['tar', 'cf', '-', pod_path]

295

296

resp = stream(

297

v1.connect_get_namespaced_pod_exec,

298

name=pod_name,

299

namespace=namespace,

300

container=container,

301

command=exec_command,

302

stderr=True,

303

stdin=False,

304

stdout=True,

305

tty=False,

306

_preload_content=False

307

)

308

309

# Read tar data

310

tar_data = b''

311

while resp.is_open():

312

resp.update(timeout=1)

313

if resp.peek_stdout():

314

tar_data += resp.read_stdout()

315

316

# Extract file from tar data

317

with tempfile.NamedTemporaryFile() as tar_buffer:

318

tar_buffer.write(tar_data)

319

tar_buffer.seek(0)

320

321

with tarfile.open(fileobj=tar_buffer, mode='r') as tar:

322

tar.extractall(path=os.path.dirname(local_file))

323

324

print(f"File copied from {pod_name}:{pod_path} to {local_file}")

325

326

# Example usage

327

copy_file_to_pod("my-pod", "default", "/local/file.txt", "/tmp/file.txt")

328

copy_file_from_pod("my-pod", "default", "/tmp/output.txt", "/local/output.txt")

329

```

330

331

### Monitoring Pod Resource Usage

332

333

```python

334

from kubernetes import client, config

335

from kubernetes.stream import stream

336

import json

337

import time

338

339

config.load_kube_config()

340

v1 = client.CoreV1Api()

341

342

def monitor_pod_resources(pod_name, namespace="default", container=None):

343

"""Monitor pod resource usage using kubectl top equivalent."""

344

345

# Use metrics from /proc or system commands

346

exec_command = [

347

'sh', '-c',

348

'while true; do '

349

'echo "=== $(date) ==="; '

350

'cat /proc/meminfo | grep -E "MemTotal|MemAvailable"; '

351

'cat /proc/loadavg; '

352

'sleep 5; '

353

'done'

354

]

355

356

resp = stream(

357

v1.connect_get_namespaced_pod_exec,

358

name=pod_name,

359

namespace=namespace,

360

container=container,

361

command=exec_command,

362

stderr=True,

363

stdin=False,

364

stdout=True,

365

tty=False,

366

_preload_content=False

367

)

368

369

print(f"Monitoring resources for {pod_name}")

370

371

try:

372

while resp.is_open():

373

resp.update(timeout=1)

374

if resp.peek_stdout():

375

output = resp.read_stdout()

376

print(output.decode('utf-8'), end='')

377

378

time.sleep(1)

379

380

except KeyboardInterrupt:

381

resp.close()

382

print("\nMonitoring stopped")

383

384

# Monitor pod resources

385

monitor_pod_resources("my-pod", "default")

386

```

387

388

### Debug Pod with Ephemeral Container

389

390

```python

391

from kubernetes import client, config

392

from kubernetes.stream import stream

393

394

config.load_kube_config()

395

v1 = client.CoreV1Api()

396

397

def debug_pod_with_ephemeral(pod_name, namespace="default"):

398

"""Debug pod using ephemeral container (Kubernetes 1.23+)."""

399

400

# Get existing pod

401

pod = v1.read_namespaced_pod(name=pod_name, namespace=namespace)

402

403

# Add ephemeral container for debugging

404

ephemeral_container = {

405

"name": "debugger",

406

"image": "busybox:latest",

407

"command": ["/bin/sh"],

408

"stdin": True,

409

"tty": True,

410

"targetContainerName": pod.spec.containers[0].name

411

}

412

413

# Update pod with ephemeral container

414

if not pod.spec.ephemeral_containers:

415

pod.spec.ephemeral_containers = []

416

pod.spec.ephemeral_containers.append(ephemeral_container)

417

418

# Patch the pod

419

v1.patch_namespaced_pod(

420

name=pod_name,

421

namespace=namespace,

422

body=pod

423

)

424

425

print(f"Ephemeral container added to {pod_name}")

426

427

# Connect to the ephemeral container

428

resp = stream(

429

v1.connect_get_namespaced_pod_exec,

430

name=pod_name,

431

namespace=namespace,

432

container="debugger",

433

command=["/bin/sh"],

434

stderr=True,

435

stdin=True,

436

stdout=True,

437

tty=True,

438

_preload_content=False

439

)

440

441

print("Connected to debug container. Type 'exit' to quit.")

442

443

# Interactive session with debug container

444

try:

445

while resp.is_open():

446

resp.update(timeout=1)

447

448

if resp.peek_stdout():

449

print(resp.read_stdout().decode('utf-8'), end='')

450

if resp.peek_stderr():

451

print(resp.read_stderr().decode('utf-8'), end='')

452

453

except KeyboardInterrupt:

454

resp.close()

455

print("\nDebug session ended")

456

457

# Debug pod with ephemeral container

458

debug_pod_with_ephemeral("problematic-pod", "default")

459

```