"""
fitsio: a full-featured Python library to read from and write to FITS files.

FITS table data handling functionality supporting both binary and ASCII
tables, with advanced features including column operations, row filtering,
variable-length columns, table modifications, and efficient data access
patterns.

TableHDU is the handler for FITS binary table extensions, providing
comprehensive table operations and data-manipulation capabilities.
"""
class TableHDU:
    """Handler for FITS binary table extensions.

    Provides comprehensive table operations: reading (whole table, columns,
    rows, slices), writing/appending, column insertion, row deletion,
    resizing, and WHERE-style row queries.

    NOTE(review): methods here are documentation stubs (docstring-only
    bodies, implicitly returning None); they describe the public interface
    rather than implement it.
    """

    def read(self, columns=None, rows=None, vstorage='fixed', **kwargs):
        """
        Read table data as structured array.
        Parameters:
        - columns: list/str, column names to read
        - rows: list/array, specific rows to read
        - vstorage: str, variable length storage ('fixed' or 'object')
        - **kwargs: additional read options
        Returns:
        numpy structured array
        """

    def read_column(self, col, rows=None, vstorage='fixed', **kwargs):
        """
        Read single column data.
        Parameters:
        - col: str/int, column name or number
        - rows: list/array, specific rows to read
        - vstorage: str, variable length storage method
        - **kwargs: additional options
        Returns:
        numpy array
        """

    def read_columns(self, columns, rows=None, **kwargs):
        """
        Read multiple columns efficiently.
        Parameters:
        - columns: list, column names to read
        - rows: list/array, specific rows to read
        - **kwargs: additional options
        Returns:
        numpy structured array
        """

    def read_rows(self, rows, columns=None, **kwargs):
        """
        Read specific rows.
        Parameters:
        - rows: list/array, row indices to read
        - columns: list, columns to include
        - **kwargs: additional options
        Returns:
        numpy structured array
        """

    def read_slice(self, firstrow, lastrow, columns=None, **kwargs):
        """
        Read row range slice.
        Parameters:
        - firstrow: int, first row index
        - lastrow: int, last row index
        - columns: list, columns to include
        - **kwargs: additional options
        Returns:
        numpy structured array
        """

    def write(self, data, firstrow=0, **kwargs):
        """
        Write/overwrite table data.
        Parameters:
        - data: structured array/dict/list, table data
        - firstrow: int, starting row for writing
        - **kwargs: additional write options
        """

    def write_column(self, column, data, firstrow=0, **kwargs):
        """
        Write single column data.
        Parameters:
        - column: str/int, column name or number
        - data: array-like, column data
        - firstrow: int, starting row
        - **kwargs: additional options
        """

    def write_var_column(self, column, data, **kwargs):
        """
        Write variable length column.
        Parameters:
        - column: str/int, column name or number
        - data: list/object array, variable length data
        - **kwargs: additional options
        """

    def append(self, data, **kwargs):
        """
        Append rows to existing table.
        Parameters:
        - data: structured array/dict/list, new rows
        - **kwargs: additional options
        """

    def insert_column(self, name, data, colnum=None, **kwargs):
        """
        Insert new column into table.
        Parameters:
        - name: str, column name
        - data: array-like, column data
        - colnum: int, column position (default: append)
        - **kwargs: column format options
        """

    def delete_rows(self, rows):
        """
        Delete specified rows.
        Parameters:
        - rows: list/array, row indices to delete
        """

    def resize(self, nrows, front=False):
        """
        Change table size.
        Parameters:
        - nrows: int, new number of rows
        - front: bool, add/remove rows at front
        """

    def where(self, expression, **kwargs):
        """
        Query table with WHERE expression.
        Parameters:
        - expression: str, SQL-like WHERE clause
        - **kwargs: additional options
        Returns:
        numpy array, row indices matching expression
        """

    def get_nrows(self):
        """
        Get number of rows.
        Returns:
        int, number of rows
        """

    def get_colnames(self):
        """
        Get column names.
        Returns:
        list of str, column names
        """

    def get_colname(self, colnum):
        """
        Get column name by number.
        Parameters:
        - colnum: int, column number (0-based)
        Returns:
        str, column name
        """

    def get_vstorage(self):
        """
        Get variable length storage method.
        Returns:
        str, storage method ('fixed' or 'object')
        """

    def get_rec_dtype(self, **kwargs):
        """
        Get record array dtype.
        Parameters:
        - **kwargs: dtype options
        Returns:
        numpy dtype, structured array dtype
        """

    def get_rec_column_descr(self):
        """
        Get column descriptions for structured array.
        Returns:
        list, column descriptors
        """

    def __getitem__(self, key):
        """
        Access table data by column/row.
        Parameters:
        - key: str/int/tuple/slice, column name, row index, or slice
        Returns:
        numpy array or structured array
        """

    def __iter__(self):
        """
        Iterate over table rows with buffering.
        Yields:
        numpy structured array, individual rows
        """

    # --- Inherited from HDUBase ---

    def get_info(self):
        """Get complete HDU information."""

    def get_offsets(self):
        """Get byte offsets (header_start, data_start, data_end)."""

    def get_extnum(self):
        """Get extension number."""

    def get_extname(self):
        """Get extension name."""

    def get_extver(self):
        """Get extension version."""

    def get_exttype(self, num=False):
        """Get extension type."""

    def read_header(self):
        """Read header as FITSHDR object."""

    def write_key(self, name, value, comment=""):
        """Write single header keyword."""

    def write_keys(self, records, clean=True):
        """Write multiple header keywords."""

    def write_checksum(self):
        """Write DATASUM/CHECKSUM keywords."""

    def verify_checksum(self):
        """Verify data integrity."""


