polars-lts-cpu — Blazingly fast DataFrame library for legacy CPUs without AVX2 support
Extensive support for reading and writing data in various formats including CSV, Parquet, JSON, Arrow IPC, databases, Excel, and cloud storage with streaming capabilities for efficient processing of large datasets.
Read and write CSV files with extensive customization options for delimiters, encoding, and data types.
def read_csv(
source: str | Path | list[str] | list[Path] | BinaryIO,
*,
has_header: bool = True,
columns: list[int] | list[str] | None = None,
new_columns: list[str] | None = None,
dtypes: dict[str, type] | Sequence[type] | None = None,
separator: str = ",",
comment_prefix: str | None = None,
quote_char: str | None = '"',
skip_rows: int = 0,
skip_rows_after_header: int = 0,
row_index_name: str | None = None,
row_index_offset: int = 0,
sample_size: int = 1024,
eol_char: str = "\n",
raise_if_empty: bool = True,
truncate_ragged_lines: bool = False,
rechunk: bool = False,
schema_overrides: dict[str, type] | None = None,
null_values: str | list[str] | dict[str, str] | None = None,
missing_utf8_is_empty_string: bool = False,
max_rows: int | None = None,
encoding: str = "utf8",
try_parse_dates: bool = False,
n_threads: int | None = None,
infer_schema_length: int | None = 100,
batch_size: int = 8192,
n_rows: int | None = None,
low_memory: bool = False,
rechunk_end: bool = True,
skip_blank_lines: bool = True,
ignore_errors: bool = False
) -> DataFrame:
"""
Read CSV file(s) into DataFrame.
Parameters:
- source: File path(s) or file-like object
- has_header: Whether first row contains headers
- columns: Columns to select by index or name
- dtypes: Data types for columns
- separator: Field separator character
- quote_char: Quote character for fields
- null_values: Values to interpret as null
- encoding: Text encoding
- n_threads: Number of threads for parallel processing
Returns:
- DataFrame: Parsed CSV data
"""
def read_csv_batched(
source: str | Path | BinaryIO,
*,
batch_size: int = 50000,
**kwargs
) -> BatchedCsvReader:
"""
Read CSV file in batches for memory-efficient processing.
Parameters:
- source: File path or file-like object
- batch_size: Number of rows per batch
- **kwargs: Same parameters as read_csv
Returns:
- BatchedCsvReader: Iterator yielding DataFrame batches
"""
def scan_csv(
source: str | Path | list[str] | list[Path],
*,
has_header: bool = True,
separator: str = ",",
comment_prefix: str | None = None,
quote_char: str | None = '"',
skip_rows: int = 0,
dtypes: dict[str, type] | None = None,
null_values: str | list[str] | dict[str, str] | None = None,
missing_utf8_is_empty_string: bool = False,
cache: bool = True,
with_column_names: Callable[[list[str]], list[str]] | None = None,
infer_schema_length: int | None = 100,
n_rows: int | None = None,
encoding: str = "utf8",
low_memory: bool = False,
rechunk: bool = False,
skip_rows_after_header: int = 0,
row_index_name: str | None = None,
row_index_offset: int = 0,
try_parse_dates: bool = False,
eol_char: str = "\n",
raise_if_empty: bool = True,
truncate_ragged_lines: bool = False,
schema: dict[str, type] | None = None,
ignore_errors: bool = False
) -> LazyFrame:
"""
Scan CSV file(s) for lazy processing.
Returns:
- LazyFrame: Lazy representation of CSV data
"""

