Native Python ASPRS LAS read/write library for processing LiDAR point cloud data.

Specialized reader, writer, and appender classes for different file access patterns, including streaming, chunked processing, and memory mapping. These classes provide fine-grained control over LAS file I/O operations.

LasReader is a streaming reader for efficient processing of large LAS files with chunked iteration support.
class LasReader:
    """Streaming reader for efficient processing of large LAS/LAZ files.

    Supports sequential reads, seeking by point index, and chunked
    iteration for files too large to hold in memory.
    """

    def __init__(self, source, closefd=True, laz_backend=None, read_evlrs=True, decompression_selection=None):
        """
        Initialize LAS reader.

        Parameters:
        - source: BinaryIO - LAS/LAZ file stream
        - closefd: bool - Whether to close file descriptor (default: True)
        - laz_backend: LazBackend or list - Compression backend(s) to try
        - read_evlrs: bool - Whether to read Extended VLRs (default: True)
        - decompression_selection: DecompressionSelection - Fields to decompress
        """

    @property
    def evlrs(self) -> "Optional[VLRList]":
        """Extended Variable Length Records, or None if not loaded."""

    @property
    def header(self) -> "LasHeader":
        """LAS file header."""

    def read_points(self, n: int) -> "ScaleAwarePointRecord":
        """
        Read specified number of points from current position.

        Parameters:
        - n: int - Number of points to read

        Returns:
        ScaleAwarePointRecord: Points read from file
        """

    def read(self) -> "LasData":
        """
        Read entire file into LasData container.

        Returns:
        LasData: Complete LAS data
        """

    def seek(self, pos: int, whence=io.SEEK_SET) -> int:
        """
        Seek to specific point position.

        Parameters:
        - pos: int - Point position to seek to
        - whence: int - Seek reference (io.SEEK_SET, io.SEEK_CUR, io.SEEK_END)

        Returns:
        int: New position
        """

    def chunk_iterator(self, points_per_iteration: int) -> "PointChunkIterator":
        """
        Create iterator for chunked point reading.

        Parameters:
        - points_per_iteration: int - Points per chunk

        Returns:
        PointChunkIterator: Chunk iterator
        """

    def read_evlrs(self):
        """Read Extended VLRs if not already loaded."""

    def close(self):
        """Close reader and free resources."""

    # Context-manager protocol: forward references quoted because the class
    # is still being defined when the annotations are evaluated.
    def __enter__(self) -> "LasReader": ...

    def __exit__(self, exc_type, exc_val, exc_tb): ...
class PointChunkIterator:
    """Iterator yielding fixed-size chunks of points from a LasReader."""

    def __init__(self, reader: "LasReader", points_per_iteration: int):
        """
        Initialize chunk iterator.

        Parameters:
        - reader: LasReader - Reader to iterate over
        - points_per_iteration: int - Points per chunk
        """

    def __next__(self) -> "ScaleAwarePointRecord":
        """Get next chunk of points."""

    # Forward reference quoted: the class name is not bound yet when
    # this annotation is evaluated.
    def __iter__(self) -> "PointChunkIterator": ...

# Usage Examples:
import laspy
import numpy as np  # needed for np.sum below

if __name__ == "__main__":
    # Basic point reading
    with laspy.open('data.las') as reader:
        print(f"File has {reader.header.point_count} points")
        # Read first 1000 points
        first_chunk = reader.read_points(1000)
        print(f"First chunk: {len(first_chunk)} points")
        # Read next 1000 points
        second_chunk = reader.read_points(1000)
        print(f"Second chunk: {len(second_chunk)} points")

    # Chunked processing for large files
    with laspy.open('large.laz') as reader:
        chunk_size = 100000
        total_ground_points = 0
        for chunk in reader.chunk_iterator(chunk_size):
            # Count ground points in this chunk (ASPRS class 2 = ground)
            ground_count = np.sum(chunk.classification == 2)
            total_ground_points += ground_count
            print(f"Chunk: {len(chunk)} points, {ground_count} ground points")
        print(f"Total ground points in file: {total_ground_points}")

    # Seeking to specific positions
    with laspy.open('data.las') as reader:
        # Jump to middle of file
        mid_point = reader.header.point_count // 2
        reader.seek(mid_point)
        # Read 100 points from middle
        middle_points = reader.read_points(100)
        print(f"Points from middle: {len(middle_points)}")