# Handler for FITS ASCII table extensions, inheriting from TableHDU with
# ASCII-specific operations.
class AsciiTableHDU(TableHDU):
    """ASCII table HDU with the same interface as TableHDU."""


# Supported column data types:
# - 'L': Logical (boolean)
# - 'B': Unsigned byte (uint8)
# - 'I': 16-bit integer (int16)
# - 'J': 32-bit integer (int32)
# - 'K': 64-bit integer (int64)
# - 'A': Character string
# - 'E': Single precision float (float32)
# - 'D': Double precision float (float64)
# - 'C': Single precision complex (complex64)
# - 'M': Double precision complex (complex128)
# - 'P': Variable length array pointer
# - 'Q': Variable length array pointer (64-bit)

# HDU type codes for table extensions.
BINARY_TBL = 2
ASCII_TBL = 1

import fitsio
import numpy as np

# --- Example: reading table data ---

# Read entire table
data = fitsio.read('catalog.fits', ext=1)
print(f"Table shape: {data.shape}")
print(f"Columns: {data.dtype.names}")

# Read specific columns
coords = fitsio.read('catalog.fits', ext=1, columns=['ra', 'dec'])

# Read specific rows
subset = fitsio.read('catalog.fits', ext=1, rows=[0, 10, 100])

# Read with FITS object for more control
with fitsio.FITS('catalog.fits') as fits:
    table = fits[1]
    # Get table information
    nrows = table.get_nrows()
    colnames = table.get_colnames()
    print(f"Table has {nrows} rows and columns: {colnames}")
    # Read different ways
    all_data = table.read()
    single_col = table.read_column('flux')
    multi_col = table.read_columns(['x', 'y', 'flux'])
    row_slice = table.read_slice(100, 200)

import fitsio
# --- Example: slicing-style data access ---
with fitsio.FITS('catalog.fits') as fits:
    table = fits[1]
    # Column access
    x_values = table['x'][:]            # All x values
    y_values = table['y'][:]            # All y values
    coords = table['x', 'y'][:]         # Multiple columns
    # Row access
    first_100 = table[:100]             # First 100 rows
    every_10th = table[::10]            # Every 10th row
    specific_rows = table[[5, 15, 25]]  # Specific row indices
    # Combined access
    subset = table['flux'][100:200]         # Column slice
    multi_subset = table['x', 'y'][50:150]  # Multiple columns, row range

import fitsio
import numpy as np

# --- Example: creating and writing tables ---

# Create structured array
nrows = 1000
data = np.zeros(nrows, dtype=[
    ('id', 'i4'),
    ('ra', 'f8'),
    ('dec', 'f8'),
    ('flux', 'f4'),
    ('flag', 'bool'),
    ('name', 'U20')
])

# Fill with data
data['id'] = np.arange(nrows)
data['ra'] = np.random.uniform(0, 360, nrows)
data['dec'] = np.random.uniform(-90, 90, nrows)
data['flux'] = np.random.lognormal(0, 1, nrows)
data['flag'] = np.random.choice([True, False], nrows)
data['name'] = [f'object_{i:04d}' for i in range(nrows)]

# Write table
fitsio.write('new_catalog.fits', data)

# Write with header
header = fitsio.FITSHDR([
    {'name': 'EXTNAME', 'value': 'CATALOG', 'comment': 'Table name'},
    {'name': 'COORDSYS', 'value': 'EQUATORIAL', 'comment': 'Coordinate system'}
])
fitsio.write('catalog_with_header.fits', data, header=header)

import fitsio
import numpy as np

