# I/O Handler Classes

Specialized reader, writer, and appender classes for different file access patterns including streaming, chunked processing, and memory mapping. These classes provide fine-grained control over LAS file I/O operations.

## Capabilities

### LAS File Reader

Streaming reader for efficient processing of large LAS files with chunked iteration support.

```python { .api }
class LasReader:
    def __init__(self, source, closefd=True, laz_backend=None, read_evlrs=True, decompression_selection=None):
        """
        Initialize LAS reader.

        Parameters:
        - source: BinaryIO - LAS/LAZ file stream
        - closefd: bool - Whether to close file descriptor (default: True)
        - laz_backend: LazBackend or list - Compression backend(s) to try
        - read_evlrs: bool - Whether to read Extended VLRs (default: True)
        - decompression_selection: DecompressionSelection - Fields to decompress
        """

    @property
    def evlrs(self) -> Optional[VLRList]:
        """Extended Variable Length Records."""

    @property
    def header(self) -> LasHeader:
        """LAS file header."""

    def read_points(self, n: int) -> ScaleAwarePointRecord:
        """
        Read specified number of points from current position.

        Parameters:
        - n: int - Number of points to read

        Returns:
        ScaleAwarePointRecord: Points read from file
        """

    def read(self) -> LasData:
        """
        Read entire file into LasData container.

        Returns:
        LasData: Complete LAS data
        """

    def seek(self, pos: int, whence=io.SEEK_SET) -> int:
        """
        Seek to specific point position.

        Parameters:
        - pos: int - Point position to seek to
        - whence: int - Seek reference (SEEK_SET, SEEK_CUR, SEEK_END)

        Returns:
        int: New position
        """

    def chunk_iterator(self, points_per_iteration: int) -> PointChunkIterator:
        """
        Create iterator for chunked point reading.

        Parameters:
        - points_per_iteration: int - Points per chunk

        Returns:
        PointChunkIterator: Chunk iterator
        """

    def read_evlrs(self):
        """Read Extended VLRs if not already loaded."""

    def close(self):
        """Close reader and free resources."""

    def __enter__(self) -> LasReader: ...
    def __exit__(self, exc_type, exc_val, exc_tb): ...

class PointChunkIterator:
    def __init__(self, reader: LasReader, points_per_iteration: int):
        """
        Initialize chunk iterator.

        Parameters:
        - reader: LasReader - Reader to iterate over
        - points_per_iteration: int - Points per chunk
        """

    def __next__(self) -> ScaleAwarePointRecord:
        """Get next chunk of points."""

    def __iter__(self) -> PointChunkIterator: ...
```

**Usage Examples:**

```python
import laspy
import numpy as np

# Basic point reading
with laspy.open('data.las') as reader:
    print(f"File has {reader.header.point_count} points")

    # Read first 1000 points
    first_chunk = reader.read_points(1000)
    print(f"First chunk: {len(first_chunk)} points")

    # Read next 1000 points
    second_chunk = reader.read_points(1000)
    print(f"Second chunk: {len(second_chunk)} points")

# Chunked processing for large files
with laspy.open('large.laz') as reader:
    chunk_size = 100000
    total_ground_points = 0

    for chunk in reader.chunk_iterator(chunk_size):
        # Count ground points in this chunk
        ground_count = np.sum(chunk.classification == 2)
        total_ground_points += ground_count

        print(f"Chunk: {len(chunk)} points, {ground_count} ground points")

    print(f"Total ground points in file: {total_ground_points}")

# Seeking to specific positions
with laspy.open('data.las') as reader:
    # Jump to middle of file
    mid_point = reader.header.point_count // 2
    reader.seek(mid_point)

    # Read 100 points from middle
    middle_points = reader.read_points(100)
    print(f"Points from middle: {len(middle_points)}")
```
### LAS File Writer

Writer for creating new LAS files with compression support and streaming capabilities.

```python { .api }
class LasWriter:
    def __init__(self, dest, header: LasHeader, do_compress=None, laz_backend=None, closefd=True, encoding_errors="strict"):
        """
        Initialize LAS writer.

        Parameters:
        - dest: BinaryIO - Output stream
        - header: LasHeader - LAS header for new file
        - do_compress: bool - Force compression on/off (optional)
        - laz_backend: LazBackend - Compression backend (optional)
        - closefd: bool - Whether to close file descriptor (default: True)
        - encoding_errors: str - How to handle encoding errors (default: "strict")
        """

    def write_points(self, points: PackedPointRecord):
        """
        Write points to file.

        Parameters:
        - points: PackedPointRecord - Points to write
        """

    def write_evlrs(self, evlrs: VLRList):
        """
        Write Extended VLRs.

        Parameters:
        - evlrs: VLRList - Extended VLRs to write
        """

    def close(self):
        """Close writer and finalize file."""

    def __enter__(self) -> LasWriter: ...
    def __exit__(self, exc_type, exc_val, exc_tb): ...
```