# Writer for creating new LAS files with compression support and streaming capabilities.
class LasWriter:
    """Writer for creating new LAS files with compression support and streaming writes."""

    def __init__(self, dest, header: "LasHeader", do_compress=None, laz_backend=None, closefd=True, encoding_errors="strict"):
        """
        Initialize LAS writer.

        Parameters:
        - dest: BinaryIO - Output stream
        - header: LasHeader - LAS header for new file
        - do_compress: bool - Force compression on/off (optional)
        - laz_backend: LazBackend - Compression backend (optional)
        - closefd: bool - Whether to close file descriptor (default: True)
        - encoding_errors: str - How to handle encoding errors (default: "strict")
        """

    def write_points(self, points: "PackedPointRecord"):
        """
        Write points to file.

        Parameters:
        - points: PackedPointRecord - Points to write
        """

    def write_evlrs(self, evlrs: "VLRList"):
        """
        Write Extended VLRs.

        Parameters:
        - evlrs: VLRList - Extended VLRs to write
        """

    def close(self):
        """Close writer and finalize file."""

    # Forward reference quoted: class name not bound during class body evaluation.
    def __enter__(self) -> "LasWriter": ...

    def __exit__(self, exc_type, exc_val, exc_tb): ...

# Usage Examples:
import laspy
import numpy as np

if __name__ == "__main__":
    # Create new LAS file
    header = laspy.LasHeader(point_format=3, version=(1, 2))
    header.scales = np.array([0.01, 0.01, 0.001])
    header.offsets = np.array([0, 0, 0])
    with laspy.open('output.las', mode='w', header=header) as writer:
        # Generate some test points
        n_points = 10000
        points = laspy.ScaleAwarePointRecord.zeros(n_points, header=header)
        # Set coordinates
        points.x = np.random.uniform(0, 1000, n_points)
        points.y = np.random.uniform(0, 1000, n_points)
        points.z = np.random.uniform(0, 100, n_points)
        points.classification = np.random.choice([1, 2, 3], n_points)
        # Write all points at once
        writer.write_points(points)

    # Streaming write for large datasets.
    # NOTE: generate_points closes over this module-level `header`, which is
    # resolved when the generator runs, not when it is defined.
    header = laspy.LasHeader(point_format=3)

    def generate_points(count, chunk_size=10000):
        """Yield chunks of random test points for a large dataset."""
        for i in range(0, count, chunk_size):
            current_chunk_size = min(chunk_size, count - i)
            # Create chunk
            chunk = laspy.ScaleAwarePointRecord.zeros(current_chunk_size, header=header)
            chunk.x = np.random.uniform(0, 1000, current_chunk_size)
            chunk.y = np.random.uniform(0, 1000, current_chunk_size)
            chunk.z = np.random.uniform(0, 100, current_chunk_size)
            chunk.classification = np.random.choice([1, 2, 3], current_chunk_size)
            yield chunk

    # Write large dataset in chunks
    with laspy.open('large_output.laz', mode='w', header=header, do_compress=True) as writer:
        total_points = 1000000
        for chunk in generate_points(total_points):
            writer.write_points(chunk)
            print(f"Written {len(chunk)} points")

# Append points to existing LAS files while preserving original structure.
class LasAppender:
    """Appender that adds points to an existing LAS file while preserving its structure."""

    def __init__(self, dest, laz_backend=None, closefd=True, encoding_errors="strict"):
        """
        Initialize LAS appender.

        Parameters:
        - dest: BinaryIO - LAS file to append to
        - laz_backend: LazBackend - Compression backend (optional)
        - closefd: bool - Whether to close file descriptor (default: True)
        - encoding_errors: str - How to handle encoding errors (default: "strict")
        """

    def append_points(self, points: "PackedPointRecord"):
        """
        Append points to existing file.

        Parameters:
        - points: PackedPointRecord - Points to append
        """

    def close(self):
        """Close appender and update file header."""

    # Forward reference quoted: class name not bound during class body evaluation.
    def __enter__(self) -> "LasAppender": ...

    def __exit__(self, exc_type, exc_val, exc_tb): ...