Read and write Apache Parquet files with compression and metadata options.
def read_parquet(
source: str | Path | list[str] | list[Path] | BinaryIO,
*,
columns: list[int] | list[str] | None = None,
n_rows: int | None = None,
row_index_name: str | None = None,
row_index_offset: int = 0,
parallel: str = "auto",
use_statistics: bool = True,
hive_partitioning: bool | None = None,
hive_schema: dict[str, type] | None = None,
try_parse_hive_dates: bool = True,
glob: bool = True,
schema: dict[str, type] | None = None,
rechunk: bool = False,
low_memory: bool = False,
storage_options: dict[str, Any] | None = None,
credential_provider: CredentialProvider | None = None,
retries: int = 2,
use_pyarrow: bool = False,
pyarrow_options: dict[str, Any] | None = None,
memory_map: bool = True
) -> DataFrame:
"""
Read Parquet file(s) into DataFrame.
Parameters:
- source: File path(s) or file-like object
- columns: Columns to select
- parallel: Parallel reading mode ('auto', 'columns', 'row_groups', 'none')
- use_statistics: Use Parquet statistics for optimization
- hive_partitioning: Enable Hive-style partitioning
- storage_options: Cloud storage configuration
- credential_provider: Cloud credentials
Returns:
- DataFrame: Parquet data
"""
def scan_parquet(
source: str | Path | list[str] | list[Path],
*,
n_rows: int | None = None,
row_index_name: str | None = None,
row_index_offset: int = 0,
parallel: str = "auto",
use_statistics: bool = True,
hive_partitioning: bool | None = None,
hive_schema: dict[str, type] | None = None,
try_parse_hive_dates: bool = True,
glob: bool = True,
schema: dict[str, type] | None = None,
cache: bool = True,
cloud_options: dict[str, Any] | None = None,
credential_provider: CredentialProvider | None = None,
retries: int = 2
) -> LazyFrame:
"""
Scan Parquet file(s) for lazy processing.
Returns:
- LazyFrame: Lazy representation of Parquet data
"""
def read_parquet_schema(source: str | Path | BinaryIO) -> dict[str, type]:
"""
Read schema from Parquet file without loading data.
Parameters:
- source: File path or file-like object
Returns:
- dict[str, type]: Column names and types
"""
def read_parquet_metadata(source: str | Path | BinaryIO) -> dict[str, Any]:
"""
Read metadata from Parquet file.
Parameters:
- source: File path or file-like object
Returns:
- dict[str, Any]: Parquet metadata
"""

Read and write JSON and newline-delimited JSON (NDJSON) files.
def read_json(
source: str | Path | IOBase | bytes,
*,
schema: dict[str, type] | None = None,
schema_overrides: dict[str, type] | None = None,
infer_schema_length: int | None = 100
) -> DataFrame:
"""
Read JSON file into DataFrame.
Parameters:
- source: JSON file path or content
- schema: Expected schema
- schema_overrides: Override inferred types
- infer_schema_length: Rows to scan for schema inference
Returns:
- DataFrame: JSON data
"""
def read_ndjson(
source: str | Path | IOBase | bytes,
*,
schema: dict[str, type] | None = None,
schema_overrides: dict[str, type] | None = None,
ignore_errors: bool = False
) -> DataFrame:
"""
Read newline-delimited JSON file into DataFrame.
Parameters:
- source: NDJSON file path or content
- schema: Expected schema
- ignore_errors: Skip malformed JSON lines
Returns:
- DataFrame: NDJSON data
"""
def scan_ndjson(
source: str | Path | list[str] | list[Path],
*,
schema: dict[str, type] | None = None,
ignore_errors: bool = False,
batch_size: int | None = None,
n_rows: int | None = None,
low_memory: bool = False,
rechunk: bool = False,
row_index_name: str | None = None,
row_index_offset: int = 0,
infer_schema_length: int | None = 100
) -> LazyFrame:
"""
Scan NDJSON file(s) for lazy processing.
Returns:
- LazyFrame: Lazy representation of NDJSON data
"""

