CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-laspy

Native Python ASPRS LAS read/write library for processing LiDAR point cloud data

Pending
Overview
Eval results
Files

io-handlers.mddocs/

I/O Handler Classes

Specialized reader, writer, and appender classes for different file access patterns including streaming, chunked processing, and memory mapping. These classes provide fine-grained control over LAS file I/O operations.

Capabilities

LAS File Reader

Streaming reader for efficient processing of large LAS files with chunked iteration support.

class LasReader:
    def __init__(self, source, closefd=True, laz_backend=None, read_evlrs=True, decompression_selection=None):
        """
        Initialize LAS reader.
        
        Parameters:
        - source: BinaryIO - LAS/LAZ file stream
        - closefd: bool - Whether to close file descriptor (default: True)
        - laz_backend: LazBackend or list - Compression backend(s) to try
        - read_evlrs: bool - Whether to read Extended VLRs (default: True)
        - decompression_selection: DecompressionSelection - Fields to decompress
        """
    
    @property
    def evlrs(self) -> Optional[VLRList]:
        """Extended Variable Length Records."""
    
    @property
    def header(self) -> LasHeader:
        """LAS file header."""
    
    def read_points(self, n: int) -> ScaleAwarePointRecord:
        """
        Read specified number of points from current position.
        
        Parameters:
        - n: int - Number of points to read
        
        Returns:
        ScaleAwarePointRecord: Points read from file
        """
    
    def read(self) -> LasData:
        """
        Read entire file into LasData container.
        
        Returns:
        LasData: Complete LAS data
        """
    
    def seek(self, pos: int, whence=io.SEEK_SET) -> int:
        """
        Seek to specific point position.
        
        Parameters:
        - pos: int - Point position to seek to
        - whence: int - Seek reference (SEEK_SET, SEEK_CUR, SEEK_END)
        
        Returns:
        int: New position
        """
    
    def chunk_iterator(self, points_per_iteration: int) -> PointChunkIterator:
        """
        Create iterator for chunked point reading.
        
        Parameters:
        - points_per_iteration: int - Points per chunk
        
        Returns:
        PointChunkIterator: Chunk iterator
        """
    
    def read_evlrs(self):
        """Read Extended VLRs if not already loaded."""
    
    def close(self):
        """Close reader and free resources."""
    
    def __enter__(self) -> LasReader: ...
    def __exit__(self, exc_type, exc_val, exc_tb): ...

class PointChunkIterator:
    def __init__(self, reader: LasReader, points_per_iteration: int):
        """
        Initialize chunk iterator.
        
        Parameters:
        - reader: LasReader - Reader to iterate over
        - points_per_iteration: int - Points per chunk
        """
    
    def __next__(self) -> ScaleAwarePointRecord:
        """Get next chunk of points."""
    
    def __iter__(self) -> PointChunkIterator: ...

Usage Examples:

import laspy

# Basic point reading
with laspy.open('data.las') as reader:
    print(f"File has {reader.header.point_count} points")
    
    # Read first 1000 points
    first_chunk = reader.read_points(1000)
    print(f"First chunk: {len(first_chunk)} points")
    
    # Read next 1000 points
    second_chunk = reader.read_points(1000)
    print(f"Second chunk: {len(second_chunk)} points")

# Chunked processing for large files
with laspy.open('large.laz') as reader:
    chunk_size = 100000
    total_ground_points = 0
    
    for chunk in reader.chunk_iterator(chunk_size):
        # Count ground points in this chunk
        ground_count = np.sum(chunk.classification == 2)
        total_ground_points += ground_count
        
        print(f"Chunk: {len(chunk)} points, {ground_count} ground points")
    
    print(f"Total ground points in file: {total_ground_points}")

# Seeking to specific positions
with laspy.open('data.las') as reader:
    # Jump to middle of file
    mid_point = reader.header.point_count // 2
    reader.seek(mid_point)
    
    # Read 100 points from middle
    middle_points = reader.read_points(100)
    print(f"Points from middle: {len(middle_points)}")

LAS File Writer

Writer for creating new LAS files with compression support and streaming capabilities.

class LasWriter:
    def __init__(self, dest, header: LasHeader, do_compress=None, laz_backend=None, closefd=True, encoding_errors="strict"):
        """
        Initialize LAS writer.
        
        Parameters:
        - dest: BinaryIO - Output stream
        - header: LasHeader - LAS header for new file
        - do_compress: bool - Force compression on/off (optional)
        - laz_backend: LazBackend - Compression backend (optional)
        - closefd: bool - Whether to close file descriptor (default: True)
        - encoding_errors: str - How to handle encoding errors (default: "strict")
        """
    
    def write_points(self, points: PackedPointRecord):
        """
        Write points to file.
        
        Parameters:
        - points: PackedPointRecord - Points to write
        """
    
    def write_evlrs(self, evlrs: VLRList):
        """
        Write Extended VLRs.
        
        Parameters:
        - evlrs: VLRList - Extended VLRs to write
        """
    
    def close(self):
        """Close writer and finalize file."""
    
    def __enter__(self) -> LasWriter: ...
    def __exit__(self, exc_type, exc_val, exc_tb): ...