# Usage Examples:
import laspy
import numpy as np


def append_multiple_files(target_file, source_files):
    """Append points from multiple source files to target."""
    with laspy.open(target_file, mode='a') as appender:
        total_appended = 0
        for source_file in source_files:
            print(f"Processing {source_file}")
            with laspy.open(source_file) as reader:
                # Process in chunks to manage memory
                for chunk in reader.chunk_iterator(50000):
                    appender.append_points(chunk)
                    total_appended += len(chunk)
                print(f"Appended {reader.header.point_count} points from {source_file}")
        print(f"Total points appended: {total_appended}")


if __name__ == "__main__":
    # Usage
    source_files = ['file1.las', 'file2.las', 'file3.las']
    append_multiple_files('combined.las', source_files)

    # Selective appending with filtering
    with laspy.open('input.las') as reader:
        with laspy.open('target.las', mode='a') as appender:
            for chunk in reader.chunk_iterator(100000):
                # Only append ground points (ASPRS class 2)
                ground_points = chunk[chunk.classification == 2]
                if len(ground_points) > 0:
                    appender.append_points(ground_points)
                    print(f"Appended {len(ground_points)} ground points")

# Memory-mapped access for efficient random access to large uncompressed LAS files.
class LasMMAP(LasData):
    """Memory-mapped LasData for efficient random access to large files.

    Note: only works with uncompressed LAS files.
    """

    def __init__(self, filename):
        """
        Memory-map LAS file.

        Parameters:
        - filename: str or Path - LAS file to memory-map

        Note: Only works with uncompressed LAS files
        """

    def close(self):
        """Close memory mapping."""

    # Forward reference quoted: class name not bound during class body evaluation.
    def __enter__(self) -> "LasMMAP": ...

    def __exit__(self, exc_type, exc_val, exc_tb): ...

# Usage Examples:
import laspy
import numpy as np


def classify_by_height(las_file, ground_threshold=2.0):
    """Classify points by height using memory mapping (modifies the file in place)."""
    with laspy.mmap(las_file) as las:
        print(f"Classifying {len(las)} points by height")
        # Find ground points (below threshold)
        ground_mask = las.z < ground_threshold
        # Modify classification in-place through the memory mapping
        las.classification[ground_mask] = 2  # Ground class
        las.classification[~ground_mask] = 1  # Unclassified
        ground_count = np.sum(ground_mask)
        print(f"Classified {ground_count} points as ground")
        print(f"Classified {len(las) - ground_count} points as above-ground")


if __name__ == "__main__":
    # Memory-mapped random access
    with laspy.mmap('large_file.las') as las:
        print(f"Memory-mapped file with {len(las)} points")
        # Random access to points
        indices = np.random.choice(len(las), 1000, replace=False)
        sample_points = las.points[indices]
        print(f"Sampled {len(sample_points)} random points")
        print(f"Sample coordinate range: {sample_points.x.min()}-{sample_points.x.max()}")
        # Spatial filtering (efficient with memory mapping)
        x_mask = (las.x >= 1000) & (las.x <= 2000)
        y_mask = (las.y >= 2000) & (las.y <= 3000)
        spatial_mask = x_mask & y_mask
        filtered_points = las.points[spatial_mask]
        print(f"Spatial filter found {len(filtered_points)} points")

    # Note: Memory mapping only works with uncompressed LAS files
    classify_by_height('uncompressed.las')
from typing import Iterator, Callable

import numpy as np  # needed by the example processors below


