or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs/

addons.md · commands.md · configuration.md · connections.md · content.md · flow-io.md · http-flows.md · index.md · protocols.md

docs/content.md

# Content Processing

Flexible content viewing, transformation, and analysis with support for various data formats and encoding schemes. Includes syntax highlighting, interactive content exploration, and extensible content view system.

## Capabilities

### Content View System

Extensible system for viewing and processing different content types.

```python { .api }

from typing import Iterator, List, Optional, Tuple


class Contentview:
    """
    Base class for content viewers.

    Content viewers transform raw bytes into human-readable representations
    with optional syntax highlighting and formatting.
    """

    # Human-readable viewer name shown in the UI.
    name: str
    # MIME types this viewer claims to handle.
    content_types: List[str]

    def __call__(self, data: bytes, **metadata) -> Tuple[str, Iterator[Tuple[str, bytes]]]:
        """
        Transform content for viewing.

        Parameters:
        - data: Raw content bytes
        - **metadata: Additional metadata (content_type, etc.)

        Returns:
        - Tuple of (description, formatted_lines)
        """


class InteractiveContentview(Contentview):
    """
    Interactive content viewer with user input handling.

    Extends basic content viewing with interactive capabilities
    for exploring complex data structures.
    """

    def render_priority(self, data: bytes, **metadata) -> float:
        """
        Return priority for this viewer (higher = preferred).

        Parameters:
        - data: Content to potentially view
        - **metadata: Content metadata

        Returns:
        - Priority score (0.0 to 1.0)
        """


class SyntaxHighlight(Contentview):
    """
    Syntax highlighting content viewer.

    Provides syntax highlighting for code and structured data formats.
    """


def add(view: Contentview) -> None:
    """
    Register a custom content view.

    Parameters:
    - view: Content view instance to register
    """


class Metadata:
    """
    Content metadata container.

    Holds information about content type, encoding, and other properties
    used by content viewers for processing decisions.
    """

    content_type: Optional[str]
    charset: Optional[str]
    filename: Optional[str]
    size: int

```

### Encoding Utilities

Content encoding and decoding support for various compression and transformation schemes.

```python { .api }

def encode(data: bytes, encoding: str) -> bytes:
    """
    Encode content using specified encoding scheme.

    Parameters:
    - data: Raw content bytes to encode
    - encoding: Encoding scheme name (gzip, deflate, brotli, etc.)

    Returns:
    - Encoded content bytes

    Raises:
    - ValueError: If encoding scheme is not supported
    """


def decode(data: bytes, encoding: str) -> bytes:
    """
    Decode content using specified encoding scheme.

    Parameters:
    - data: Encoded content bytes to decode
    - encoding: Encoding scheme name (gzip, deflate, brotli, etc.)

    Returns:
    - Decoded content bytes

    Raises:
    - ValueError: If encoding scheme is not supported or data is invalid
    """


# Supported encoding schemes
ENCODINGS = {
    "gzip": "GNU zip compression",
    "deflate": "DEFLATE compression",
    "brotli": "Brotli compression",
    "identity": "No encoding (pass-through)",
    "compress": "Unix compress format",
    "x-gzip": "Legacy gzip",
    "x-deflate": "Legacy deflate",
}

```

## Usage Examples

### Custom Content Viewer

