Blazingly fast DataFrame library with 64-bit index support for handling datasets with more than 4.2 billion rows
—
Comprehensive I/O capabilities supporting 10+ file formats with both eager reading and lazy scanning for performance optimization. Polars provides efficient data ingestion and export across various formats with advanced features like predicate pushdown and schema inference.
Reading and scanning CSV files with extensive configuration options.
def read_csv(
source: str | Path | IO[str] | IO[bytes] | bytes,
*,
has_header: bool = True,
columns: list[int] | list[str] | None = None,
new_columns: list[str] | None = None,
dtypes: dict[int | str, DataType] | Sequence[DataType] | None = None,
separator: str = ",",
comment_prefix: str | None = None,
quote_char: str | None = '"',
skip_rows: int = 0,
skip_rows_after_header: int = 0,
row_index_name: str | None = None,
row_index_offset: int = 0,
sample_size: int = 1024,
eol_char: str = "\n",
null_values: str | Sequence[str] | dict[str, str] | None = None,
missing_utf8_is_empty_string: bool = False,
ignore_errors: bool = False,
try_parse_dates: bool = False,
n_threads: int | None = None,
infer_schema_length: int | None = N_INFER_DEFAULT,
batch_size: int | None = None,
n_rows: int | None = None,
encoding: CsvEncoding = "utf8",
low_memory: bool = False,
rechunk: bool = False,
skip_blank_lines: bool = True,
raise_if_empty: bool = True,
truncate_ragged_lines: bool = False,
decimal_comma: bool = False,
glob: bool = True
) -> DataFrame:
"""
Read CSV file into DataFrame.
Parameters:
- source: File path, URL, or file-like object
- has_header: First row contains column names
- columns: Columns to select by index or name
- new_columns: Override column names
- dtypes: Column data types
- separator: Field delimiter
- comment_prefix: Comment line prefix to skip
- quote_char: Quote character for strings
- skip_rows: Number of rows to skip at start
- skip_rows_after_header: Rows to skip after header
- row_index_name: Add row index column with this name
- row_index_offset: Start value for row index
- sample_size: Number of rows sampled to estimate the memory allocation needed (schema inference is controlled by infer_schema_length)
- eol_char: End-of-line character
- null_values: Values to interpret as null
- missing_utf8_is_empty_string: Treat missing string values as empty strings instead of null
- ignore_errors: Continue on parse errors
- try_parse_dates: Attempt date parsing
- n_threads: Number of threads for parsing
- infer_schema_length: Rows to scan for schema inference
- batch_size: Batch size for processing
- n_rows: Maximum rows to read
- encoding: Text encoding
- low_memory: Use less memory (slower)
- rechunk: Rechunk to single chunk
- skip_blank_lines: Skip empty lines
- raise_if_empty: Raise error if no data
- truncate_ragged_lines: Truncate rows that have more fields than the schema instead of raising an error
- decimal_comma: Use comma as decimal separator
- glob: Use glob patterns for multiple files
Returns:
DataFrame with CSV data
"""
def scan_csv(
source: str | Path | list[str] | list[Path],
**kwargs
) -> LazyFrame:
"""
Lazy scan CSV file(s) for optimized processing.
Parameters:
Similar to read_csv but returns LazyFrame for deferred execution
Returns:
LazyFrame for lazy evaluation
"""

High-performance columnar format operations with advanced features.
def read_parquet(
source: str | Path | IO[bytes] | bytes,
*,
columns: list[int] | list[str] | None = None,
n_rows: int | None = None,
row_index_name: str | None = None,
row_index_offset: int = 0,
parallel: ParallelStrategy = "auto",
use_statistics: bool = True,
hive_partitioning: bool | None = None,
glob: bool = True,
rechunk: bool = False,
low_memory: bool = False,
storage_options: dict[str, Any] | None = None,
credential_provider: CredentialProvider | None = None,
retries: int = 2,
file_cache_ttl: int | None = None
) -> DataFrame:
"""
Read Parquet file into DataFrame.
Parameters:
- source: File path, URL, or bytes
- columns: Columns to select
- n_rows: Maximum rows to read
- row_index_name: Add row index column
- row_index_offset: Row index start value
- parallel: Parallelization strategy
- use_statistics: Use Parquet statistics for optimization
- hive_partitioning: Enable Hive-style partitioning
- glob: Use glob patterns
- rechunk: Rechunk to single chunk
- low_memory: Use less memory
- storage_options: Cloud storage options
- credential_provider: Cloud credentials
- retries: Number of retry attempts
- file_cache_ttl: File cache time-to-live
Returns:
DataFrame with Parquet data
"""
def scan_parquet(
source: str | Path | list[str] | list[Path],
**kwargs
) -> LazyFrame:
"""Lazy scan Parquet file(s)."""
def read_parquet_metadata(source: str | Path | IO[bytes] | bytes) -> dict[str, Any]:
"""Read Parquet file metadata."""
def read_parquet_schema(source: str | Path | IO[bytes] | bytes) -> Schema:
"""Read Parquet file schema."""