Usage Examples:

import laspy
import numpy as np

# Create new LAS file
header = laspy.LasHeader(point_format=3, version=(1, 2))
header.scales = np.array([0.01, 0.01, 0.001])
header.offsets = np.array([0, 0, 0])

with laspy.open('output.las', mode='w', header=header) as writer:
    # Generate some test points
    n_points = 10000
    points = laspy.ScaleAwarePointRecord.zeros(n_points, header=header)
    
    # Set coordinates
    points.x = np.random.uniform(0, 1000, n_points)
    points.y = np.random.uniform(0, 1000, n_points)
    points.z = np.random.uniform(0, 100, n_points)
    points.classification = np.random.choice([1, 2, 3], n_points)
    
    # Write all points at once
    writer.write_points(points)

# Streaming write for large datasets
def generate_points(count, chunk_size=10000):
    """Generator for large point datasets."""
    for i in range(0, count, chunk_size):
        current_chunk_size = min(chunk_size, count - i)
        
        # Create chunk
        chunk = laspy.ScaleAwarePointRecord.zeros(current_chunk_size, header=header)
        chunk.x = np.random.uniform(0, 1000, current_chunk_size)
        chunk.y = np.random.uniform(0, 1000, current_chunk_size)
        chunk.z = np.random.uniform(0, 100, current_chunk_size)
        chunk.classification = np.random.choice([1, 2, 3], current_chunk_size)
        
        yield chunk

# Write large dataset in chunks
header = laspy.LasHeader(point_format=3)
with laspy.open('large_output.laz', mode='w', header=header, do_compress=True) as writer:
    total_points = 1000000
    
    for chunk in generate_points(total_points):
        writer.write_points(chunk)
        print(f"Written {len(chunk)} points")

LAS File Appender

Append points to existing LAS files while preserving original structure.

class LasAppender:
    def __init__(self, dest, laz_backend=None, closefd=True, encoding_errors="strict"):
        """
        Initialize LAS appender.
        
        Parameters:
        - dest: BinaryIO - LAS file to append to
        - laz_backend: LazBackend - Compression backend (optional)
        - closefd: bool - Whether to close file descriptor (default: True)
        - encoding_errors: str - How to handle encoding errors (default: "strict")
        """
    
    def append_points(self, points: PackedPointRecord):
        """
        Append points to existing file.
        
        Parameters:
        - points: PackedPointRecord - Points to append
        """
    
    def close(self):
        """Close appender and update file header."""
    
    def __enter__(self) -> LasAppender: ...
    def __exit__(self, exc_type, exc_val, exc_tb): ...

Usage Examples:

import laspy
import numpy as np

# Append points from multiple sources
def append_multiple_files(target_file, source_files):
    """Append points from multiple source files to target."""
    
    with laspy.open(target_file, mode='a') as appender:
        total_appended = 0
        
        for source_file in source_files:
            print(f"Processing {source_file}")
            
            with laspy.open(source_file) as reader:
                # Process in chunks to manage memory
                for chunk in reader.chunk_iterator(50000):
                    appender.append_points(chunk)
                    total_appended += len(chunk)
                    
            print(f"Appended {reader.header.point_count} points from {source_file}")
        
        print(f"Total points appended: {total_appended}")

# Usage
source_files = ['file1.las', 'file2.las', 'file3.las']
append_multiple_files('combined.las', source_files)

# Selective appending with filtering
with laspy.open('input.las') as reader:
    with laspy.open('target.las', mode='a') as appender:
        for chunk in reader.chunk_iterator(100000):
            # Only append ground points
            ground_points = chunk[chunk.classification == 2]
            if len(ground_points) > 0:
                appender.append_points(ground_points)
                print(f"Appended {len(ground_points)} ground points")

Memory-Mapped LAS Files

Memory-mapped access for efficient random access to large uncompressed LAS files.

class LasMMAP(LasData):
    def __init__(self, filename):
        """
        Memory-map LAS file.
        
        Parameters:
        - filename: str or Path - LAS file to memory-map
        
        Note: Only works with uncompressed LAS files
        """
    
    def close(self):
        """Close memory mapping."""
    
    def __enter__(self) -> LasMMAP: ...
    def __exit__(self, exc_type, exc_val, exc_tb): ...

Usage Examples:

import laspy
import numpy as np

# Memory-mapped random access
with laspy.mmap('large_file.las') as las:
    print(f"Memory-mapped file with {len(las)} points")
    
    # Random access to points
    indices = np.random.choice(len(las), 1000, replace=False)
    sample_points = las.points[indices]
    
    print(f"Sampled {len(sample_points)} random points")
    print(f"Sample coordinate range: {sample_points.x.min()}-{sample_points.x.max()}")
    
    # Spatial filtering (efficient with memory mapping)
    x_mask = (las.x >= 1000) & (las.x <= 2000)
    y_mask = (las.y >= 2000) & (las.y <= 3000)
    spatial_mask = x_mask & y_mask
    
    filtered_points = las.points[spatial_mask]
    print(f"Spatial filter found {len(filtered_points)} points")

