Discrete and continuous wavelet transforms for signal and image processing with comprehensive 1D, 2D, and nD transform support.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Utilities for converting between different coefficient representations, packing/unpacking coefficients, and working with coefficient arrays for efficient processing and storage.
Functions for converting between coefficient lists and single arrays for efficient processing.
def coeffs_to_array(coeffs, padding: int = 0, axes=None):
    """
    Pack wavelet coefficients into a single array.

    Parameters:
    - coeffs: Coefficient list from wavedec, wavedec2, or wavedecn
    - padding: Fill value for entries of the packed array that do not
      correspond to any coefficient (only relevant when the coefficients
      cannot be tightly packed)
    - axes: Axes over which the decomposition was performed (for nD coefficients)

    Returns:
    (coeff_arr, coeff_slices) tuple where:
    - coeff_arr: Array containing all coefficients. It has the same number
      of dimensions as the transformed data, so it is 1D only for wavedec
      coefficients.
    - coeff_slices: List with one entry per decomposition level. The first
      entry is a tuple of slice objects locating the approximation
      coefficients; every following entry is a dict mapping a detail key
      ('d' for 1D; 'ad', 'da', 'dd' for 2D; ...) to a tuple of slice objects.
      Extract a detail band with coeff_arr[coeff_slices[i][key]].
    """
def array_to_coeffs(arr, coeff_slices, output_format: str = 'wavedecn'):
    """
    Unpack single array back to coefficient format.

    Parameters:
    - arr: Packed coefficient array as returned by coeffs_to_array
      (1D for wavedec input, 2D for wavedec2 input, nD for wavedecn input)
    - coeff_slices: Slice information from coeffs_to_array
    - output_format: Output format ('wavedec', 'wavedec2', 'wavedecn')

    Returns:
    Coefficient list in specified format
    """


import pywt
import numpy as np
import matplotlib.pyplot as plt
# Create test signal and perform multilevel DWT
# Fixed seed so the noisy sine wave (and every printed number) is reproducible.
np.random.seed(42)
signal = np.sin(2 * np.pi * 5 * np.linspace(0, 1, 1024)) + 0.3 * np.random.randn(1024)
coeffs = pywt.wavedec(signal, 'db4', level=6)
print(f"Original coefficient format: {len(coeffs)} arrays")
print(f"Coefficient array lengths: {[len(c) for c in coeffs]}")
print(f"Total coefficients: {sum(len(c) for c in coeffs)}")
# Pack coefficients into single array
coeff_arr, coeff_slices = pywt.coeffs_to_array(coeffs, padding=0)
print(f"\nPacked array shape: {coeff_arr.shape}")
print(f"Number of slices: {len(coeff_slices)}")
# Show slice information
# Entry 0 describes the approximation; the remaining entries describe the
# detail levels, labelled from the coarsest level down to level 1.
for i, slice_info in enumerate(coeff_slices):
    coeff_type = "Approximation" if i == 0 else f"Detail {len(coeffs)-i}"
    print(f"{coeff_type:>15}: {slice_info}")
# Verify packing/unpacking
# Round-trip through array_to_coeffs and report the largest absolute
# difference per coefficient array.
reconstructed_coeffs = pywt.array_to_coeffs(coeff_arr, coeff_slices, 'wavedec')
print(f"\nReconstruction verification:")
for i, (orig, recon) in enumerate(zip(coeffs, reconstructed_coeffs)):
    error = np.max(np.abs(orig - recon))
    print(f"Coefficient {i}: max error = {error:.2e}")
