or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

compression.md copc.md core-io.md data-containers.md index.md io-handlers.md point-data.md vlr.md

docs/io-handlers.md

0

# I/O Handler Classes

1

2

Specialized reader, writer, and appender classes for different file access patterns including streaming, chunked processing, and memory mapping. These classes provide fine-grained control over LAS file I/O operations.

3

4

## Capabilities

5

6

### LAS File Reader

7

8

Streaming reader for efficient processing of large LAS files with chunked iteration support.

9

10

```python { .api }

11

class LasReader:

12

def __init__(self, source, closefd=True, laz_backend=None, read_evlrs=True, decompression_selection=None):

13

"""

14

Initialize LAS reader.

15

16

Parameters:

17

- source: BinaryIO - LAS/LAZ file stream

18

- closefd: bool - Whether to close file descriptor (default: True)

19

- laz_backend: LazBackend or list - Compression backend(s) to try

20

- read_evlrs: bool - Whether to read Extended VLRs (default: True)

21

- decompression_selection: DecompressionSelection - Fields to decompress

22

"""

23

24

@property

25

def evlrs(self) -> Optional[VLRList]:

26

"""Extended Variable Length Records."""

27

28

@property

29

def header(self) -> LasHeader:

30

"""LAS file header."""

31

32

def read_points(self, n: int) -> ScaleAwarePointRecord:

33

"""

34

Read specified number of points from current position.

35

36

Parameters:

37

- n: int - Number of points to read

38

39

Returns:

40

ScaleAwarePointRecord: Points read from file

41

"""

42

43

def read(self) -> LasData:

44

"""

45

Read entire file into LasData container.

46

47

Returns:

48

LasData: Complete LAS data

49

"""

50

51

def seek(self, pos: int, whence=io.SEEK_SET) -> int:

52

"""

53

Seek to specific point position.

54

55

Parameters:

56

- pos: int - Point position to seek to

57

- whence: int - Seek reference (SEEK_SET, SEEK_CUR, SEEK_END)

58

59

Returns:

60

int: New position

61

"""

62

63

def chunk_iterator(self, points_per_iteration: int) -> PointChunkIterator:

64

"""

65

Create iterator for chunked point reading.

66

67

Parameters:

68

- points_per_iteration: int - Points per chunk

69

70

Returns:

71

PointChunkIterator: Chunk iterator

72

"""

73

74

def read_evlrs(self):

75

"""Read Extended VLRs if not already loaded."""

76

77

def close(self):

78

"""Close reader and free resources."""

79

80

def __enter__(self) -> LasReader: ...

81

def __exit__(self, exc_type, exc_val, exc_tb): ...

82

83

class PointChunkIterator:

84

def __init__(self, reader: LasReader, points_per_iteration: int):

85

"""

86

Initialize chunk iterator.

87

88

Parameters:

89

- reader: LasReader - Reader to iterate over

90

- points_per_iteration: int - Points per chunk

91

"""

92

93

def __next__(self) -> ScaleAwarePointRecord:

94

"""Get next chunk of points."""

95

96

def __iter__(self) -> PointChunkIterator: ...

97

```

98

99

**Usage Examples:**

100

101

```python

102

import laspy

103

104

# Basic point reading

105

with laspy.open('data.las') as reader:

106

print(f"File has {reader.header.point_count} points")

107

108

# Read first 1000 points

109

first_chunk = reader.read_points(1000)

110

print(f"First chunk: {len(first_chunk)} points")

111

112

# Read next 1000 points

113

second_chunk = reader.read_points(1000)

114

print(f"Second chunk: {len(second_chunk)} points")

115

116

# Chunked processing for large files

117

with laspy.open('large.laz') as reader:

118

chunk_size = 100000

119

total_ground_points = 0

120

121

for chunk in reader.chunk_iterator(chunk_size):

122

# Count ground points in this chunk

123

ground_count = np.sum(chunk.classification == 2)

124

total_ground_points += ground_count

125

126

print(f"Chunk: {len(chunk)} points, {ground_count} ground points")

127

128

print(f"Total ground points in file: {total_ground_points}")

129

130

# Seeking to specific positions

131

with laspy.open('data.las') as reader:

132

# Jump to middle of file

133

mid_point = reader.header.point_count // 2

134

reader.seek(mid_point)

135

136

# Read 100 points from middle

137

middle_points = reader.read_points(100)

138

print(f"Points from middle: {len(middle_points)}")

139

```

