Load data from databases to dataframes, the fastest way.
npx @tessl/cli install tessl/pypi-connectorx@0.4.0

ConnectorX is a high-performance data loading library that efficiently transfers data from various databases directly into Python dataframes. Built in Rust with Python bindings, it follows a zero-copy principle to achieve significant performance improvements - up to 21x faster than traditional solutions while using 3x less memory. The library supports multiple database sources and dataframe formats, with automatic parallelization through partition-based loading and federated queries across multiple databases.
pip install connectorx

import connectorx as cx

Common individual imports:
from connectorx import read_sql, get_meta, partition_sql, ConnectionUrl

All available imports:
from connectorx import (
    read_sql,
    read_sql_pandas,
    get_meta,
    partition_sql,
    ConnectionUrl,
    remove_ending_semicolon,
    try_import_module,
    Protocol,
    __version__
)

import connectorx as cx
# Basic data loading from PostgreSQL
postgres_url = "postgresql://username:password@server:port/database"
query = "SELECT * FROM lineitem"
df = cx.read_sql(postgres_url, query)
# Parallel loading with partitioning
df_parallel = cx.read_sql(
    postgres_url,
    query,
    partition_on="l_orderkey",
    partition_num=10
)
# Load to different dataframe formats
arrow_table = cx.read_sql(postgres_url, query, return_type="arrow")
polars_df = cx.read_sql(postgres_url, query, return_type="polars")

ConnectorX follows a zero-copy architecture that builds on Rust's performance characteristics.
Primary functionality for executing SQL queries and loading data into various dataframe formats. Supports single-threaded and parallel execution with automatic partitioning.
def read_sql(
    conn: str | ConnectionUrl | dict[str, str] | dict[str, ConnectionUrl],
    query: list[str] | str,
    *,
    return_type: Literal["pandas", "polars", "arrow", "modin", "dask", "arrow_stream"] = "pandas",
    protocol: Protocol | None = None,
    partition_on: str | None = None,
    partition_range: tuple[int, int] | None = None,
    partition_num: int | None = None,
    index_col: str | None = None,
    strategy: str | None = None,
    pre_execution_query: list[str] | str | None = None,
    batch_size: int = 10000,
    **kwargs
) -> pd.DataFrame | mpd.DataFrame | dd.DataFrame | pl.DataFrame | pa.Table | pa.RecordBatchReader
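Based on the signature above, large result sets can also be consumed incrementally as an Arrow stream rather than materialized all at once; a minimal sketch (connection URL, table, and batch size are illustrative):

import connectorx as cx

postgres_url = "postgresql://username:password@server:port/database"

# return_type="arrow_stream" yields a pyarrow.RecordBatchReader;
# batch_size controls how many rows each record batch holds
reader = cx.read_sql(
    postgres_url,
    "SELECT * FROM lineitem",
    return_type="arrow_stream",
    batch_size=10000,
)
for batch in reader:
    print(batch.num_rows)  # each iteration yields a pyarrow.RecordBatch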
def read_sql_pandas(
    sql: list[str] | str,
    con: str | ConnectionUrl | dict[str, str] | dict[str, ConnectionUrl],
    index_col: str | None = None,
    protocol: Protocol | None = None,
    partition_on: str | None = None,
    partition_range: tuple[int, int] | None = None,
    partition_num: int | None = None,
    pre_execution_queries: list[str] | str | None = None,
) -> pd.DataFrame
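read_sql_pandas mirrors the argument order of pandas.read_sql (query first, connection second) and always returns a pandas DataFrame; a small sketch reusing the illustrative URL and table from above:

import connectorx as cx

postgres_url = "postgresql://username:password@server:port/database"

df = cx.read_sql_pandas(
    "SELECT * FROM lineitem",
    postgres_url,
    partition_on="l_orderkey",  # optional: partition the query for parallel loading
    partition_num=4,
)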
Functionality for partitioning SQL queries to enable parallel data loading across multiple threads.

def partition_sql(
    conn: str | ConnectionUrl,
    query: str,
    partition_on: str,
    partition_num: int,
    partition_range: tuple[int, int] | None = None,
) -> list[str]
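partition_sql returns the rewritten per-partition queries, which can be inspected or passed back to read_sql as a list; a minimal sketch (URL, table, and column are illustrative):

import connectorx as cx

postgres_url = "postgresql://username:password@server:port/database"

# Split one query into four range-partitioned queries on l_orderkey
queries = cx.partition_sql(
    postgres_url,
    "SELECT * FROM lineitem",
    partition_on="l_orderkey",
    partition_num=4,
)
df = cx.read_sql(postgres_url, queries)  # read_sql also accepts a list of queries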
Retrieve schema information and metadata from SQL queries without loading the full dataset.

def get_meta(
    conn: str | ConnectionUrl,
    query: str,
    protocol: Protocol | None = None,
) -> pd.DataFrame
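get_meta is handy for checking the result schema before committing to a full load; a small sketch (URL and table are illustrative):

import connectorx as cx

postgres_url = "postgresql://username:password@server:port/database"

# DataFrame describing the query's result columns and dtypes, without the full data
meta = cx.get_meta(postgres_url, "SELECT * FROM lineitem")
print(meta.dtypes)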
Helper utilities for building and managing database connection strings across different database backends.

class ConnectionUrl(Generic[_BackendT], str):
    # For SQLite databases
    def __new__(
        cls,
        *,
        backend: Literal["sqlite"],
        db_path: str | Path,
    ) -> ConnectionUrl[Literal["sqlite"]]: ...

    # For BigQuery databases
    def __new__(
        cls,
        *,
        backend: Literal["bigquery"],
        db_path: str | Path,
    ) -> ConnectionUrl[Literal["bigquery"]]: ...

    # For server-based databases
    def __new__(
        cls,
        *,
        backend: _ServerBackendT,
        username: str,
        password: str = "",
        server: str,
        port: int,
        database: str = "",
        database_options: dict[str, str] | None = None,
    ) -> ConnectionUrl[_ServerBackendT]: ...

    # For raw connection strings
    def __new__(
        cls,
        raw_connection: str,
    ) -> ConnectionUrl: ...
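ConnectionUrl assembles a properly formatted connection string from its parts (it is a str subclass, so it can be passed anywhere a plain URL is accepted); a minimal sketch with illustrative credentials:

import connectorx as cx
from connectorx import ConnectionUrl

# Server-based backend: produces a postgresql:// connection string from the parts
pg_url = ConnectionUrl(
    backend="postgresql",
    username="username",
    password="password",
    server="server",
    port=5432,
    database="database",
)

# File-based backend
sqlite_url = ConnectionUrl(backend="sqlite", db_path="/path/to/db.sqlite")

df = cx.read_sql(pg_url, "SELECT * FROM lineitem")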
Execute queries across multiple databases in a single statement, with automatic join optimization and query rewriting.

def read_sql(
    conn: dict[str, str] | dict[str, ConnectionUrl],
    query: str,
    *,
    strategy: str | None = None,
    **kwargs
) -> pd.DataFrame | pl.DataFrame | pa.Table
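For federated queries, conn maps database aliases to connection strings and the SQL references tables through those aliases; a hedged sketch (aliases, URLs, and tables are illustrative, and the alias.table convention is assumed):

import connectorx as cx

# Each dict key becomes a database alias usable inside the query text
db_map = {
    "db1": "postgresql://username:password@server1:5432/sales",
    "db2": "postgresql://username:password@server2:5432/inventory",
}

df = cx.read_sql(
    db_map,
    "SELECT o.o_orderkey, l.l_quantity "
    "FROM db1.orders o JOIN db2.lineitem l ON o.o_orderkey = l.l_orderkey",
)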
Helper functions for SQL query processing and module management.

def remove_ending_semicolon(query: str) -> str:
    """Remove the trailing semicolon from a SQL query, if present."""

def try_import_module(name: str) -> Any:
    """Import a module, with a helpful error message if it is not found."""

def rewrite_conn(
    conn: str | ConnectionUrl,
    protocol: Protocol | None = None
) -> tuple[str, Protocol]:
    """Rewrite the connection string for backend compatibility."""

__version__: str  # Package version string

Protocol = Literal["csv", "binary", "cursor", "simple", "text"]
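These Protocol literals feed the protocol argument of read_sql and get_meta; for example, forcing the binary wire protocol for a PostgreSQL source (a one-line sketch with the illustrative URL):

import connectorx as cx

postgres_url = "postgresql://username:password@server:port/database"
df = cx.read_sql(postgres_url, "SELECT * FROM lineitem", protocol="binary")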
# Type variables for connection URL backends
_BackendT = TypeVar("_BackendT")
_ServerBackendT = TypeVar(
    "_ServerBackendT",
    bound=Literal[
        "redshift",
        "clickhouse",
        "postgres",
        "postgresql",
        "mysql",
        "mssql",
        "oracle",
        "duckdb",
    ],
)
# Internal types from Rust bindings
_DataframeInfos = dict[str, Any] # Pandas DataFrame reconstruction info
_ArrowInfos = tuple[list[str], list[Any]] # Arrow table reconstruction info
# Type checking imports (only available when TYPE_CHECKING is True)
if TYPE_CHECKING:
    import pandas as pd
    import polars as pl
    import modin.pandas as mpd
    import dask.dataframe as dd
    import pyarrow as pa

Supported database sources:
PostgreSQL (postgresql://)
MySQL (mysql://)
SQLite (sqlite://)
SQL Server (mssql://)
Oracle (oracle://)
BigQuery (bigquery://)
Redshift (redshift://)
ClickHouse (clickhouse://)