# Demonstrate coefficient processing in array format
def process_coeffs_in_array(coeff_arr, coeff_slices, processing_func):
    """
    Apply a function to every detail band of a packed coefficient array.

    Parameters:
    - coeff_arr: Packed coefficient array from coeffs_to_array
    - coeff_slices: Slice information from coeffs_to_array. The first entry
      (approximation) is skipped. Each later entry may be a dict mapping a
      detail key to an index (the layout coeffs_to_array produces), or a
      bare slice / tuple of slices.
    - processing_func: Callable that receives one detail band and returns an
      array of the same shape

    Returns:
    A processed copy of coeff_arr; the input array is not modified.
    """
    processed_arr = coeff_arr.copy()
    # Skip approximation coefficients (first entry)
    for level_slices in coeff_slices[1:]:
        # coeffs_to_array stores detail levels as {key: index}; indexing the
        # array with the dict itself raises, so visit each band's index.
        if isinstance(level_slices, dict):
            regions = level_slices.values()
        else:
            regions = (level_slices,)
        for region in regions:
            processed_arr[region] = processing_func(processed_arr[region])
    return processed_arr
# Example: Soft thresholding in array format
def soft_threshold(x, threshold=0.1):
    """
    Shrink values toward zero by `threshold`.

    Magnitudes at or below the threshold become zero; larger magnitudes are
    reduced by the threshold and keep their sign.
    """
    shrunk_magnitude = np.abs(x) - threshold
    return np.sign(x) * np.maximum(shrunk_magnitude, 0)
# Soft-threshold every detail band while the coefficients are packed.
processed_arr = process_coeffs_in_array(coeff_arr, coeff_slices,
                                        lambda x: soft_threshold(x, 0.05))
# Convert back and reconstruct
processed_coeffs = pywt.array_to_coeffs(processed_arr, coeff_slices, 'wavedec')
reconstructed_signal = pywt.waverec(processed_coeffs, 'db4')
# Visualization
plt.figure(figsize=(12, 8))
plt.subplot(3, 1, 1)
plt.plot(signal, 'b-', label='Original signal')
plt.title('Original Signal')
plt.legend()
plt.grid(True)
plt.subplot(3, 1, 2)
plt.plot(coeff_arr, 'g-', label='Packed coefficients')
plt.title('Coefficients in Array Format')
plt.xlabel('Coefficient Index')
plt.legend()
plt.grid(True)
# Mark boundaries between coefficient types
# coeff_slices entries are not bare slices: the approximation entry is a
# tuple of slices and each detail entry is a dict {'d': (slice,)} for 1D
# data, so the end index has to be read from the inner slice object.
boundary_indices = [0]
for slice_entry in coeff_slices:
    region = slice_entry['d'] if isinstance(slice_entry, dict) else slice_entry
    boundary_indices.append(region[0].stop)
# Skip the first (0) and last (array end) positions; only interior
# boundaries separate two coefficient groups.
for boundary in boundary_indices[1:-1]:
    plt.axvline(x=boundary, color='r', linestyle='--', alpha=0.7)
plt.subplot(3, 1, 3)
plt.plot(signal, 'b-', alpha=0.7, label='Original')
plt.plot(reconstructed_signal, 'r-', label='Processed & reconstructed')
plt.title('Processed Signal (Soft Thresholding)')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
print(f"Processing SNR improvement: {10 * np.log10(np.var(signal) / np.var(signal - reconstructed_signal)):.2f} dB")

Functions for converting coefficients to 1D arrays while preserving shape information.
def ravel_coeffs(coeffs, axes=None):
    """
    Ravel coefficients to 1D array with shape preservation.

    Parameters:
    - coeffs: Coefficient list from multilevel decomposition
      (wavedec, wavedec2, or wavedecn)
    - axes: Axes over which the decomposition was performed (for nD coefficients)

    Returns:
    (coeff_arr, coeff_slices, coeff_shapes) tuple where:
    - coeff_arr: 1D raveled coefficient array (approximation first, then
      the detail bands)
    - coeff_slices: Slice information for extraction. The first entry is a
      slice locating the approximation; every following entry is a dict
      mapping a detail key to the slice of that band.
    - coeff_shapes: Original shapes of coefficient arrays, laid out like
      coeff_slices (a shape tuple first, then dicts of shape tuples)
    """