Read and write Apache Arrow IPC format for efficient columnar data exchange.
def read_ipc(
source: str | Path | BinaryIO,
*,
columns: list[int] | list[str] | None = None,
n_rows: int | None = None,
row_index_name: str | None = None,
row_index_offset: int = 0,
rechunk: bool = False,
memory_map: bool = True,
storage_options: dict[str, Any] | None = None,
credential_provider: CredentialProvider | None = None,
retries: int = 2
) -> DataFrame:
"""
Read Arrow IPC file into DataFrame.
Parameters:
- source: IPC file path or file-like object
- columns: Columns to select
- memory_map: Use memory mapping for better performance
- storage_options: Cloud storage configuration
Returns:
- DataFrame: IPC data
"""
def read_ipc_stream(
source: str | Path | BinaryIO,
*,
columns: list[int] | list[str] | None = None,
n_rows: int | None = None,
row_index_name: str | None = None,
row_index_offset: int = 0,
rechunk: bool = False,
storage_options: dict[str, Any] | None = None,
credential_provider: CredentialProvider | None = None,
retries: int = 2
) -> DataFrame:
"""
Read Arrow IPC stream into DataFrame.
Returns:
- DataFrame: IPC stream data
"""
def scan_ipc(
source: str | Path | list[str] | list[Path],
*,
n_rows: int | None = None,
cache: bool = True,
rechunk: bool = False,
row_index_name: str | None = None,
row_index_offset: int = 0,
storage_options: dict[str, Any] | None = None,
credential_provider: CredentialProvider | None = None,
retries: int = 2,
memory_map: bool = True
) -> LazyFrame:
"""
Scan IPC file(s) for lazy processing.
Returns:
- LazyFrame: Lazy representation of IPC data
"""
def read_ipc_schema(source: str | Path | BinaryIO) -> dict[str, type]:
"""
Read schema from IPC file without loading data.
Returns:
- dict[str, type]: Column names and types
"""

Connect to and query various databases with full SQL support.
def read_database(
query: str | RawExpr,
connection: str | ConnectionProtocol,
*,
partition_on: str | None = None,
partition_range: tuple[int, int] | None = None,
partition_num: int | None = None,
protocol: str | None = None,
engine: str | None = None,
schema_overrides: dict[str, type] | None = None,
execute_options: dict[str, Any] | None = None,
iter_batches: bool = False,
batch_size: int | None = None
) -> DataFrame:
"""
Execute database query and return DataFrame.
Parameters:
- query: SQL query string
- connection: Database connection string or object
- partition_on: Column for parallel partitioning
- protocol: Database protocol ('adbc', 'connectorx')
- engine: Database engine
- schema_overrides: Override inferred column types
Returns:
- DataFrame: Query results
"""
def read_database_uri(
query: str | RawExpr,
uri: str,
*,
partition_on: str | None = None,
partition_range: tuple[int, int] | None = None,
partition_num: int | None = None,
protocol: str | None = None,
engine: str | None = None,
schema_overrides: dict[str, type] | None = None,
execute_options: dict[str, Any] | None = None
) -> DataFrame:
"""
Execute database query using URI connection string.
Parameters:
- query: SQL query string
- uri: Database URI connection string
Returns:
- DataFrame: Query results
"""

Read Excel and OpenDocument spreadsheet files.
def read_excel(
source: str | Path | BinaryIO,
*,
sheet_id: int | None = None,
sheet_name: str | None = None,
engine: str | None = None,
engine_options: dict[str, Any] | None = None,
read_options: dict[str, Any] | None = None,
schema_overrides: dict[str, type] | None = None,
infer_schema_length: int | None = None,
raise_if_empty: bool = True
) -> DataFrame:
"""
Read Excel file into DataFrame.
Parameters:
- source: Excel file path or file-like object
- sheet_id: Sheet index to read
- sheet_name: Sheet name to read
- engine: Excel engine ('calamine', 'openpyxl', 'xlsx2csv')
- schema_overrides: Override inferred column types
Returns:
- DataFrame: Excel data
"""
def read_ods(
source: str | Path | BinaryIO,
*,
sheet_id: int | None = None,
sheet_name: str | None = None,
schema_overrides: dict[str, type] | None = None,
infer_schema_length: int | None = None,
raise_if_empty: bool = True
) -> DataFrame:
"""
Read OpenDocument Spreadsheet file into DataFrame.
Parameters:
- source: ODS file path or file-like object
- sheet_id: Sheet index to read
- sheet_name: Sheet name to read
Returns:
- DataFrame: ODS data
"""