JSON and newline-delimited JSON file operations.
def read_json(
source: str | Path | IO[str] | IO[bytes] | bytes,
*,
schema: dict[str, DataType] | None = None,
schema_overrides: dict[str, DataType] | None = None,
infer_schema_length: int | None = N_INFER_DEFAULT
) -> DataFrame:
"""
Read JSON file into DataFrame.
Parameters:
- source: JSON file path or data
- schema: Expected schema
- schema_overrides: Override inferred types
- infer_schema_length: Rows for schema inference
Returns:
DataFrame with JSON data
"""
def read_ndjson(
source: str | Path | IO[str] | IO[bytes] | bytes,
*,
schema: dict[str, DataType] | None = None,
schema_overrides: dict[str, DataType] | None = None,
batch_size: int | None = None,
n_rows: int | None = None,
low_memory: bool = False,
rechunk: bool = False,
row_index_name: str | None = None,
row_index_offset: int = 0,
ignore_errors: bool = False
) -> DataFrame:
"""
Read newline-delimited JSON file.
Parameters:
- source: NDJSON file path or data
- schema: Expected schema
- schema_overrides: Override inferred types
- batch_size: Processing batch size
- n_rows: Maximum rows to read
- low_memory: Use less memory
- rechunk: Rechunk to single chunk
- row_index_name: Add row index column
- row_index_offset: Row index start value
- ignore_errors: Continue on parse errors
Returns:
DataFrame with NDJSON data
"""
def scan_ndjson(
source: str | Path | list[str] | list[Path],
**kwargs
) -> LazyFrame:
"""Lazy scan NDJSON file(s)."""

Reading data from various databases using connection strings or objects.
def read_database(
query: str,
connection: str | ConnectionOrCursor,
*,
partition_on: str | None = None,
partition_range: tuple[int, int] | None = None,
partition_num: int | None = None,
protocol: str | None = None,
engine: DbReadEngine | None = None,
schema_overrides: dict[str, DataType] | None = None,
execute_options: dict[str, Any] | None = None
) -> DataFrame:
"""
Read database query results into DataFrame.
Parameters:
- query: SQL query string
- connection: Database connection string or object
- partition_on: Column for partitioned reading
- partition_range: Range for partitioned reading
- partition_num: Number of partitions
- protocol: Database protocol
- engine: Database engine to use
- schema_overrides: Override inferred types
- execute_options: Additional execution options
Returns:
DataFrame with query results
"""
def read_database_uri(
query: str,
uri: str,
*,
partition_on: str | None = None,
partition_range: tuple[int, int] | None = None,
partition_num: int | None = None,
protocol: str | None = None,
engine: DbReadEngine | None = None,
schema_overrides: dict[str, DataType] | None = None
) -> DataFrame:
"""
Read from database using URI connection string.
Parameters:
- query: SQL query string
- uri: Database URI
- Other parameters: Same as read_database
Returns:
DataFrame with query results
"""

