Native Python ASPRS LAS read/write library for processing LiDAR point cloud data
—
LAZ compression backend management with support for multiple compression libraries and selective field decompression for efficient processing. Laspy provides flexible compression options to optimize file size and processing performance.
Multiple compression backend support with automatic detection and fallback capabilities.
class LazBackend(Enum):
LazrsParallel = 0 # Multi-threaded lazrs backend
Lazrs = 1 # Single-threaded lazrs backend
Laszip = 2 # LASzip backend
@classmethod
def detect_available(cls) -> Tuple[LazBackend, ...]:
"""
Detect available compression backends on system.
Returns:
Tuple[LazBackend, ...]: Available backends in priority order
"""
def is_available(self) -> bool:
"""
Check if this backend is available.
Returns:
bool: True if backend can be used
"""
@property
def supports_append(self) -> bool:
"""
Check if backend supports append operations.
Returns:
bool: True if append is supported
"""
def create_reader(self, source, header, decompression_selection=None):
"""Create point reader using this backend."""
def create_writer(self, dest, header):
"""Create point writer using this backend."""
def create_appender(self, dest, header):
"""Create point appender using this backend (if supported)."""Usage Examples:
import laspy
from laspy import LazBackend
# Check available backends
available = LazBackend.detect_available()
print(f"Available backends: {[b.name for b in available]}")
# Check specific backend availability
if LazBackend.LazrsParallel.is_available():
print("Multi-threaded lazrs available")
backend = LazBackend.LazrsParallel
elif LazBackend.Lazrs.is_available():
print("Single-threaded lazrs available")
backend = LazBackend.Lazrs
elif LazBackend.Laszip.is_available():
print("LASzip available")
backend = LazBackend.Laszip
else:
print("No LAZ backends available")
backend = None
# Use specific backend for reading
if backend:
las = laspy.read('compressed.laz', laz_backend=backend)
print(f"Read {len(las.points)} points using {backend.name}")
# Check append support
for backend in available:
if backend.supports_append:
print(f"{backend.name} supports append operations")
else:
print(f"{backend.name} does not support append operations")Fine-grained control over which fields to decompress for memory and performance optimization.
class DecompressionSelection(IntFlag):
XY_RETURNS_CHANNEL = ... # X, Y coordinates and return information
Z = ... # Z coordinate
CLASSIFICATION = ... # Classification field
FLAGS = ... # Various flag fields
INTENSITY = ... # Intensity values
SCAN_ANGLE = ... # Scan angle
USER_DATA = ... # User data field
POINT_SOURCE_ID = ... # Point source ID
GPS_TIME = ... # GPS timestamp (if present)
RGB = ... # RGB color information (if present)
NIR = ... # Near-infrared (if present)
WAVEPACKET = ... # Wavepacket data (if present)
ALL_EXTRA_BYTES = ... # All extra byte dimensions
@classmethod
def all(cls) -> DecompressionSelection:
"""
Select all available fields for decompression.
Returns:
DecompressionSelection: All fields selected
"""
@classmethod
def base(cls) -> DecompressionSelection:
"""
Select base essential fields (XYZ, returns, classification).
Returns:
DecompressionSelection: Essential fields only
"""
@classmethod
def xy_returns_channel(cls) -> DecompressionSelection:
"""
Select only XY coordinates and return information.
Returns:
DecompressionSelection: Minimal coordinate fields
"""
def decompress_xy_returns_channel(self) -> DecompressionSelection:
"""Enable XY coordinates and return information decompression."""
def decompress_z(self) -> DecompressionSelection:
"""Enable Z coordinate decompression."""
def decompress_classification(self) -> DecompressionSelection:
"""Enable classification field decompression."""
def decompress_flags(self) -> DecompressionSelection:
"""Enable flag fields decompression."""
def decompress_intensity(self) -> DecompressionSelection:
"""Enable intensity decompression."""
def decompress_scan_angle(self) -> DecompressionSelection:
"""Enable scan angle decompression."""
def decompress_user_data(self) -> DecompressionSelection:
"""Enable user data decompression."""
def decompress_point_source_id(self) -> DecompressionSelection:
"""Enable point source ID decompression."""
def decompress_gps_time(self) -> DecompressionSelection:
"""Enable GPS time decompression."""
def decompress_rgb(self) -> DecompressionSelection:
"""Enable RGB color decompression."""
def decompress_nir(self) -> DecompressionSelection:
"""Enable near-infrared decompression."""
def decompress_wavepacket(self) -> DecompressionSelection:
"""Enable wavepacket data decompression."""
def decompress_all_extra_bytes(self) -> DecompressionSelection:
"""Enable all extra bytes decompression."""
def skip_xy_returns_channel(self) -> DecompressionSelection:
"""Disable XY coordinates and return information decompression."""
def skip_z(self) -> DecompressionSelection:
"""Disable Z coordinate decompression."""
def skip_classification(self) -> DecompressionSelection:
"""Disable classification field decompression."""
def skip_flags(self) -> DecompressionSelection:
"""Disable flag fields decompression."""
def skip_intensity(self) -> DecompressionSelection:
"""Disable intensity decompression."""
def skip_scan_angle(self) -> DecompressionSelection:
"""Disable scan angle decompression."""
def skip_user_data(self) -> DecompressionSelection:
"""Disable user data decompression."""
def skip_point_source_id(self) -> DecompressionSelection:
"""Disable point source ID decompression."""
def skip_gps_time(self) -> DecompressionSelection:
"""Disable GPS time decompression."""
def skip_rgb(self) -> DecompressionSelection:
"""Disable RGB color decompression."""
def skip_nir(self) -> DecompressionSelection:
"""Disable near-infrared decompression."""
def skip_wavepacket(self) -> DecompressionSelection:
"""Disable wavepacket data decompression."""
def skip_all_extra_bytes(self) -> DecompressionSelection:
"""Disable all extra bytes decompression."""
def is_set_xy_returns_channel(self) -> bool:
"""Check if XY coordinates and returns are selected."""
def is_set_z(self) -> bool:
"""Check if Z coordinate is selected."""
def is_set_classification(self) -> bool:
"""Check if classification field is selected."""
def is_set_flags(self) -> bool:
"""Check if flag fields are selected."""
def is_set_intensity(self) -> bool:
"""Check if intensity is selected."""
def is_set_scan_angle(self) -> bool:
"""Check if scan angle is selected."""
def is_set_user_data(self) -> bool:
"""Check if user data is selected."""
def is_set_point_source_id(self) -> bool:
"""Check if point source ID is selected."""
def is_set_gps_time(self) -> bool:
"""Check if GPS time is selected."""
def is_set_rgb(self) -> bool:
"""Check if RGB color is selected."""
def is_set_nir(self) -> bool:
"""Check if near-infrared is selected."""
def is_set_wavepacket(self) -> bool:
"""Check if wavepacket data is selected."""
def is_set_all_extra_bytes(self) -> bool:
"""Check if all extra bytes are selected."""
def is_set(self, flag) -> bool:
"""
Check if specific flag is set.
Parameters:
- flag: DecompressionSelection flag to check
Returns:
bool: True if flag is set
"""
def to_lazrs(self):
"""Convert to lazrs-specific decompression selection."""
def to_laszip(self) -> int:
"""Convert to LASzip-compatible integer selection."""Usage Examples:
import laspy
from laspy import DecompressionSelection
# Read only essential fields to save memory
selection = DecompressionSelection.base() # XYZ, returns, classification
las = laspy.read('large.laz', decompression_selection=selection)
print(f"Loaded {len(las.points)} points with base fields only")
# Custom field selection for specific analysis
selection = (DecompressionSelection.xy_returns_channel()
.decompress_z()
.decompress_intensity()
.decompress_rgb())
# Skip expensive fields like GPS time and extra bytes
las = laspy.read('detailed.laz', decompression_selection=selection)
# Check what fields are available
if selection.is_set_rgb():
print("RGB data will be decompressed")
# Access RGB data
colors = las.points[['red', 'green', 'blue']]
if not selection.is_set_gps_time():
print("GPS time will be skipped (saves memory)")
# Progressive decompression - start minimal, add fields as needed
selection = DecompressionSelection.xy_returns_channel()
# Read with minimal fields first
las = laspy.read('data.laz', decompression_selection=selection)
print(f"Initial load: XY coordinates only")
# If analysis needs Z data, reload with Z
if analysis_needs_height:
selection = selection.decompress_z()
las = laspy.read('data.laz', decompression_selection=selection)
print("Reloaded with Z coordinate")
# For color analysis, add RGB
if analysis_needs_color:
selection = selection.decompress_rgb()
las = laspy.read('data.laz', decompression_selection=selection)
print("Reloaded with RGB data")Utilities for working with compressed point formats and format conversion.
def is_point_format_compressed(point_format_id: int) -> bool:
"""
Check if point format uses compression.
Parameters:
- point_format_id: int - Point format ID to check
Returns:
bool: True if point format is compressed
"""
def compressed_id_to_uncompressed(point_format_id: int) -> int:
"""
Convert compressed point format ID to uncompressed equivalent.
Parameters:
- point_format_id: int - Compressed point format ID
Returns:
int: Uncompressed point format ID
"""
def uncompressed_id_to_compressed(point_format_id: int) -> int:
"""
Convert uncompressed point format ID to compressed equivalent.
Parameters:
- point_format_id: int - Uncompressed point format ID
Returns:
int: Compressed point format ID
"""Usage Examples:
import laspy
from laspy.compression import (
is_point_format_compressed,
compressed_id_to_uncompressed,
uncompressed_id_to_compressed
)
# Check point format compression status
for fmt_id in range(11): # Point formats 0-10
is_compressed = is_point_format_compressed(fmt_id)
print(f"Point format {fmt_id}: {'Compressed' if is_compressed else 'Uncompressed'}")
# Convert between compressed and uncompressed IDs
uncompressed_fmt = 3 # Standard point format 3
compressed_fmt = uncompressed_id_to_compressed(uncompressed_fmt)
print(f"Point format {uncompressed_fmt} compressed equivalent: {compressed_fmt}")
back_to_uncompressed = compressed_id_to_uncompressed(compressed_fmt)
print(f"Point format {compressed_fmt} uncompressed equivalent: {back_to_uncompressed}")
# Example: Force compression when writing
las = laspy.read('input.las')
current_fmt = las.header.point_format.id
if not is_point_format_compressed(current_fmt):
# Convert to compressed equivalent for writing
compressed_fmt = uncompressed_id_to_compressed(current_fmt)
converted = laspy.convert(las, point_format_id=compressed_fmt)
converted.write('compressed_output.laz', do_compress=True)
print(f"Converted format {current_fmt} to {compressed_fmt} and compressed")import laspy
import time
from laspy import LazBackend
def benchmark_backends(laz_file, chunk_size=100000):
"""Compare performance of different LAZ backends."""
available_backends = LazBackend.detect_available()
results = {}
for backend in available_backends:
print(f"Testing {backend.name}...")
start_time = time.time()
total_points = 0
try:
with laspy.open(laz_file, laz_backend=backend) as reader:
for chunk in reader.chunk_iterator(chunk_size):
total_points += len(chunk)
elapsed = time.time() - start_time
throughput = total_points / elapsed
results[backend.name] = {
'elapsed': elapsed,
'points': total_points,
'throughput': throughput
}
print(f" {total_points:,} points in {elapsed:.2f}s ({throughput:,.0f} points/sec)")
except Exception as e:
print(f" Failed: {e}")
results[backend.name] = {'error': str(e)}
return results
# Run benchmark
results = benchmark_backends('large_file.laz')
# Find fastest backend
fastest = None
best_throughput = 0
for backend_name, result in results.items():
if 'throughput' in result and result['throughput'] > best_throughput:
best_throughput = result['throughput']
fastest = backend_name
if fastest:
print(f"\nFastest backend: {fastest} ({best_throughput:,.0f} points/sec)")import laspy
from laspy import DecompressionSelection, LazBackend
def memory_efficient_processing(laz_file, output_file, processing_func):
    """Process large LAZ files with minimal memory usage.

    Streams points chunk-by-chunk through *processing_func* and writes the
    surviving points straight to a compressed output file, so only one
    small chunk is resident in memory at a time.

    Parameters:
    - laz_file: input LAZ file path
    - output_file: destination path for the compressed result
    - processing_func: callable taking a chunk of points and returning the
      (possibly filtered) points to keep
    """
    # First entry of detect_available() is the highest-priority backend.
    backend = LazBackend.detect_available()[0]
    # Decompress only the essential fields to keep memory low.
    selection = DecompressionSelection.base()
    print(f"Processing {laz_file} with {backend.name}")
    print(f"Decompression selection: base fields only")
    with laspy.open(laz_file, laz_backend=backend,
                    decompression_selection=selection) as reader:
        header = reader.header.copy()
        with laspy.open(output_file, mode='w', header=header,
                        laz_backend=backend, do_compress=True) as writer:
            total_processed = 0
            chunk_size = 50000  # Small chunks for low memory usage
            for chunk in reader.chunk_iterator(chunk_size):
                kept = processing_func(chunk)
                # Write immediately so processed points never accumulate.
                if len(kept) > 0:
                    writer.write_points(kept)
                    total_processed += len(kept)
                # Progress indication
                if total_processed % 500000 == 0:
                    print(f"Processed {total_processed:,} points")
    print(f"Completed: {total_processed:,} points processed")