# In-place modification with memory mapping
def classify_by_height(las_file, ground_threshold=2.0):
    """Classify points by height using memory mapping."""
    
    with laspy.mmap(las_file) as las:
        print(f"Classifying {len(las)} points by height")
        
        # Find ground points (below threshold)
        ground_mask = las.z < ground_threshold
        
        # Modify classification in-place
        las.classification[ground_mask] = 2  # Ground class
        las.classification[~ground_mask] = 1  # Unclassified
        
        ground_count = np.sum(ground_mask)
        print(f"Classified {ground_count} points as ground")
        print(f"Classified {len(las) - ground_count} points as above-ground")

# Note: Memory mapping only works with uncompressed LAS files
classify_by_height('uncompressed.las')

Advanced I/O Patterns

Pipeline Processing

import laspy
from typing import Iterator, Callable

def create_processing_pipeline(input_file: str, 
                             output_file: str,
                             processors: list[Callable],
                             chunk_size: int = 100000):
    """Create processing pipeline for large LAS files."""
    
    with laspy.open(input_file) as reader:
        header = reader.header.copy()
        
        with laspy.open(output_file, mode='w', header=header) as writer:
            total_processed = 0
            
            for chunk in reader.chunk_iterator(chunk_size):
                # Apply all processors to chunk
                processed_chunk = chunk
                for processor in processors:
                    processed_chunk = processor(processed_chunk)
                
                # Write processed chunk
                if len(processed_chunk) > 0:
                    writer.write_points(processed_chunk)
                    total_processed += len(processed_chunk)
                
                print(f"Processed {total_processed} points")

# Example processors
def normalize_intensity(points):
    """Normalize intensity values to 0-65535 range."""
    if hasattr(points, 'intensity') and len(points) > 0:
        max_val = points.intensity.max()
        if max_val > 0:
            points.intensity = (points.intensity / max_val * 65535).astype(np.uint16)
    return points

def filter_outliers(points):
    """Remove statistical outliers based on Z coordinate."""
    if len(points) == 0:
        return points
        
    z_mean = points.z.mean()
    z_std = points.z.std()
    
    # Keep points within 3 standard deviations
    mask = np.abs(points.z - z_mean) <= 3 * z_std
    return points[mask]

def ground_classification(points):
    """Simple ground classification based on Z percentile."""
    if len(points) == 0:
        return points
        
    ground_threshold = np.percentile(points.z, 10)
    points.classification[points.z <= ground_threshold] = 2  # Ground
    return points

# Use pipeline
processors = [normalize_intensity, filter_outliers, ground_classification]
create_processing_pipeline('input.las', 'processed.las', processors)

Parallel Processing

import laspy
import numpy as np
from concurrent.futures import ThreadPoolExecutor, as_completed

def parallel_chunk_processing(input_file: str,
                            output_file: str, 
                            processor_func: Callable,
                            chunk_size: int = 50000,
                            max_workers: int = 4):
    """Process LAS file chunks in parallel."""
    
    with laspy.open(input_file) as reader:
        header = reader.header.copy()
        
        # Read all chunks first (for parallel processing)
        chunks = []
        for chunk in reader.chunk_iterator(chunk_size):
            chunks.append(chunk.copy())  # Copy to avoid memory mapping issues
        
        print(f"Processing {len(chunks)} chunks with {max_workers} workers")
        
        # Process chunks in parallel
        processed_chunks = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit all chunks for processing
            future_to_chunk = {
                executor.submit(processor_func, chunk): i 
                for i, chunk in enumerate(chunks)
            }
            
            # Collect results in order
            results = [None] * len(chunks)
            for future in as_completed(future_to_chunk):
                chunk_idx = future_to_chunk[future]
                try:
                    results[chunk_idx] = future.result()
                except Exception as e:
                    print(f"Chunk {chunk_idx} failed: {e}")
                    results[chunk_idx] = chunks[chunk_idx]  # Use original
        
        # Write results
        with laspy.open(output_file, mode='w', header=header) as writer:
            total_written = 0
            for result_chunk in results:
                if result_chunk is not None and len(result_chunk) > 0:
                    writer.write_points(result_chunk)
                    total_written += len(result_chunk)
            
            print(f"Wrote {total_written} processed points")

def intensive_processor(points):
    """Example computationally intensive processor."""
    if len(points) == 0:
        return points
    
    # Simulate intensive computation (e.g., complex filtering)
    # This would be replaced with actual processing logic
    import time
    time.sleep(0.01)  # Simulate processing time
    
    # Example: smooth Z coordinates using rolling mean
    window_size = min(100, len(points))
    if window_size > 1:
        smoothed_z = np.convolve(points.z, np.ones(window_size)/window_size, mode='same')
        points.z = smoothed_z.astype(points.z.dtype)
    
    return points

# Use parallel processing
parallel_chunk_processing('large_input.las', 'processed_output.las', intensive_processor)

Install with Tessl CLI

npx tessl i tessl/pypi-laspy

docs

compression.md

copc.md

core-io.md

data-containers.md

index.md

io-handlers.md

point-data.md

vlr.md

tile.json