Apache Arrow IPC format operations for efficient cross-language data exchange.
def read_ipc(
source: str | Path | IO[bytes] | bytes,
*,
columns: list[int] | list[str] | None = None,
n_rows: int | None = None,
row_index_name: str | None = None,
row_index_offset: int = 0,
rechunk: bool = False,
memory_map: bool = True,
storage_options: dict[str, Any] | None = None,
credential_provider: CredentialProvider | None = None
) -> DataFrame:
"""
Read IPC/Arrow file into DataFrame.
Parameters:
- source: IPC file path or bytes
- columns: Columns to select
- n_rows: Maximum rows to read
- row_index_name: Add row index column
- row_index_offset: Row index start value
- rechunk: Rechunk to single chunk
- memory_map: Use memory mapping
- storage_options: Cloud storage options
- credential_provider: Cloud credentials
Returns:
DataFrame with IPC data
"""
def read_ipc_stream(
source: str | Path | IO[bytes] | bytes,
**kwargs
) -> DataFrame:
"""Read IPC stream format."""
def scan_ipc(
source: str | Path | list[str] | list[Path],
**kwargs
) -> LazyFrame:
"""Lazy scan IPC file(s)."""
def read_ipc_schema(source: str | Path | IO[bytes] | bytes) -> Schema:
"""Read IPC file schema."""

Reading Excel and OpenDocument spreadsheet files.
def read_excel(
source: str | Path | IO[bytes] | bytes,
*,
sheet_id: int | Sequence[int] | None = None,
sheet_name: str | list[str] | None = None,
engine: ExcelSpreadsheetEngine | None = None,
engine_options: dict[str, Any] | None = None,
read_options: dict[str, Any] | None = None,
schema_overrides: dict[str, DataType] | None = None,
infer_schema_length: int | None = N_INFER_DEFAULT,
raise_if_empty: bool = True
) -> DataFrame | dict[str, DataFrame]:
"""
Read Excel file into DataFrame.
Parameters:
- source: Excel file path or bytes
- sheet_id: Sheet index(es) to read
- sheet_name: Sheet name(s) to read
- engine: Excel engine to use
- engine_options: Engine-specific options
- read_options: Reading options
- schema_overrides: Override inferred types
- infer_schema_length: Rows for schema inference
- raise_if_empty: Raise error if no data
Returns:
DataFrame or dict of DataFrames (if multiple sheets)
"""
def read_ods(
source: str | Path | IO[bytes] | bytes,
**kwargs
) -> DataFrame | dict[str, DataFrame]:
"""Read OpenDocument Spreadsheet file."""

Cloud storage integration and advanced I/O features.
def read_avro(
source: str | Path | IO[bytes] | bytes,
*,
columns: list[int] | list[str] | None = None,
n_rows: int | None = None
) -> DataFrame:
"""Read Apache Avro file."""
def read_clipboard(**kwargs) -> DataFrame:
"""Read data from system clipboard."""
def scan_iceberg(
source: str,
**kwargs
) -> LazyFrame:
"""Lazy scan Apache Iceberg table."""
def scan_delta(
source: str,
*,
version: int | str | None = None,
storage_options: dict[str, str] | None = None,
delta_table_options: dict[str, Any] | None = None
) -> LazyFrame:
"""
Lazy scan Delta Lake table.
Parameters:
- source: Delta table path
- version: Table version to read
- storage_options: Cloud storage options
- delta_table_options: Delta table options
Returns:
LazyFrame for Delta table
"""
def read_delta(
source: str,
**kwargs
) -> DataFrame:
"""Read Delta Lake table."""
def scan_pyarrow_dataset(
source: str | Path,
**kwargs
) -> LazyFrame:
"""Lazy scan PyArrow dataset."""

Advanced partitioning strategies and scan configuration.
class ScanCastOptions:
"""Options for casting during scan operations."""
def __init__(
self,
*,
enabled: bool = True,
dtypes: dict[str, DataType] | None = None,
strict: bool = True
):
"""
Configure scan casting.
Parameters:
- enabled: Enable automatic casting
- dtypes: Target data types
- strict: Strict casting mode
"""
class BasePartitionContext:
"""Base class for partition contexts."""
class KeyedPartitionContext(BasePartitionContext):
"""Partition context with key-based partitioning."""
class KeyedPartition:
"""Partition information for keyed partitioning."""
def __init__(self, key: Any, df: DataFrame):
"""
Create keyed partition.
Parameters:
- key: Partition key
- df: Partition DataFrame
"""
class PartitionByKey:
"""Partition strategy based on column values."""
def __init__(self, by: str | list[str]):
"""
Partition by column key(s).
Parameters:
- by: Column name(s) for partitioning
"""
class PartitionMaxSize:
"""Partition strategy based on maximum size."""
def __init__(self, max_size: int):
"""
Partition by maximum size.
Parameters:
- max_size: Maximum partition size
"""
class PartitionParted:
"""Information about partitioned data."""