140

141

### LAS File Writer

142

143

Writer for creating new LAS files with compression support and streaming capabilities.

144

145

```python { .api }

146

class LasWriter:

147

def __init__(self, dest, header: LasHeader, do_compress=None, laz_backend=None, closefd=True, encoding_errors="strict"):

148

"""

149

Initialize LAS writer.

150

151

Parameters:

152

- dest: BinaryIO - Output stream

153

- header: LasHeader - LAS header for new file

154

- do_compress: bool - Force compression on/off (optional)

155

- laz_backend: LazBackend - Compression backend (optional)

156

- closefd: bool - Whether to close file descriptor (default: True)

157

- encoding_errors: str - How to handle encoding errors (default: "strict")

158

"""

159

160

def write_points(self, points: PackedPointRecord):

161

"""

162

Write points to file.

163

164

Parameters:

165

- points: PackedPointRecord - Points to write

166

"""

167

168

def write_evlrs(self, evlrs: VLRList):

169

"""

170

Write Extended VLRs.

171

172

Parameters:

173

- evlrs: VLRList - Extended VLRs to write

174

"""

175

176

def close(self):

177

"""Close writer and finalize file."""

178

179

def __enter__(self) -> LasWriter: ...

180

def __exit__(self, exc_type, exc_val, exc_tb): ...

181

```

182

183

**Usage Examples:**

184

185

```python

186

import laspy

187

import numpy as np

188

189

# Create new LAS file

190

header = laspy.LasHeader(point_format=3, version=(1, 2))

191

header.scales = np.array([0.01, 0.01, 0.001])

192

header.offsets = np.array([0, 0, 0])

193

194

with laspy.open('output.las', mode='w', header=header) as writer:

195

# Generate some test points

196

n_points = 10000

197

points = laspy.ScaleAwarePointRecord.zeros(n_points, header=header)

198

199

# Set coordinates

200

points.x = np.random.uniform(0, 1000, n_points)

201

points.y = np.random.uniform(0, 1000, n_points)

202

points.z = np.random.uniform(0, 100, n_points)

203

points.classification = np.random.choice([1, 2, 3], n_points)

204

205

# Write all points at once

206

writer.write_points(points)

207

208

# Streaming write for large datasets

209

def generate_points(count, chunk_size=10000):

210

"""Generator for large point datasets."""

211

for i in range(0, count, chunk_size):

212

current_chunk_size = min(chunk_size, count - i)

213

214

# Create chunk

215

chunk = laspy.ScaleAwarePointRecord.zeros(current_chunk_size, header=header)

216

chunk.x = np.random.uniform(0, 1000, current_chunk_size)

217

chunk.y = np.random.uniform(0, 1000, current_chunk_size)

218

chunk.z = np.random.uniform(0, 100, current_chunk_size)

219

chunk.classification = np.random.choice([1, 2, 3], current_chunk_size)

220

221

yield chunk

222

223

# Write large dataset in chunks

224

header = laspy.LasHeader(point_format=3)

225

with laspy.open('large_output.laz', mode='w', header=header, do_compress=True) as writer:

226

total_points = 1000000

227

228

for chunk in generate_points(total_points):

229

writer.write_points(chunk)

230

print(f"Written {len(chunk)} points")

231

```

232

233

### LAS File Appender

234

235

Append points to existing LAS files while preserving original structure.

236

237

```python { .api }

238

class LasAppender:

239

def __init__(self, dest, laz_backend=None, closefd=True, encoding_errors="strict"):

240

"""

241

Initialize LAS appender.

242

243

Parameters:

244

- dest: BinaryIO - LAS file to append to

245

- laz_backend: LazBackend - Compression backend (optional)

246

- closefd: bool - Whether to close file descriptor (default: True)

247

- encoding_errors: str - How to handle encoding errors (default: "strict")

248

"""

249

250

def append_points(self, points: PackedPointRecord):

251

"""

252

Append points to existing file.

253

254

Parameters:

255

- points: PackedPointRecord - Points to append

256

"""

257

258

def close(self):

259

"""Close appender and update file header."""

260

261

def __enter__(self) -> LasAppender: ...

262

def __exit__(self, exc_type, exc_val, exc_tb): ...

263

```