def ground_filter(points):
    """Keep only ground (class 2) and low vegetation (class 3) points.

    Parameters:
    - points: array-like point record with a ``classification`` field

    Returns:
    The subset of *points* whose classification is 2 or 3; an empty input
    is returned unchanged.
    """
    if len(points) == 0:
        return points
    is_ground = points.classification == 2
    is_low_veg = points.classification == 3
    return points[is_ground | is_low_veg]
# Use memory-efficient processing
memory_efficient_processing('input.laz', 'filtered.laz', ground_filter)

import laspy
import os
from laspy import LazBackend
def analyze_compression_ratio(input_las, backends=None):
    """Analyze compression ratios for different backends.

    Re-compresses the points from *input_las* once per backend into a
    temporary .laz file and compares the sizes on disk.

    Parameters:
    - input_las: path to an uncompressed LAS file used as the baseline
    - backends: iterable of LazBackend values to test; defaults to all
      backends detected on this system

    Returns:
    dict: per-backend results keyed by backend name — 'compressed_size',
    'compression_ratio', 'space_saved' on success, {'error': message}
    on failure.
    """
    if backends is None:
        backends = LazBackend.detect_available()
    # Get original file size (the uncompressed baseline for the ratio)
    original_size = os.path.getsize(input_las)
    print(f"Original LAS file: {original_size:,} bytes")
    las = laspy.read(input_las)
    results = {}
    for backend in backends:
        try:
            # Write compressed file with this backend
            temp_file = f"temp_{backend.name.lower()}.laz"
            with laspy.open(temp_file, mode='w', header=las.header,
                            laz_backend=backend, do_compress=True) as writer:
                writer.write_points(las.points)
            # Check compressed size
            compressed_size = os.path.getsize(temp_file)
            ratio = original_size / compressed_size
            results[backend.name] = {
                'compressed_size': compressed_size,
                'compression_ratio': ratio,
                'space_saved': 1 - (compressed_size / original_size)
            }
            print(f"{backend.name}:")
            print(f"  Compressed size: {compressed_size:,} bytes")
            print(f"  Compression ratio: {ratio:.1f}:1")
            print(f"  Space saved: {results[backend.name]['space_saved']:.1%}")
            # Clean up temp file
            os.remove(temp_file)
        except Exception as e:
            # NOTE(review): a failed write may leave temp_file behind —
            # cleanup only happens on the success path.
            print(f"{backend.name}: Failed - {e}")
            results[backend.name] = {'error': str(e)}
    return results