def unravel_coeffs(arr, coeff_slices, coeff_shapes, output_format: str = 'wavedecn'):
    """
    Unravel 1D array back to coefficient format with proper shapes.

    Parameters:
    - arr: 1D raveled coefficient array
    - coeff_slices: Slice information from ravel_coeffs
    - coeff_shapes: Shape information from ravel_coeffs
    - output_format: Output format ('wavedec', 'wavedec2', 'wavedecn');
      should match the decomposition that produced the coefficients

    Returns:
    Coefficient list with proper shapes
    """


import pywt
import numpy as np
# 2D image example
# NOTE: no seed is set here, so the random image differs between runs.
image = np.random.randn(128, 128)
coeffs_2d = pywt.wavedec2(image, 'db2', level=4)
print("2D Coefficient Structure:")
print(f"Approximation shape: {coeffs_2d[0].shape}")
# Detail tuples are visited in list order; the level label counts down from
# the number of detail entries to 1.
for i, (cH, cV, cD) in enumerate(coeffs_2d[1:]):
    level = len(coeffs_2d) - 1 - i
    print(f"Level {level} details: H{cH.shape}, V{cV.shape}, D{cD.shape}")
# Ravel all coefficients
coeff_arr, coeff_slices, coeff_shapes = pywt.ravel_coeffs(coeffs_2d)
print(f"\nRaveled array shape: {coeff_arr.shape}")
print(f"Number of coefficient groups: {len(coeff_slices)}")
print(f"Coefficient shapes: {coeff_shapes}")
# Process in raveled format (e.g., apply ML model)
def simulate_ml_processing(arr, block_rows=256):
    """
    Simulate machine learning processing of coefficients.

    Applies a fixed, pseudo-random, near-identity linear transform
    (identity + 0.01 * Gaussian noise) to `arr`. In practice this would be
    a trained model.

    The transform matrix is n x n. Building it densely needs several GB for
    the ~17k coefficients of a 128x128 image, so it is generated and applied
    `block_rows` rows at a time. Row blocks are drawn from the same seeded
    random stream in row-major order, so the result matches the dense
    formulation up to floating-point summation order.

    Parameters:
    - arr: Coefficient array; the transform acts along its first axis
    - block_rows: Number of matrix rows materialised at once (must be >= 1)

    Returns:
    Transformed array with the same shape as `arr`.

    Raises:
    ValueError if block_rows is smaller than 1.

    Note: reseeds NumPy's global random state with 123 on every call.
    """
    if block_rows < 1:
        raise ValueError("block_rows must be at least 1")
    arr = np.asarray(arr)
    # Example: Apply learned transform (simulated with random matrix)
    np.random.seed(123)
    n = len(arr)
    result = np.empty(arr.shape, dtype=np.result_type(arr.dtype, np.float64))
    for start in range(0, n, block_rows):
        stop = min(start + block_rows, n)
        block = 0.01 * np.random.randn(stop - start, n)
        # Add the identity part: row r of this block is global row start + r.
        local_rows = np.arange(stop - start)
        block[local_rows, start + local_rows] += 1.0
        result[start:stop] = block @ arr
    return result
processed_arr = simulate_ml_processing(coeff_arr)
# Unravel back to coefficient format
processed_coeffs = pywt.unravel_coeffs(processed_arr, coeff_slices, coeff_shapes, 'wavedec2')
processed_image = pywt.waverec2(processed_coeffs, 'db2')
# The coefficients were perturbed, so this measures the effect of the
# simulated processing rather than round-trip accuracy.
print(f"\nImage reconstruction error: {np.max(np.abs(image - processed_image)):.6f}")
# 3D volume example
volume = np.random.randn(32, 32, 32)
coeffs_3d = pywt.wavedecn(volume, 'haar', level=3)
print(f"\n3D Volume Processing:")
print(f"Original volume shape: {volume.shape}")
print(f"Number of 3D coefficient groups: {len(coeffs_3d)}")
# Ravel 3D coefficients
coeff_arr_3d, coeff_slices_3d, coeff_shapes_3d = pywt.ravel_coeffs(coeffs_3d)
print(f"3D raveled array shape: {coeff_arr_3d.shape}")
# Simple processing: scale down high-frequency components
processed_arr_3d = coeff_arr_3d.copy()
# The first entry of the slice list is a plain slice covering the
# approximation; the detail entries are dicts and have no .start/.stop.
# Everything after the approximation's end is detail data, and when there
# are no detail levels that end equals the array length.
approx_end = coeff_slices_3d[0].stop
processed_arr_3d[approx_end:] *= 0.8 # Reduce detail coefficients
# Unravel and reconstruct
processed_coeffs_3d = pywt.unravel_coeffs(processed_arr_3d, coeff_slices_3d, coeff_shapes_3d, 'wavedecn')
processed_volume = pywt.waverecn(processed_coeffs_3d, 'haar')
print(f"3D volume reconstruction error: {np.max(np.abs(volume - processed_volume)):.6f}")