**Usage Examples:**

```python
import laspy
import numpy as np

# Create new LAS file
header = laspy.LasHeader(point_format=3, version=(1, 2))
header.scales = np.array([0.01, 0.01, 0.001])
header.offsets = np.array([0, 0, 0])

with laspy.open('output.las', mode='w', header=header) as writer:
    # Generate some test points
    n_points = 10000
    points = laspy.ScaleAwarePointRecord.zeros(n_points, header=header)

    # Set coordinates
    points.x = np.random.uniform(0, 1000, n_points)
    points.y = np.random.uniform(0, 1000, n_points)
    points.z = np.random.uniform(0, 100, n_points)
    points.classification = np.random.choice([1, 2, 3], n_points)

    # Write all points at once
    writer.write_points(points)

# Streaming write for large datasets
def generate_points(count, chunk_size=10000):
    """Generator for large point datasets."""
    for i in range(0, count, chunk_size):
        current_chunk_size = min(chunk_size, count - i)

        # Create chunk
        chunk = laspy.ScaleAwarePointRecord.zeros(current_chunk_size, header=header)
        chunk.x = np.random.uniform(0, 1000, current_chunk_size)
        chunk.y = np.random.uniform(0, 1000, current_chunk_size)
        chunk.z = np.random.uniform(0, 100, current_chunk_size)
        chunk.classification = np.random.choice([1, 2, 3], current_chunk_size)

        yield chunk

# Write large dataset in chunks
header = laspy.LasHeader(point_format=3)
with laspy.open('large_output.laz', mode='w', header=header, do_compress=True) as writer:
    total_points = 1000000

    for chunk in generate_points(total_points):
        writer.write_points(chunk)
        print(f"Written {len(chunk)} points")
```
### LAS File Appender

Append points to existing LAS files while preserving original structure.

```python { .api }
class LasAppender:
    def __init__(self, dest, laz_backend=None, closefd=True, encoding_errors="strict"):
        """
        Initialize LAS appender.

        Parameters:
        - dest: BinaryIO - LAS file to append to
        - laz_backend: LazBackend - Compression backend (optional)
        - closefd: bool - Whether to close file descriptor (default: True)
        - encoding_errors: str - How to handle encoding errors (default: "strict")
        """

    def append_points(self, points: PackedPointRecord):
        """
        Append points to existing file.

        Parameters:
        - points: PackedPointRecord - Points to append
        """

    def close(self):
        """Close appender and update file header."""

    def __enter__(self) -> LasAppender: ...
    def __exit__(self, exc_type, exc_val, exc_tb): ...
```

**Usage Examples:**

```python
import laspy
import numpy as np

# Append points from multiple sources
def append_multiple_files(target_file, source_files):
    """Append points from multiple source files to target."""
    with laspy.open(target_file, mode='a') as appender:
        total_appended = 0

        for source_file in source_files:
            print(f"Processing {source_file}")

            with laspy.open(source_file) as reader:
                # Process in chunks to manage memory
                for chunk in reader.chunk_iterator(50000):
                    appender.append_points(chunk)
                    total_appended += len(chunk)

            print(f"Appended {reader.header.point_count} points from {source_file}")

        print(f"Total points appended: {total_appended}")

# Usage
source_files = ['file1.las', 'file2.las', 'file3.las']
append_multiple_files('combined.las', source_files)

# Selective appending with filtering
with laspy.open('input.las') as reader:
    with laspy.open('target.las', mode='a') as appender:
        for chunk in reader.chunk_iterator(100000):
            # Only append ground points
            ground_points = chunk[chunk.classification == 2]
            if len(ground_points) > 0:
                appender.append_points(ground_points)
                print(f"Appended {len(ground_points)} ground points")
```
### Memory-Mapped LAS Files

Memory-mapped access for efficient random access to large uncompressed LAS files.

```python { .api }
class LasMMAP(LasData):
    def __init__(self, filename):
        """
        Memory-map LAS file.

        Parameters:
        - filename: str or Path - LAS file to memory-map

        Note: Only works with uncompressed LAS files
        """

    def close(self):
        """Close memory mapping."""

    def __enter__(self) -> LasMMAP: ...
    def __exit__(self, exc_type, exc_val, exc_tb): ...
```

**Usage Examples:**