Authentication for cloud storage access.
class CredentialProvider:
"""Base credential provider."""
class CredentialProviderAWS(CredentialProvider):
"""AWS credential provider."""
def __init__(
self,
*,
access_key_id: str | None = None,
secret_access_key: str | None = None,
session_token: str | None = None,
region: str | None = None,
profile: str | None = None
):
"""
AWS credentials.
Parameters:
- access_key_id: AWS access key
- secret_access_key: AWS secret key
- session_token: AWS session token
- region: AWS region
- profile: AWS profile name
"""
class CredentialProviderAzure(CredentialProvider):
"""Azure credential provider."""
class CredentialProviderGCP(CredentialProvider):
"""Google Cloud credential provider."""
class CredentialProviderFunction(CredentialProvider):
"""Function-based credential provider."""
def __init__(self, func: Callable[[], CredentialProviderFunctionReturn]):
"""
Function-based credentials.
Parameters:
- func: Function returning credentials
"""
class CredentialProviderFunctionReturn:
"""Return type for credential function."""

import polars as pl
# Read CSV with automatic type inference
df = pl.read_csv("data.csv")
# Read with specific options
df = pl.read_csv(
"data.csv",
separator=";",
null_values=["", "NULL", "N/A"],
try_parse_dates=True,
infer_schema_length=1000
)
# Read specific columns
df = pl.read_csv("data.csv", columns=["name", "age", "salary"])

# Lazy scan for memory efficiency
lazy_df = (pl
.scan_csv("large_file.csv")
.filter(pl.col("amount") > 1000)
.select(["customer_id", "amount", "date"])
.group_by("customer_id")
.agg([
pl.col("amount").sum(),
pl.col("date").max()
])
)
# Execute when ready
result = lazy_df.collect()

# Read multiple CSV files at once
df = pl.read_csv("data_*.csv", glob=True)
# Scan multiple Parquet files
lazy_df = pl.scan_parquet(["file1.parquet", "file2.parquet", "file3.parquet"])

# Read from database
df = pl.read_database(
"SELECT * FROM customers WHERE age > 25",
"postgresql://user:pass@localhost:5432/db"
)
# Partitioned database reading for large tables
df = pl.read_database(
"SELECT * FROM large_table",
"postgresql://user:pass@localhost:5432/db",
partition_on="id",
partition_num=4
)

# Read from S3 with credentials
df = pl.read_parquet(
"s3://bucket/data.parquet",
credential_provider=pl.CredentialProviderAWS(
access_key_id="key",
secret_access_key="secret",
region="us-east-1"
)
)
# Read from Azure Blob Storage
df = pl.read_csv(
"az://container/data.csv",
credential_provider=pl.CredentialProviderAzure()
)

# Read specific Excel sheet
df = pl.read_excel("report.xlsx", sheet_name="Summary")
# Read multiple sheets (sheet numbers are 1-based; the result is a dict keyed by sheet name)
sheets = pl.read_excel("report.xlsx", sheet_id=[1, 2, 3])
summary_df = sheets["Summary"]
details_df = sheets["Details"]
# Excel with custom options
df = pl.read_excel(
"data.xlsx",
engine="openpyxl",
read_options={
"has_header": True,
"skip_rows": 2
},
schema_overrides={
"date": pl.Date,
"amount": pl.Decimal(10, 2)
}
)

# DataFrame write methods
df.write_csv("output.csv")
df.write_parquet("output.parquet")
df.write_json("output.json")
df.write_ipc("output.arrow")
# LazyFrame collect and write
lazy_df.collect().write_parquet("result.parquet")
# Write with options
df.write_csv(
"output.csv",
separator="|",
quote_char="'",
null_value="NULL"
)

# Define schema for consistent reading
schema = pl.Schema({
"id": pl.Int32,
"name": pl.String,
"amount": pl.Decimal(10, 2),
"timestamp": pl.Datetime("us", "UTC")
})
df = pl.read_csv("data.csv", schema=schema)
# Override specific column types
df = pl.read_csv(
"data.csv",
schema_overrides={
"customer_id": pl.String, # Keep as string
"amount": pl.Decimal(12, 4) # Higher precision
}
)

Install with Tessl CLI
npx tessl i tessl/pypi-polars-u64-idx@1.33.1