# Analyze different backends
compression_results = analyze_compression_ratio('sample.las')
# Find best compression
best_ratio = 0
best_backend = None
for backend_name, result in compression_results.items():
if 'compression_ratio' in result and result['compression_ratio'] > best_ratio:
best_ratio = result['compression_ratio']
best_backend = backend_name
if best_backend:
print(f"\nBest compression: {best_backend} ({best_ratio:.1f}:1)")import laspy
from laspy import DecompressionSelection
def process_fields_selectively(laz_file):
    """Demonstrate selective field processing for different analyses.

    Runs three independent passes over *laz_file*, each decompressing
    only the fields that the particular analysis needs (terrain,
    intensity, colour), to show the memory/performance benefit of
    DecompressionSelection.

    Parameters:
    - laz_file: path to the LAZ file to analyse
    """
    # Analysis 1: Terrain analysis (needs XYZ + classification)
    print("Terrain analysis...")
    selection = (DecompressionSelection.xy_returns_channel()
                 .decompress_z()
                 .decompress_classification())
    with laspy.open(laz_file, decompression_selection=selection) as reader:
        total_ground = 0
        for chunk in reader.chunk_iterator(100000):
            # Class 2 is "ground" in the ASPRS classification scheme.
            ground_points = chunk[chunk.classification == 2]
            total_ground += len(ground_points)
        print(f"Found {total_ground:,} ground points")
    # Analysis 2: Intensity analysis (needs XYZ + intensity)
    print("Intensity analysis...")
    selection = (DecompressionSelection.base()
                 .decompress_intensity())
    with laspy.open(laz_file, decompression_selection=selection) as reader:
        intensity_stats = []
        for chunk in reader.chunk_iterator(100000):
            # Guard: not every point format carries an intensity field.
            if hasattr(chunk, 'intensity'):
                intensity_stats.extend(chunk.intensity)
        if intensity_stats:
            import numpy as np
            mean_intensity = np.mean(intensity_stats)
            print(f"Mean intensity: {mean_intensity:.1f}")
    # Analysis 3: Color analysis (needs RGB)
    print("Color analysis...")
    selection = (DecompressionSelection.xy_returns_channel()
                 .decompress_rgb())
    with laspy.open(laz_file, decompression_selection=selection) as reader:
        has_color = False
        for chunk in reader.chunk_iterator(100000):
            # One chunk is enough to tell whether RGB is present.
            if hasattr(chunk, 'red'):
                has_color = True
                break
        if has_color:
            print("File contains RGB color information")
        else:
            print("File does not contain RGB color")
# Run selective analyses
process_fields_selectively('sample.laz')

Install with Tessl CLI
npx tessl i tessl/pypi-laspy