Support for additional data formats.
def read_avro(
source: str | Path | BinaryIO,
*,
columns: list[int] | list[str] | None = None,
n_rows: int | None = None
) -> DataFrame:
"""
Read Apache Avro file into DataFrame.
Parameters:
- source: Avro file path or file-like object
- columns: Columns to select
- n_rows: Number of rows to read
Returns:
- DataFrame: Avro data
"""
def read_clipboard(*, separator: str = "\t", **kwargs) -> DataFrame:
"""
Read data from system clipboard.
Parameters:
- separator: Field separator
- **kwargs: Additional CSV parsing options
Returns:
- DataFrame: Clipboard data
"""
def read_delta(
source: str | Path,
*,
version: int | str | datetime | None = None,
columns: list[str] | None = None,
storage_options: dict[str, str] | None = None,
delta_table_options: dict[str, Any] | None = None,
pyarrow_options: dict[str, Any] | None = None
) -> DataFrame:
"""
Read Delta Lake table into DataFrame.
Parameters:
- source: Delta table path
- version: Table version to read
- columns: Columns to select
- storage_options: Cloud storage configuration
Returns:
- DataFrame: Delta table data
"""
def scan_delta(
source: str | Path,
*,
version: int | str | datetime | None = None,
storage_options: dict[str, str] | None = None,
delta_table_options: dict[str, Any] | None = None,
pyarrow_options: dict[str, Any] | None = None
) -> LazyFrame:
"""
Scan Delta Lake table for lazy processing.
Returns:
- LazyFrame: Lazy representation of Delta table
"""

Integration with cloud storage providers and object stores.
# Cloud credential providers
class CredentialProvider:
"""Base class for cloud credential providers"""
class CredentialProviderAWS:
def __init__(
self,
*,
access_key_id: str | None = None,
secret_access_key: str | None = None,
session_token: str | None = None,
region: str | None = None,
profile: str | None = None
):
"""
AWS credential provider.
Parameters:
- access_key_id: AWS access key
- secret_access_key: AWS secret key
- session_token: AWS session token
- region: AWS region
- profile: AWS CLI profile name
"""
class CredentialProviderAzure:
def __init__(
self,
*,
account_name: str | None = None,
account_key: str | None = None,
sas_token: str | None = None,
tenant_id: str | None = None,
client_id: str | None = None,
client_secret: str | None = None
):
"""
Azure credential provider.
Parameters:
- account_name: Storage account name
- account_key: Storage account key
- sas_token: Shared access signature token
"""
class CredentialProviderGCP:
def __init__(
self,
*,
service_account_path: str | None = None,
service_account_key: str | None = None,
project_id: str | None = None
):
"""
Google Cloud Platform credential provider.
Parameters:
- service_account_path: Path to service account JSON file
- service_account_key: Service account key JSON string
- project_id: GCP project ID
"""
class CredentialProviderFunction:
def __init__(self, func: Callable[[], dict[str, str]]):
"""
Function-based credential provider.
Parameters:
- func: Function returning credential dictionary
"""
# Cloud scanning
def scan_iceberg(
source: str,
*,
mode: str = "convert",
pyarrow_options: dict[str, Any] | None = None
) -> LazyFrame:
"""
Scan Apache Iceberg table for lazy processing.
Parameters:
- source: Iceberg table path or catalog reference
- mode: Scanning mode ('convert' or 'arrow')
Returns:
- LazyFrame: Lazy representation of Iceberg table
"""
def scan_pyarrow_dataset(
source: str | Path,
*,
schema: dict[str, type] | None = None,
allow_pyarrow_filter: bool = True,
cache: bool = True
) -> LazyFrame:
"""
Scan PyArrow dataset for lazy processing.
Parameters:
- source: Dataset path
- schema: Expected schema
- allow_pyarrow_filter: Enable PyArrow predicate pushdown
Returns:
- LazyFrame: Lazy representation of PyArrow dataset
"""

Advanced configuration options for scanning operations.
class ScanCastOptions:
def __init__(
self,
*,
cast_time_unit: str | None = None,
cast_string_strict: bool = True
):
"""
Options for type casting during scanning.
Parameters:
- cast_time_unit: Time unit for temporal casts
- cast_string_strict: Strict string casting
"""
# Partitioning classes
class PartitionByKey:
def __init__(self, by: str | list[str]):
"""Partition by column values."""
class PartitionMaxSize:
def __init__(self, max_size: int):
"""Partition by maximum size."""
class PartitionParted:
def __init__(self, n_partitions: int):
"""Partition into fixed number of parts."""
# Context classes for advanced partitioning
class BasePartitionContext:
"""Base partition context"""
class KeyedPartitionContext(BasePartitionContext):
def __init__(self, key: Any): ...
class KeyedPartition:
def __init__(self, key: Any, partition: DataFrame): ...