Utilities for analyzing coefficient structures without computing transforms.
def wavedecn_shapes(shape, wavelet, mode: str = 'symmetric', level: int = None, axes=None):
    """
    Get shapes of coefficient arrays without computing the transform.

    Parameters:
    - shape: Input data shape (tuple of ints)
    - wavelet: Wavelet specification
    - mode: Signal extension mode
    - level: Decomposition level (default: maximum possible)
    - axes: Axes for transform (default: all axes)

    Returns:
    List of coefficient shapes that would result from wavedecn: the first
    entry is the shape tuple of the approximation, every following entry is
    a dict mapping a detail key to the shape tuple of that band.
    """
def wavedecn_size(shapes):
    """
    Calculate total number of coefficients from shapes.

    Parameters:
    - shapes: List of coefficient shapes from wavedecn_shapes

    Returns:
    Total number of coefficients (approximation plus all detail bands)
    """


import pywt
import numpy as np
# Analyze coefficient structures for different input sizes
input_shapes = [(1024,), (512, 512), (64, 64, 64), (32, 32, 32, 100)]
wavelet = 'db4'
print("Coefficient Structure Analysis:")
print("=" * 60)
for shape in input_shapes:
    ndim = len(shape)
    # 1D input uses dwt_max_level with the decomposition filter length;
    # nD input uses dwtn_max_level on the whole shape.
    max_level = pywt.dwtn_max_level(shape, wavelet) if ndim > 1 else pywt.dwt_max_level(shape[0], pywt.Wavelet(wavelet).dec_len)
    print(f"\nInput shape: {shape}")
    print(f"Maximum decomposition level: {max_level}")
    # Analyze different decomposition levels
    for level in range(1, min(max_level + 1, 6)): # Limit to 5 levels for display
        if ndim == 1:
            # Use regular DWT functions for 1D
            # (this runs an actual transform on a zero signal to get the shapes)
            test_coeffs = pywt.wavedec(np.zeros(shape), wavelet, level=level)
            coeff_shapes = [c.shape for c in test_coeffs]
            total_coeffs = sum(c.size for c in test_coeffs)
        else:
            # Use shape analysis for nD
            coeff_shapes = pywt.wavedecn_shapes(shape, wavelet, level=level)
            total_coeffs = pywt.wavedecn_size(coeff_shapes)
        # Ratio of input sample count to coefficient count.
        compression_ratio = np.prod(shape) / total_coeffs
        print(f" Level {level}: {len(coeff_shapes)} groups, {total_coeffs} total coeffs, compression ratio: {compression_ratio:.3f}")