264

265

**Usage Examples:**

266

267

```python

268

import laspy

269

import numpy as np

270

271

# Append points from multiple sources

272

def append_multiple_files(target_file, source_files):

273

"""Append points from multiple source files to target."""

274

275

with laspy.open(target_file, mode='a') as appender:

276

total_appended = 0

277

278

for source_file in source_files:

279

print(f"Processing {source_file}")

280

281

with laspy.open(source_file) as reader:

282

# Process in chunks to manage memory

283

for chunk in reader.chunk_iterator(50000):

284

appender.append_points(chunk)

285

total_appended += len(chunk)

286

287

print(f"Appended {reader.header.point_count} points from {source_file}")

288

289

print(f"Total points appended: {total_appended}")

290

291

# Usage

292

source_files = ['file1.las', 'file2.las', 'file3.las']

293

append_multiple_files('combined.las', source_files)

294

295

# Selective appending with filtering

296

with laspy.open('input.las') as reader:

297

with laspy.open('target.las', mode='a') as appender:

298

for chunk in reader.chunk_iterator(100000):

299

# Only append ground points

300

ground_points = chunk[chunk.classification == 2]

301

if len(ground_points) > 0:

302

appender.append_points(ground_points)

303

print(f"Appended {len(ground_points)} ground points")

304

```

305

306

### Memory-Mapped LAS Files

307

308

Memory-mapped access for efficient random access to large uncompressed LAS files.

309

310

```python { .api }

311

class LasMMAP(LasData):

312

def __init__(self, filename):

313

"""

314

Memory-map LAS file.

315

316

Parameters:

317

- filename: str or Path - LAS file to memory-map

318

319

Note: Only works with uncompressed LAS files

320

"""

321

322

def close(self):

323

"""Close memory mapping."""

324

325

def __enter__(self) -> LasMMAP: ...

326

def __exit__(self, exc_type, exc_val, exc_tb): ...

327

```

328

329

**Usage Examples:**

330

331

```python

332

import laspy

333

import numpy as np

334

335

# Memory-mapped random access

336

with laspy.mmap('large_file.las') as las:

337

print(f"Memory-mapped file with {len(las)} points")

338

339

# Random access to points

340

indices = np.random.choice(len(las), 1000, replace=False)

341

sample_points = las.points[indices]

342

343

print(f"Sampled {len(sample_points)} random points")

344

print(f"Sample coordinate range: {sample_points.x.min()}-{sample_points.x.max()}")

345

346

# Spatial filtering (efficient with memory mapping)

347

x_mask = (las.x >= 1000) & (las.x <= 2000)

348

y_mask = (las.y >= 2000) & (las.y <= 3000)

349

spatial_mask = x_mask & y_mask

350

351

filtered_points = las.points[spatial_mask]

352

print(f"Spatial filter found {len(filtered_points)} points")

353

354

# In-place modification with memory mapping

355

def classify_by_height(las_file, ground_threshold=2.0):

356

"""Classify points by height using memory mapping."""

357

358

with laspy.mmap(las_file) as las:

359

print(f"Classifying {len(las)} points by height")

360

361

# Find ground points (below threshold)

362

ground_mask = las.z < ground_threshold

363

364

# Modify classification in-place

365

las.classification[ground_mask] = 2 # Ground class

366

las.classification[~ground_mask] = 1 # Unclassified

367

368

ground_count = np.sum(ground_mask)

369

print(f"Classified {ground_count} points as ground")

370

print(f"Classified {len(las) - ground_count} points as above-ground")

371

372

# Note: Memory mapping only works with uncompressed LAS files

373

classify_by_height('uncompressed.las')

374

```

375

376

## Advanced I/O Patterns

377

378

### Pipeline Processing

379

380