```python

from mitmproxy import contentviews
from mitmproxy.contentviews import base
import json
import yaml


class YAMLContentView(base.Contentview):
    """Custom content viewer for YAML files."""

    name = "YAML"
    content_types = ["application/yaml", "application/x-yaml", "text/yaml"]

    def __call__(self, data, **metadata):
        """Render a YAML body as pretty-printed, highlighted JSON."""
        try:
            # Parse YAML content
            parsed = yaml.safe_load(data.decode('utf-8'))

            # Convert to pretty-printed JSON for display
            formatted = json.dumps(parsed, indent=2, ensure_ascii=False)

            # Return formatted content with syntax highlighting.
            # BUG FIX: list.append() takes exactly one argument; each display
            # line is a list of (style, text) segments, so the two segments
            # must be wrapped in a list instead of being passed separately.
            lines = []
            for i, line in enumerate(formatted.split('\n')):
                # Simple syntax highlighting for JSON
                if line.strip().startswith('"') and ':' in line:
                    # Key lines
                    lines.append([("text", f"{i+1:4d} "), ("key", line.encode('utf-8'))])
                elif line.strip() in ['{', '}', '[', ']']:
                    # Structural lines
                    lines.append([("text", f"{i+1:4d} "), ("punctuation", line.encode('utf-8'))])
                else:
                    # Value lines
                    lines.append([("text", f"{i+1:4d} "), ("value", line.encode('utf-8'))])

            return "YAML", lines

        except (yaml.YAMLError, UnicodeDecodeError) as e:
            return "YAML (parse error)", [("error", str(e).encode('utf-8'))]

    def render_priority(self, data, **metadata):
        """Score how strongly this viewer wants the given content."""
        # High priority for YAML content types
        content_type = metadata.get("content_type", "")
        if any(ct in content_type for ct in self.content_types):
            return 0.9

        # Medium priority if content looks like YAML
        try:
            text = data.decode('utf-8')
            if any(indicator in text[:100] for indicator in ['---', '- ', ': ']):
                return 0.5
        except UnicodeDecodeError:
            pass

        return 0.0


# Register the custom viewer
contentviews.add(YAMLContentView())


class XMLContentView(base.Contentview):
    """Custom content viewer for XML with pretty printing."""

    name = "XML Pretty"
    content_types = ["application/xml", "text/xml"]

    def __call__(self, data, **metadata):
        """Pretty-print an XML body with line numbers."""
        # Import outside the try block so that ET is always bound by the
        # time the `except ET.ParseError` clause needs to resolve it.
        import xml.etree.ElementTree as ET
        from xml.dom import minidom

        try:
            # Parse and pretty-print XML
            root = ET.fromstring(data)
            rough_string = ET.tostring(root, encoding='unicode')
            reparsed = minidom.parseString(rough_string)
            pretty = reparsed.toprettyxml(indent=" ")

            # Remove empty lines
            lines = [line for line in pretty.split('\n') if line.strip()]

            # Format for display with line numbers.
            # BUG FIX: list.append() takes exactly one argument; each display
            # line is a list of (style, text) segments.
            formatted_lines = []
            for i, line in enumerate(lines):
                formatted_lines.append([("text", f"{i+1:4d} "), ("xml", line.encode('utf-8'))])

            return f"XML ({len(lines)} lines)", formatted_lines

        except ET.ParseError as e:
            return "XML (parse error)", [("error", str(e).encode('utf-8'))]

    def render_priority(self, data, **metadata):
        """Score how strongly this viewer wants the given content."""
        content_type = metadata.get("content_type", "")
        if any(ct in content_type for ct in self.content_types):
            return 0.8

        # Check if content starts with XML declaration
        try:
            text = data.decode('utf-8').strip()
            if text.startswith('<?xml') or text.startswith('<'):
                return 0.6
        except UnicodeDecodeError:
            pass

        return 0.0


contentviews.add(XMLContentView())

```

### Content Processing in Addons

```python

from mitmproxy import http, contentviews
from mitmproxy.net import encoding
import mitmproxy.ctx as ctx
import gzip
import json


class ContentProcessorAddon:
    """Addon for comprehensive content processing."""

    def response(self, flow: http.HTTPFlow):
        """Process response content."""
        if not flow.response:
            return

        content_type = flow.response.headers.get("content-type", "")
        content_encoding = flow.response.headers.get("content-encoding", "")

        # Transparently decompress the body first so the per-type handlers
        # below always operate on plain content.
        if content_encoding:
            try:
                decoded = encoding.decode(flow.response.content, content_encoding)
                ctx.log.info(f"Decoded {content_encoding} content: {len(flow.response.content)} -> {len(decoded)} bytes")

                # Keep the originals around for potential re-encoding later.
                flow.metadata["original_encoding"] = content_encoding
                flow.metadata["original_content"] = flow.response.content

                # Swap in the decoded body and drop the now-stale header.
                flow.response.content = decoded
                del flow.response.headers["content-encoding"]
            except ValueError as e:
                ctx.log.error(f"Failed to decode {content_encoding}: {e}")

        # Dispatch on the declared media type.
        if "application/json" in content_type:
            self.process_json_content(flow)
        elif "text/html" in content_type:
            self.process_html_content(flow)
        elif content_type.startswith("image/"):
            self.process_image_content(flow)

    def process_json_content(self, flow: http.HTTPFlow):
        """Process JSON response content."""
        try:
            data = flow.response.json()

            # Describe the top-level JSON structure in the log.
            ctx.log.info(f"JSON response structure: {type(data).__name__}")
            if isinstance(data, dict):
                ctx.log.info(f"JSON keys: {list(data.keys())}")
            elif isinstance(data, list):
                ctx.log.info(f"JSON array length: {len(data)}")

            # Pretty-print a preview for debugging.
            pretty_json = json.dumps(data, indent=2, ensure_ascii=False)
            ctx.log.info(f"JSON content preview:\n{pretty_json[:500]}...")

            # Tag objects that do not already carry a debug section.
            if isinstance(data, dict) and "debug" not in data:
                data["debug"] = {"processed_by": "mitmproxy", "timestamp": flow.response.timestamp_start}
                flow.response.set_text(json.dumps(data))
        except ValueError as e:
            ctx.log.error(f"Invalid JSON in response: {e}")

    def process_html_content(self, flow: http.HTTPFlow):
        """Process HTML response content."""
        try:
            html = flow.response.get_text()

            # Pull the page title out of the markup, if present.
            start = html.find("<title>")
            end = html.find("</title>")
            if start != -1 and end != -1:
                title = html[start + 7:end]
                ctx.log.info(f"HTML page title: {title}")

            # Tally a few common element kinds via naive substring counts.
            markers = {
                "links": "<a ",
                "images": "<img ",
                "scripts": "<script",
                "forms": "<form",
            }
            element_counts = {label: html.count(marker) for label, marker in markers.items()}
            ctx.log.info(f"HTML elements: {element_counts}")
        except UnicodeDecodeError as e:
            ctx.log.error(f"Failed to decode HTML: {e}")

    def process_image_content(self, flow: http.HTTPFlow):
        """Process image response content."""
        content_type = flow.response.headers.get("content-type", "")
        body = flow.response.content

        ctx.log.info(f"Image: {content_type}, {len(body)} bytes")

        # Cheap magic-number sanity checks per image format.
        if content_type == "image/jpeg" and body.startswith(b'\xff\xd8\xff'):
            ctx.log.info("Valid JPEG header detected")
        elif content_type == "image/png" and body.startswith(b'\x89PNG\r\n\x1a\n'):
            ctx.log.info("Valid PNG header detected")


addons = [ContentProcessorAddon()]

```