```python
import laspy
import numpy as np

# Memory-mapped random access
with laspy.mmap('large_file.las') as las:
    print(f"Memory-mapped file with {len(las)} points")

    # Random access to points
    indices = np.random.choice(len(las), 1000, replace=False)
    sample_points = las.points[indices]

    print(f"Sampled {len(sample_points)} random points")
    print(f"Sample coordinate range: {sample_points.x.min()}-{sample_points.x.max()}")

    # Spatial filtering (efficient with memory mapping)
    x_mask = (las.x >= 1000) & (las.x <= 2000)
    y_mask = (las.y >= 2000) & (las.y <= 3000)
    spatial_mask = x_mask & y_mask

    filtered_points = las.points[spatial_mask]
    print(f"Spatial filter found {len(filtered_points)} points")

# In-place modification with memory mapping
def classify_by_height(las_file, ground_threshold=2.0):
    """Classify points by height using memory mapping."""
    with laspy.mmap(las_file) as las:
        print(f"Classifying {len(las)} points by height")

        # Find ground points (below threshold)
        ground_mask = las.z < ground_threshold

        # Modify classification in-place
        las.classification[ground_mask] = 2  # Ground class
        las.classification[~ground_mask] = 1  # Unclassified

        ground_count = np.sum(ground_mask)
        print(f"Classified {ground_count} points as ground")
        print(f"Classified {len(las) - ground_count} points as above-ground")

# Note: Memory mapping only works with uncompressed LAS files
classify_by_height('uncompressed.las')
```
## Advanced I/O Patterns

### Pipeline Processing

```python
import laspy
import numpy as np
from typing import Iterator, Callable

def create_processing_pipeline(input_file: str,
                               output_file: str,
                               processors: list[Callable],
                               chunk_size: int = 100000):
    """Create processing pipeline for large LAS files."""
    with laspy.open(input_file) as reader:
        header = reader.header.copy()

        with laspy.open(output_file, mode='w', header=header) as writer:
            total_processed = 0

            for chunk in reader.chunk_iterator(chunk_size):
                # Apply all processors to chunk
                processed_chunk = chunk
                for processor in processors:
                    processed_chunk = processor(processed_chunk)

                # Write processed chunk
                if len(processed_chunk) > 0:
                    writer.write_points(processed_chunk)
                    total_processed += len(processed_chunk)

            print(f"Processed {total_processed} points")

# Example processors
def normalize_intensity(points):
    """Normalize intensity values to 0-65535 range."""
    if hasattr(points, 'intensity') and len(points) > 0:
        max_val = points.intensity.max()
        if max_val > 0:
            points.intensity = (points.intensity / max_val * 65535).astype(np.uint16)
    return points

def filter_outliers(points):
    """Remove statistical outliers based on Z coordinate."""
    if len(points) == 0:
        return points

    z_mean = points.z.mean()
    z_std = points.z.std()

    # Keep points within 3 standard deviations
    mask = np.abs(points.z - z_mean) <= 3 * z_std
    return points[mask]

def ground_classification(points):
    """Simple ground classification based on Z percentile."""
    if len(points) == 0:
        return points

    ground_threshold = np.percentile(points.z, 10)
    points.classification[points.z <= ground_threshold] = 2  # Ground
    return points

# Use pipeline
processors = [normalize_intensity, filter_outliers, ground_classification]
create_processing_pipeline('input.las', 'processed.las', processors)
```
### Parallel Processing

```python
import laspy
import numpy as np
from typing import Callable
from concurrent.futures import ThreadPoolExecutor, as_completed

def parallel_chunk_processing(input_file: str,
                              output_file: str,
                              processor_func: Callable,
                              chunk_size: int = 50000,
                              max_workers: int = 4):
    """Process LAS file chunks in parallel."""
    with laspy.open(input_file) as reader:
        header = reader.header.copy()

        # Read all chunks first (for parallel processing)
        chunks = []
        for chunk in reader.chunk_iterator(chunk_size):
            chunks.append(chunk.copy())  # Copy to avoid memory mapping issues

    print(f"Processing {len(chunks)} chunks with {max_workers} workers")

    # Process chunks in parallel
    processed_chunks = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all chunks for processing
        future_to_chunk = {
            executor.submit(processor_func, chunk): i
            for i, chunk in enumerate(chunks)
        }

        # Collect results in order
        results = [None] * len(chunks)
        for future in as_completed(future_to_chunk):
            chunk_idx = future_to_chunk[future]
            try:
                results[chunk_idx] = future.result()
            except Exception as e:
                print(f"Chunk {chunk_idx} failed: {e}")
                results[chunk_idx] = chunks[chunk_idx]  # Use original

    # Write results
    with laspy.open(output_file, mode='w', header=header) as writer:
        total_written = 0
        for result_chunk in results:
            if result_chunk is not None and len(result_chunk) > 0:
                writer.write_points(result_chunk)
                total_written += len(result_chunk)

        print(f"Wrote {total_written} processed points")

def intensive_processor(points):
    """Example computationally intensive processor."""
    if len(points) == 0:
        return points

    # Simulate intensive computation (e.g., complex filtering)
    # This would be replaced with actual processing logic
    import time
    time.sleep(0.01)  # Simulate processing time

    # Example: smooth Z coordinates using rolling mean
    window_size = min(100, len(points))
    if window_size > 1:
        smoothed_z = np.convolve(points.z, np.ones(window_size)/window_size, mode='same')
        points.z = smoothed_z.astype(points.z.dtype)

    return points

# Use parallel processing
parallel_chunk_processing('large_input.las', 'processed_output.las', intensive_processor)
```