Utilities for deferred I/O operations.
def defer() -> Expr:
"""
Create deferred I/O expression for use in scan operations.
Returns:
- Expr: Deferred expression
"""

import polars as pl
# Basic CSV reading
df = pl.read_csv("data.csv")
# CSV with custom options
df = pl.read_csv(
"data.csv",
separator=";",
has_header=True,
dtypes={"id": pl.Int32, "date": pl.Date},
null_values=["", "NULL", "N/A"]
)
# Lazy CSV scanning for large files
lazy_df = pl.scan_csv("large_file.csv").filter(pl.col("date") >= "2023-01-01")
result = lazy_df.collect()
# Batched reading for memory efficiency
reader = pl.read_csv_batched("huge_file.csv", batch_size=10000)
for batch in reader:
    process_batch(batch)

# Read Parquet file
df = pl.read_parquet("data.parquet")
# Parquet with column selection
df = pl.read_parquet("data.parquet", columns=["id", "name", "value"])
# Lazy Parquet scanning with predicate pushdown
lazy_df = (
pl.scan_parquet("partitioned/*.parquet")
.filter(pl.col("year") == 2023)
.select(["id", "amount"])
)
result = lazy_df.collect()
# Read Parquet metadata
schema = pl.read_parquet_schema("data.parquet")
metadata = pl.read_parquet_metadata("data.parquet")

# Read from database
df = pl.read_database(
"SELECT * FROM customers WHERE active = true",
"postgresql://user:pass@localhost/db"
)
# Partitioned database reading
df = pl.read_database(
"SELECT * FROM large_table",
connection,
partition_on="id",
partition_num=4
)
# Using different protocols
df = pl.read_database(
"SELECT * FROM table",
connection,
protocol="adbc" # or "connectorx"
)

# AWS S3
aws_creds = pl.CredentialProviderAWS(
access_key_id="ACCESS_KEY",
secret_access_key="SECRET_KEY",
region="us-east-1"
)
df = pl.read_parquet(
"s3://bucket/data.parquet",
credential_provider=aws_creds
)
# Azure Blob Storage
azure_creds = pl.CredentialProviderAzure(
account_name="account",
account_key="key"
)
df = pl.read_csv(
"az://container/data.csv",
credential_provider=azure_creds
)
# Google Cloud Storage
gcp_creds = pl.CredentialProviderGCP(
service_account_path="service-account.json"
)
df = pl.scan_parquet(
"gs://bucket/partitioned/*.parquet",
credential_provider=gcp_creds
)

# Read Excel file
df = pl.read_excel("data.xlsx", sheet_name="Sheet1")
# Excel with specific engine
df = pl.read_excel(
"data.xlsx",
engine="openpyxl",
schema_overrides={"date": pl.Date}
)
# OpenDocument Spreadsheet
df = pl.read_ods("data.ods", sheet_id=0)

# Read JSON
df = pl.read_json("data.json")
# Read NDJSON (newline-delimited JSON)
df = pl.read_ndjson("logs.jsonl")
# Lazy NDJSON scanning
lazy_df = pl.scan_ndjson("large_logs.jsonl").filter(
pl.col("timestamp") >= "2023-01-01"
)

# Read Delta table
df = pl.read_delta("path/to/delta/table")
# Read specific version
df = pl.read_delta("delta/table", version=5)
# Lazy scanning with time travel
lazy_df = pl.scan_delta("delta/table", version="2023-01-01T00:00:00Z")

# Scan with custom options
cast_options = pl.ScanCastOptions(
cast_time_unit="us",
cast_string_strict=False
)
lazy_df = pl.scan_csv(
"data.csv",
cast_options=cast_options
)
# Iceberg table scanning
lazy_df = pl.scan_iceberg("catalog.database.table")
# PyArrow dataset scanning
lazy_df = pl.scan_pyarrow_dataset("partitioned/dataset/")

Install with Tessl CLI:
npx tessl i tessl/pypi-polars-lts-cpu