# --- Example: modifying existing tables ---
with fitsio.FITS('catalog.fits', 'rw') as fits:
    table = fits[1]
    # Append new rows
    new_data = np.zeros(50, dtype=table.get_rec_dtype())
    new_data['id'] = np.arange(1000, 1050)
    table.append(new_data)
    # Insert new column
    magnitudes = np.random.uniform(15, 25, table.get_nrows())
    table.insert_column('magnitude', magnitudes)
    # Overwrite specific rows
    updated_rows = np.zeros(10, dtype=table.get_rec_dtype())
    table.write(updated_rows, firstrow=100)
    # Write to specific column
    new_fluxes = np.random.lognormal(0, 1, table.get_nrows())
    table.write_column('flux', new_fluxes)
    # Delete rows
    table.delete_rows([5, 15, 25])  # Delete specific rows
    # Resize table
    table.resize(800)  # Truncate to 800 rows

import fitsio
import numpy as np

# --- Example: variable-length columns ---

# Create data with variable length arrays
nrows = 100
data = np.zeros(nrows, dtype=[
    ('id', 'i4'),
    ('measurements', 'O'),  # Object array for variable length
    ('notes', 'O')          # Object array for variable strings
])
data['id'] = np.arange(nrows)

# Variable length numeric arrays
for i in range(nrows):
    n_measurements = np.random.randint(1, 20)
    data['measurements'][i] = np.random.random(n_measurements)
    data['notes'][i] = f"Object {i} has {n_measurements} measurements"

# Write with object storage
fitsio.write('variable_length.fits', data)

# Read with object storage
var_data = fitsio.read('variable_length.fits', vstorage='object')
print(f"First object measurements: {var_data['measurements'][0]}")

# Read with fixed storage (padded arrays)
fixed_data = fitsio.read('variable_length.fits', vstorage='fixed')

import fitsio
# --- Example: WHERE-style row queries ---
with fitsio.FITS('catalog.fits') as fits:
    table = fits[1]
    # Query with WHERE clause
    bright_stars = table.where("flux > 100.0")
    northern = table.where("dec > 0")
    complex_query = table.where("flux > 50 && dec > -30 && dec < 30")
    # Use query results to read data
    bright_data = table.read(rows=bright_stars)
    # Combine with column selection
    coords_north = table.read(columns=['ra', 'dec'], rows=northern)
    # Multiple conditions
    good_objects = table.where("flux > 10 && flag == 1")
    print(f"Found {len(good_objects)} good objects")

import fitsio
import numpy as np

# --- Example: writing tables from different data structures ---

# 1. List of arrays
x_data = np.random.random(100)
y_data = np.random.random(100)
names = [f'star_{i}' for i in range(100)]

# Write from list with names
fitsio.write('from_lists.fits', [x_data, y_data, names],
             names=['x', 'y', 'name'])

# 2. Dictionary of arrays
data_dict = {
    'x': x_data,
    'y': y_data,
    'name': names
}
fitsio.write('from_dict.fits', data_dict)

# 3. Record array (most flexible)
rec_data = np.rec.fromarrays([x_data, y_data, names],
                             names=['x', 'y', 'name'])
fitsio.write('from_recarray.fits', rec_data)

import fitsio
# --- Example: efficient iteration over large tables ---
with fitsio.FITS('large_catalog.fits', iter_row_buffer=1000) as fits:
    table = fits[1]
    # Process table row by row (buffered iteration)
    for i, row in enumerate(table):
        if i % 10000 == 0:
            print(f"Processed {i} rows")
        # Process individual row
        if row['flux'] > 100:
            # Do something with bright objects
            pass
    # Alternative: read in chunks manually
    nrows = table.get_nrows()
    chunk_size = 10000
    for start in range(0, nrows, chunk_size):
        end = min(start + chunk_size, nrows)
        chunk = table.read_slice(start, end)
        # Process chunk
        bright = chunk[chunk['flux'] > 100]
        print(f"Chunk {start}-{end}: found {len(bright)} bright objects")

import fitsio
import numpy as np

# --- Example: advanced column information and array columns ---
with fitsio.FITS('catalog.fits', 'rw') as fits:
    table = fits[1]
    # Get complete column information
    dtype = table.get_rec_dtype()
    column_desc = table.get_rec_column_descr()
    print(f"Table dtype: {dtype}")
    print(f"Column descriptions: {column_desc}")
    # Case sensitivity control
    table.case_sensitive = True
    table.lower = False
    table.upper = False
    # Work with array columns (multi-dimensional)
    # Add column with 2D array data
    array_col = np.random.random((table.get_nrows(), 3, 3))
    table.insert_column('covariance', array_col)
    # Read array column
    cov_matrices = table['covariance'][:]
    print(f"Covariance matrices shape: {cov_matrices.shape}")

# Install with Tessl CLI
#   npx tessl i tessl/pypi-fitsio