def create_processing_pipeline(input_file: str,
                               output_file: str,
                               processors: list[Callable],
                               chunk_size: int = 100000):
    """Create processing pipeline for large LAS files.

    Streams `input_file` in chunks of `chunk_size` points, applies each
    processor in order to every chunk, and writes non-empty results to
    `output_file`.
    """
    import laspy  # local import: only required when the pipeline actually runs

    with laspy.open(input_file) as reader:
        header = reader.header.copy()
        with laspy.open(output_file, mode='w', header=header) as writer:
            total_processed = 0
            for chunk in reader.chunk_iterator(chunk_size):
                # Apply all processors to chunk, in order
                processed_chunk = chunk
                for processor in processors:
                    processed_chunk = processor(processed_chunk)
                # Write processed chunk (processors may filter it to empty)
                if len(processed_chunk) > 0:
                    writer.write_points(processed_chunk)
                    total_processed += len(processed_chunk)
            print(f"Processed {total_processed} points")


# Example processors
def normalize_intensity(points):
    """Normalize intensity values to 0-65535 range."""
    if hasattr(points, 'intensity') and len(points) > 0:
        max_val = points.intensity.max()
        if max_val > 0:
            points.intensity = (points.intensity / max_val * 65535).astype(np.uint16)
    return points


def filter_outliers(points):
    """Remove statistical outliers based on Z coordinate."""
    if len(points) == 0:
        return points
    z_mean = points.z.mean()
    z_std = points.z.std()
    # Keep points within 3 standard deviations of the mean
    mask = np.abs(points.z - z_mean) <= 3 * z_std
    return points[mask]


def ground_classification(points):
    """Simple ground classification based on Z percentile."""
    if len(points) == 0:
        return points
    ground_threshold = np.percentile(points.z, 10)
    points.classification[points.z <= ground_threshold] = 2  # Ground
    return points


if __name__ == "__main__":
    # Use pipeline
    processors = [normalize_intensity, filter_outliers, ground_classification]
    create_processing_pipeline('input.las', 'processed.las', processors)
import numpy as np
from typing import Callable  # needed for the annotation below
from concurrent.futures import ThreadPoolExecutor, as_completed


def parallel_chunk_processing(input_file: str,
                              output_file: str,
                              processor_func: Callable,
                              chunk_size: int = 50000,
                              max_workers: int = 4):
    """Process LAS file chunks in parallel threads, writing results in original order."""
    import laspy  # local import: only required when files are actually processed

    with laspy.open(input_file) as reader:
        header = reader.header.copy()
        # Read all chunks first so workers can process them independently
        chunks = []
        for chunk in reader.chunk_iterator(chunk_size):
            chunks.append(chunk.copy())  # copy to detach from the reader's buffers
    print(f"Processing {len(chunks)} chunks with {max_workers} workers")

    # Process chunks in parallel
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all chunks for processing
        future_to_chunk = {
            executor.submit(processor_func, chunk): i
            for i, chunk in enumerate(chunks)
        }
        # Collect results back into original chunk order
        results = [None] * len(chunks)
        for future in as_completed(future_to_chunk):
            chunk_idx = future_to_chunk[future]
            try:
                results[chunk_idx] = future.result()
            except Exception as e:
                print(f"Chunk {chunk_idx} failed: {e}")
                results[chunk_idx] = chunks[chunk_idx]  # fall back to the original

    # Write results
    with laspy.open(output_file, mode='w', header=header) as writer:
        total_written = 0
        for result_chunk in results:
            if result_chunk is not None and len(result_chunk) > 0:
                writer.write_points(result_chunk)
                total_written += len(result_chunk)
        print(f"Wrote {total_written} processed points")


def intensive_processor(points):
    """Example computationally intensive processor (smooths Z with a rolling mean)."""
    if len(points) == 0:
        return points
    # Simulate intensive computation (e.g., complex filtering)
    # This would be replaced with actual processing logic
    import time
    time.sleep(0.01)  # Simulate processing time
    # Example: smooth Z coordinates using rolling mean
    window_size = min(100, len(points))
    if window_size > 1:
        smoothed_z = np.convolve(points.z, np.ones(window_size) / window_size, mode='same')
        points.z = smoothed_z.astype(points.z.dtype)
    return points


if __name__ == "__main__":
    # Use parallel processing
    parallel_chunk_processing('large_input.las', 'processed_output.las', intensive_processor)

# Install with Tessl CLI
# Install with the Tessl CLI: npx tessl i tessl/pypi-laspy