### Advanced Content Analysis

```python

from mitmproxy import http
import mitmproxy.ctx as ctx
import hashlib
import magic  # python-magic library for file type detection
import re


class ContentAnalyzerAddon:
    """Advanced content analysis and classification."""

    def __init__(self):
        # Running totals keyed by declared type, declared encoding, and
        # sniffed (magic-number) type.
        self.content_stats = {
            "total_bytes": 0,
            "content_types": {},
            "encodings": {},
            "file_types": {},
        }

    def _tally(self, bucket, key):
        # Increment one counter inside a stats sub-dictionary.
        counters = self.content_stats[bucket]
        counters[key] = counters.get(key, 0) + 1

    def response(self, flow: http.HTTPFlow):
        """Analyze response content comprehensively."""
        if not flow.response or not flow.response.content:
            return

        body = flow.response.content
        size = len(body)
        content_type = flow.response.headers.get("content-type", "unknown")
        content_encoding = flow.response.headers.get("content-encoding", "none")

        # Update statistics
        self.content_stats["total_bytes"] += size
        self._tally("content_types", content_type)
        self._tally("encodings", content_encoding)

        # Detect actual file type using magic numbers
        try:
            detected_type = magic.from_buffer(body, mime=True)
            self._tally("file_types", detected_type)

            # Flag responses whose declared type disagrees with the sniffed one.
            if detected_type != content_type.split(';')[0]:
                ctx.log.warn(f"Content type mismatch: declared={content_type}, detected={detected_type}")
        except Exception as e:
            ctx.log.error(f"File type detection failed: {e}")

        # Short content fingerprint for the summary log line.
        content_hash = hashlib.sha256(body).hexdigest()[:16]

        # Security analysis
        self.analyze_security(flow, body, content_type)

        # Performance analysis
        self.analyze_performance(flow, body, size)

        # Log analysis summary
        ctx.log.info(f"Content analysis: {flow.request.url}")
        ctx.log.info(f" Size: {size} bytes, Type: {content_type}")
        ctx.log.info(f" Hash: {content_hash}, Encoding: {content_encoding}")

    def analyze_security(self, flow, content, content_type):
        """Analyze content for security issues."""
        security_issues = []

        if "text/html" in content_type:
            # Check for potential XSS in HTML
            try:
                html_text = content.decode('utf-8', errors='ignore')

                # Simple XSS pattern detection
                xss_patterns = (
                    r'<script[^>]*>.*?javascript:',
                    r'on\w+\s*=\s*["\'].*?javascript:',
                    r'<iframe[^>]*src\s*=\s*["\']javascript:',
                )
                if any(re.search(p, html_text, re.IGNORECASE | re.DOTALL) for p in xss_patterns):
                    security_issues.append("Potential XSS vector detected")

                # Check for inline scripts
                if '<script' in html_text and 'javascript:' in html_text:
                    security_issues.append("Inline JavaScript detected")
            except UnicodeDecodeError:
                pass

        elif "application/json" in content_type:
            # Check for exposed sensitive data in JSON
            try:
                json_text = content.decode('utf-8', errors='ignore').lower()
                for keyword in ['password', 'token', 'secret', 'key', 'api_key', 'private']:
                    if keyword in json_text:
                        security_issues.append(f"Potentially sensitive data: {keyword}")
            except UnicodeDecodeError:
                pass

        # Log security issues
        if security_issues:
            ctx.log.warn(f"Security analysis for {flow.request.url}:")
            for issue in security_issues:
                ctx.log.warn(f" - {issue}")

    def analyze_performance(self, flow, content, content_size):
        """Analyze content for performance implications."""
        performance_notes = []

        # Large content warning
        if content_size > 1024 * 1024:  # > 1MB
            performance_notes.append(f"Large response: {content_size / (1024*1024):.2f} MB")

        # Check compression effectiveness
        content_encoding = flow.response.headers.get("content-encoding", "")
        if not content_encoding and content_size > 1024:  # > 1KB uncompressed
            performance_notes.append("Content could benefit from compression")

        # Check caching headers
        headers = flow.response.headers
        cache_hints = [
            headers.get("cache-control", ""),
            headers.get("expires", ""),
            headers.get("etag", ""),
        ]
        if not any(cache_hints):
            performance_notes.append("No caching headers present")

        # Log performance notes
        if performance_notes:
            ctx.log.info(f"Performance analysis for {flow.request.url}:")
            for note in performance_notes:
                ctx.log.info(f" - {note}")

    def done(self):
        """Log final content statistics."""
        stats = self.content_stats
        ctx.log.info("Content Analysis Summary:")
        ctx.log.info(f" Total bytes processed: {stats['total_bytes']:,}")
        ctx.log.info(f" Unique content types: {len(stats['content_types'])}")
        ctx.log.info(f" Most common content type: {max(stats['content_types'], key=stats['content_types'].get) if stats['content_types'] else 'None'}")
        ctx.log.info(f" Encoding distribution: {dict(list(stats['encodings'].items())[:5])}")


addons = [ContentAnalyzerAddon()]

```