# Memory usage analysis
def analyze_memory_usage(shape, wavelet, level):
    """
    Estimate memory needed by different coefficient representations.

    Parameters:
    - shape: Input data shape
    - wavelet: Wavelet specification
    - level: Decomposition level

    Returns:
    Dict with byte counts under 'original', 'coeffs' and 'array' (all
    assuming 8-byte float64 values) and the ratio original/coeffs under
    'compression'.
    """
    bytes_per_value = 8  # float64
    original_size = np.prod(shape) * bytes_per_value

    if len(shape) == 1:
        # 1D: run the transform on a zero signal and count the outputs.
        coeff_sizes = [band.size for band in pywt.wavedec(np.zeros(shape), wavelet, level=level)]
    else:
        # nD: derive the sizes from the predicted shapes only.
        coeff_sizes = []
        for entry in pywt.wavedecn_shapes(shape, wavelet, level=level):
            if isinstance(entry, tuple):
                # Approximation entry: a single shape tuple.
                coeff_sizes.append(np.prod(entry))
            else:
                # Detail entry: dict of shape tuples, one per band.
                coeff_sizes.append(sum(np.prod(band_shape) for band_shape in entry.values()))

    total_values = sum(coeff_sizes)
    coeffs_size = total_values * bytes_per_value
    # The packed array holds the same values, just contiguously.
    array_size = total_values * bytes_per_value
    # Rough per-group estimate of the slice bookkeeping.
    slice_overhead = len(coeff_sizes) * 64

    report = {}
    report['original'] = original_size
    report['coeffs'] = coeffs_size
    report['array'] = array_size + slice_overhead
    report['compression'] = original_size / coeffs_size
    return report
# Memory analysis example
test_shape = (1024, 1024)
test_level = 5
memory_info = analyze_memory_usage(test_shape, 'db4', test_level)
# Byte counts are converted to MB (1024**2 bytes) for display.
print(f"\nMemory Usage Analysis for {test_shape} at level {test_level}:")
print(f"Original data: {memory_info['original'] / 1024**2:.2f} MB")
print(f"Coefficient lists: {memory_info['coeffs'] / 1024**2:.2f} MB")
print(f"Array format: {memory_info['array'] / 1024**2:.2f} MB")
print(f"Compression ratio: {memory_info['compression']:.3f}")
# Coefficient distribution analysis
def analyze_coefficient_distribution(coeffs):
    """
    Analyze the distribution of detail coefficient values.

    The approximation (first entry) is skipped. The layout is detected per
    detail level rather than from coeffs[0], because the approximation is
    an ndarray for 1D, 2D and nD decompositions alike.

    Parameters:
    - coeffs: Coefficient list in wavedec (arrays), wavedec2 (tuples of
      arrays) or wavedecn (dicts of arrays) layout

    Returns:
    Dict with 'mean', 'std', 'min', 'max' of the detail coefficients and the
    percentages of exact zeros ('zeros') and of values with |x| < 0.01
    ('small').

    Raises:
    ValueError if coeffs contains no detail coefficients.
    """
    detail_arrays = []
    for level_coeffs in coeffs[1:]:
        if isinstance(level_coeffs, dict):
            # nD case: {key: array}
            bands = level_coeffs.values()
        elif isinstance(level_coeffs, (tuple, list)):
            # 2D case: (cH, cV, cD)
            bands = level_coeffs
        else:
            # 1D case: a single detail array
            bands = (level_coeffs,)
        detail_arrays.extend(np.ravel(band) for band in bands)

    all_coeffs = np.concatenate(detail_arrays) if detail_arrays else np.empty(0)
    if all_coeffs.size == 0:
        raise ValueError("coeffs must contain at least one detail coefficient")

    return {
        'mean': np.mean(all_coeffs),
        'std': np.std(all_coeffs),
        'min': np.min(all_coeffs),
        'max': np.max(all_coeffs),
        'zeros': np.sum(all_coeffs == 0) / len(all_coeffs) * 100,
        'small': np.sum(np.abs(all_coeffs) < 0.01) / len(all_coeffs) * 100
    }
# Test coefficient distribution
# NOTE: no seed is set in this snippet, so the noise (and the printed
# statistics) vary between runs.
test_signal = np.sin(2 * np.pi * 5 * np.linspace(0, 1, 1024)) + 0.1 * np.random.randn(1024)
test_coeffs = pywt.wavedec(test_signal, 'db8', level=6)
dist_info = analyze_coefficient_distribution(test_coeffs)
print(f"\nCoefficient Distribution Analysis:")
print(f"Mean: {dist_info['mean']:.6f}")
print(f"Std: {dist_info['std']:.6f}")
print(f"Range: [{dist_info['min']:.6f}, {dist_info['max']:.6f}]")
print(f"Exact zeros: {dist_info['zeros']:.2f}%")
print(f"Small coefficients (|x| < 0.01): {dist_info['small']:.2f}%")