```python

381

import laspy

382

from typing import Iterator, Callable

383

384

def create_processing_pipeline(input_file: str,

385

output_file: str,

386

processors: list[Callable],

387

chunk_size: int = 100000):

388

"""Create processing pipeline for large LAS files."""

389

390

with laspy.open(input_file) as reader:

391

header = reader.header.copy()

392

393

with laspy.open(output_file, mode='w', header=header) as writer:

394

total_processed = 0

395

396

for chunk in reader.chunk_iterator(chunk_size):

397

# Apply all processors to chunk

398

processed_chunk = chunk

399

for processor in processors:

400

processed_chunk = processor(processed_chunk)

401

402

# Write processed chunk

403

if len(processed_chunk) > 0:

404

writer.write_points(processed_chunk)

405

total_processed += len(processed_chunk)

406

407

print(f"Processed {total_processed} points")

408

409

# Example processors

410

def normalize_intensity(points):

411

"""Normalize intensity values to 0-65535 range."""

412

if hasattr(points, 'intensity') and len(points) > 0:

413

max_val = points.intensity.max()

414

if max_val > 0:

415

points.intensity = (points.intensity / max_val * 65535).astype(np.uint16)

416

return points

417

418

def filter_outliers(points):

419

"""Remove statistical outliers based on Z coordinate."""

420

if len(points) == 0:

421

return points

422

423

z_mean = points.z.mean()

424

z_std = points.z.std()

425

426

# Keep points within 3 standard deviations

427

mask = np.abs(points.z - z_mean) <= 3 * z_std

428

return points[mask]

429

430

def ground_classification(points):

431

"""Simple ground classification based on Z percentile."""

432

if len(points) == 0:

433

return points

434

435

ground_threshold = np.percentile(points.z, 10)

436

points.classification[points.z <= ground_threshold] = 2 # Ground

437

return points

438

439

# Use pipeline

440

processors = [normalize_intensity, filter_outliers, ground_classification]

441

create_processing_pipeline('input.las', 'processed.las', processors)

442

```

443

444

### Parallel Processing

445

446

```python

447

import laspy

448

import numpy as np

449

from concurrent.futures import ThreadPoolExecutor, as_completed

450

451

def parallel_chunk_processing(input_file: str,

452

output_file: str,

453

processor_func: Callable,

454

chunk_size: int = 50000,

455

max_workers: int = 4):

456

"""Process LAS file chunks in parallel."""

457

458

with laspy.open(input_file) as reader:

459

header = reader.header.copy()

460

461

# Read all chunks first (for parallel processing)

462

chunks = []

463

for chunk in reader.chunk_iterator(chunk_size):

464

chunks.append(chunk.copy()) # Copy to avoid memory mapping issues

465

466

print(f"Processing {len(chunks)} chunks with {max_workers} workers")

467

468

# Process chunks in parallel

469

processed_chunks = []

470

with ThreadPoolExecutor(max_workers=max_workers) as executor:

471

# Submit all chunks for processing

472

future_to_chunk = {

473

executor.submit(processor_func, chunk): i

474

for i, chunk in enumerate(chunks)

475

}

476

477

# Collect results in order

478

results = [None] * len(chunks)

479

for future in as_completed(future_to_chunk):

480

chunk_idx = future_to_chunk[future]

481

try:

482

results[chunk_idx] = future.result()

483

except Exception as e:

484

print(f"Chunk {chunk_idx} failed: {e}")

485

results[chunk_idx] = chunks[chunk_idx] # Use original

486

487

# Write results

488

with laspy.open(output_file, mode='w', header=header) as writer:

489

total_written = 0

490

for result_chunk in results:

491

if result_chunk is not None and len(result_chunk) > 0:

492

writer.write_points(result_chunk)

493

total_written += len(result_chunk)

494

495

print(f"Wrote {total_written} processed points")

496

497

def intensive_processor(points):

498

"""Example computationally intensive processor."""

499

if len(points) == 0:

500

return points

501

502

# Simulate intensive computation (e.g., complex filtering)

503

# This would be replaced with actual processing logic

504

import time

505

time.sleep(0.01) # Simulate processing time

506

507

# Example: smooth Z coordinates using rolling mean

508

window_size = min(100, len(points))

509

if window_size > 1:

510

smoothed_z = np.convolve(points.z, np.ones(window_size)/window_size, mode='same')

511

points.z = smoothed_z.astype(points.z.dtype)

512

513

return points

514

515

# Use parallel processing

516

parallel_chunk_processing('large_input.las', 'processed_output.las', intensive_processor)

517

```