### Content Transformation

```python

from mitmproxy import http
from mitmproxy.net import encoding
import mitmproxy.ctx as ctx
import json
import re
import time  # BUG FIX: time.time() is used below but was never imported


class ContentTransformerAddon:
    """Transform content based on rules and filters."""

    def __init__(self):
        self.transformation_rules = {
            # URL pattern -> transformation function
            r".*\.json$": self.transform_json,
            r".*/api/.*": self.transform_api_response,
            r".*\.html$": self.transform_html,
        }

    def response(self, flow: http.HTTPFlow):
        """Apply content transformations based on URL patterns."""
        if not flow.response:
            return

        url = flow.request.url

        # Find matching transformation rules
        for pattern, transform_func in self.transformation_rules.items():
            if re.match(pattern, url):
                try:
                    transform_func(flow)
                except Exception as e:
                    ctx.log.error(f"Transformation failed for {url}: {e}")

    def transform_json(self, flow: http.HTTPFlow):
        """Transform JSON responses."""
        try:
            data = flow.response.json()

            # Add metadata to all JSON responses
            if isinstance(data, dict):
                data["_metadata"] = {
                    "processed_by": "mitmproxy",
                    "original_size": len(flow.response.content),
                    "url": flow.request.url
                }

            # Pretty-format JSON
            flow.response.set_text(json.dumps(data, indent=2, ensure_ascii=False))

            ctx.log.info(f"Transformed JSON response: {flow.request.url}")

        except ValueError:
            ctx.log.warn(f"Failed to parse JSON: {flow.request.url}")

    def transform_api_response(self, flow: http.HTTPFlow):
        """Transform API responses with additional headers."""
        # Add API processing headers
        flow.response.headers["X-API-Processed"] = "true"
        flow.response.headers["X-Processing-Time"] = str(int(time.time()))

        # Add CORS headers for development
        flow.response.headers["Access-Control-Allow-Origin"] = "*"
        flow.response.headers["Access-Control-Allow-Methods"] = "GET,POST,PUT,DELETE,OPTIONS"
        flow.response.headers["Access-Control-Allow-Headers"] = "Content-Type,Authorization"

        ctx.log.info(f"Transformed API response: {flow.request.url}")

    def transform_html(self, flow: http.HTTPFlow):
        """Transform HTML responses."""
        try:
            html_content = flow.response.get_text()

            # Inject debugging script
            debug_script = """
<script>
console.log('Page processed by mitmproxy');
window.mitmproxy_processed = true;
</script>
"""

            # Insert before closing </body> tag
            if "</body>" in html_content:
                html_content = html_content.replace("</body>", debug_script + "</body>")
            else:
                html_content += debug_script

            # Add meta tag
            meta_tag = '<meta name="processed-by" content="mitmproxy">'
            if "<head>" in html_content:
                html_content = html_content.replace("<head>", "<head>" + meta_tag)

            flow.response.set_text(html_content)

            ctx.log.info(f"Transformed HTML response: {flow.request.url}")

        except UnicodeDecodeError:
            ctx.log.warn(f"Failed to decode HTML: {flow.request.url}")


addons = [ContentTransformerAddon()]

```