Advanced utilities for coefficient processing and analysis.
import pywt
import numpy as np
import matplotlib.pyplot as plt
def coefficient_energy_analysis(coeffs):
    """
    Analyze energy distribution across coefficient levels.

    Energy is the sum of squared coefficients. The layout is detected from
    the detail entries, because coeffs[0] (the approximation) is an ndarray
    for 1D and 2D decompositions alike.

    Parameters:
    - coeffs: Coefficient list in wavedec layout (arrays), wavedec2 layout
      (tuples (cH, cV, cD)) or wavedecn layout (dicts of arrays)

    Returns:
    (labels, energies, energy_percentages) tuple where:
    - labels: 'Approx', 'Detail k' for 1D input; 'Approximation' followed by
      'H{level}', 'V{level}', 'D{level}' for 2D input (or '{key}{level}'
      with keys in sorted order for dict levels)
    - energies: Energy of each labelled band
    - energy_percentages: Share of the total energy in percent; all zeros
      when the total energy is zero
    """
    def _energy(band):
        return np.sum(np.asarray(band) ** 2)

    details = coeffs[1:]
    is_1d = all(isinstance(level_coeffs, np.ndarray) for level_coeffs in details)

    energies = [_energy(coeffs[0])]
    if is_1d:
        # 1D case: one detail array per level, coarsest level first
        labels = ['Approx']
        for i, band in enumerate(details, start=1):
            energies.append(_energy(band))
            labels.append(f'Detail {len(coeffs)-i}')
    else:
        # 2D / nD case: several detail bands per level
        labels = ['Approximation']
        for i, level_coeffs in enumerate(details):
            level = len(coeffs) - 1 - i
            if isinstance(level_coeffs, dict):
                named_bands = [(f'{key}{level}', band)
                               for key, band in sorted(level_coeffs.items())]
            else:
                cH, cV, cD = level_coeffs
                named_bands = [(f'H{level}', cH), (f'V{level}', cV), (f'D{level}', cD)]
            for label, band in named_bands:
                labels.append(label)
                energies.append(_energy(band))

    total_energy = sum(energies)
    if total_energy > 0:
        energy_percentages = [e/total_energy * 100 for e in energies]
    else:
        # All-zero coefficients: avoid 0/0 and report 0% everywhere.
        energy_percentages = [0.0] * len(energies)
    return labels, energies, energy_percentages
# Test energy analysis
# NOTE: no seed is set in this snippet, so the random image (and the
# printed energies) vary between runs.
test_image = np.random.randn(256, 256)
coeffs_2d = pywt.wavedec2(test_image, 'db4', level=4)
labels, energies, percentages = coefficient_energy_analysis(coeffs_2d)
print("Energy Distribution Analysis:")
for label, energy, pct in zip(labels, energies, percentages):
    print(f"{label:>12}: {energy:>10.2f} ({pct:>5.2f}%)")
# Visualize energy distribution
plt.figure(figsize=(12, 6))
# Left panel: energy share per band.
plt.subplot(1, 2, 1)
plt.bar(range(len(labels)), percentages)
plt.xticks(range(len(labels)), labels, rotation=45)
plt.ylabel('Energy Percentage')
plt.title('Coefficient Energy Distribution')
plt.grid(True, alpha=0.3)
# Cumulative energy
# Right panel: running total in band order, with 90% and 95% reference
# lines (both drawn in the same red dashed style).
cumulative = np.cumsum(percentages)
plt.subplot(1, 2, 2)
plt.plot(range(len(labels)), cumulative, 'o-')
plt.xticks(range(len(labels)), labels, rotation=45)
plt.ylabel('Cumulative Energy Percentage')
plt.title('Cumulative Energy Distribution')
plt.grid(True, alpha=0.3)
plt.axhline(y=90, color='r', linestyle='--', alpha=0.7, label='90% Energy')
plt.axhline(y=95, color='r', linestyle='--', alpha=0.7, label='95% Energy')
plt.legend()
plt.tight_layout()
plt.show()
# Find coefficient threshold for energy retention
def find_energy_threshold(coeffs, energy_target=0.9):
    """
    Find threshold that retains specified fraction of detail energy.

    Detail coefficients are sorted by magnitude (largest first) and the
    smallest prefix whose energy reaches energy_target of the total is kept.
    The layout is detected per detail level, because coeffs[0] is an ndarray
    for 1D and 2D decompositions alike.

    Parameters:
    - coeffs: Coefficient list in wavedec (arrays), wavedec2 (tuples of
      arrays) or wavedecn (dicts of arrays) layout; the approximation
      (first entry) is ignored
    - energy_target: Fraction of the detail energy to retain (0..1)

    Returns:
    (threshold, kept_fraction) tuple where:
    - threshold: Magnitude of the smallest coefficient that must be kept
    - kept_fraction: Number of kept coefficients divided by the total
      number of detail coefficients

    Raises:
    ValueError if coeffs contains no detail coefficients.
    """
    # Collect all detail coefficients
    detail_arrays = []
    for level_coeffs in coeffs[1:]:
        if isinstance(level_coeffs, dict):
            bands = level_coeffs.values()
        elif isinstance(level_coeffs, (tuple, list)):
            bands = level_coeffs
        else:
            bands = (level_coeffs,)
        detail_arrays.extend(np.ravel(band) for band in bands)

    all_details = np.concatenate(detail_arrays) if detail_arrays else np.empty(0)
    if all_details.size == 0:
        raise ValueError("coeffs must contain at least one detail coefficient")

    # Sort by absolute value (descending)
    sorted_coeffs = np.sort(np.abs(all_details))[::-1]

    # Find threshold for energy retention
    cumulative_energy = np.cumsum(sorted_coeffs**2)
    # Take the total from the cumulative sum itself so that a target of 1.0
    # is always reachable despite floating-point summation differences.
    total_energy = cumulative_energy[-1]
    target_energy = energy_target * total_energy
    # First index whose cumulative energy reaches the target; clamp so an
    # unreachable target keeps every coefficient instead of wrapping to 0.
    threshold_idx = int(np.searchsorted(cumulative_energy, target_energy, side='left'))
    threshold_idx = min(threshold_idx, len(sorted_coeffs) - 1)
    threshold = sorted_coeffs[threshold_idx]
    # Indices 0..threshold_idx are kept, i.e. threshold_idx + 1 coefficients.
    return threshold, (threshold_idx + 1) / len(sorted_coeffs)
# Report the magnitude threshold and coefficient fraction for a 90% target.
threshold_90, coeff_fraction = find_energy_threshold(coeffs_2d, 0.9)
print(f"\nFor 90% energy retention:")
print(f"Threshold: {threshold_90:.6f}")
print(f"Coefficients kept: {coeff_fraction:.1%}")

# Coefficient array formats
# The aliases below use typing names; import them here so the snippet runs
# on its own.
from typing import Dict, List, Literal, Tuple, Union

# Packed coefficients: 1D from ravel_coeffs; same dimensionality as the
# transformed data from coeffs_to_array.
CoeffArray = np.ndarray
# Location of each coefficient group inside the packed array. The first
# entry addresses the approximation (a slice from ravel_coeffs, a tuple of
# slices from coeffs_to_array); every later entry is a dict mapping a detail
# key to the slice / tuple of slices of that band.
CoeffSlices = List[
    Union[
        slice,
        Tuple[slice, ...],
        Dict[str, Union[slice, Tuple[slice, ...]]],
    ]
]
CoeffShapes = List[Union[tuple, Dict[str, tuple]]] # Shape information for reconstruction
# Output format specifications
OutputFormat = Literal['wavedec', 'wavedec2', 'wavedecn']
# Coefficient analysis results
EnergyAnalysis = Tuple[List[str], List[float], List[float]] # (labels, energies, percentages)

Install with Tessl CLI
npx tessl i tessl/pypi-pywavelets