Load data from databases to dataframes, the fastest way.
Primary functionality for executing SQL queries and loading data into various dataframe formats. ConnectorX's data loading provides high-performance, zero-copy data transfer from databases to Python dataframes with support for multiple output formats and parallel processing.
The main function for loading data from databases into dataframes with extensive customization options.
def read_sql(
    conn: str | ConnectionUrl | dict[str, str] | dict[str, ConnectionUrl],
    query: list[str] | str,
    *,
    return_type: Literal["pandas", "polars", "arrow", "modin", "dask", "arrow_stream"] = "pandas",
    protocol: Protocol | None = None,
    partition_on: str | None = None,
    partition_range: tuple[int, int] | None = None,
    partition_num: int | None = None,
    index_col: str | None = None,
    strategy: str | None = None,
    pre_execution_query: list[str] | str | None = None,
    batch_size: int = 10000,
    **kwargs,
) -> pd.DataFrame | mpd.DataFrame | dd.DataFrame | pl.DataFrame | pa.Table | pa.RecordBatchReader:
"""
Run SQL query and download data from database into a dataframe.
Parameters:
- conn: Connection string, ConnectionUrl, or dict for federated queries
- query: SQL query string or list of SQL queries
- return_type: Output format ("pandas", "polars", "arrow", "modin", "dask", "arrow_stream")
- protocol: Backend-specific transfer protocol ("csv", "binary", "cursor", "simple", "text")
- partition_on: Column name for partitioning the result for parallel processing
- partition_range: Value range tuple for the partition column
- partition_num: Number of partitions to generate for parallel processing
- index_col: Index column to set (pandas/modin/dask only)
- strategy: Strategy for federated query rewriting
- pre_execution_query: SQL queries to execute before main query
- batch_size: Maximum batch size for arrow_stream return type
Returns:
DataFrame in the specified format
"""Usage Examples:
import connectorx as cx
# Basic usage - single threaded
postgres_url = "postgresql://username:password@server:port/database"
query = "SELECT * FROM lineitem"
df = cx.read_sql(postgres_url, query)
# Parallel loading with automatic partitioning
df = cx.read_sql(
postgres_url,
query,
partition_on="l_orderkey",
partition_num=10
)
# Explicit partition range
df = cx.read_sql(
postgres_url,
query,
partition_on="l_orderkey",
partition_range=(1, 1000),
partition_num=4
)
# Multiple return types
arrow_table = cx.read_sql(postgres_url, query, return_type="arrow")
polars_df = cx.read_sql(postgres_url, query, return_type="polars")
modin_df = cx.read_sql(postgres_url, query, return_type="modin")
# Pre-execution queries for configuration
df = cx.read_sql(
postgres_url,
query,
pre_execution_query="SET work_mem = '1GB'"
)
# Arrow streaming for large datasets
record_reader = cx.read_sql(
postgres_url,
query,
return_type="arrow_stream",
batch_size=50000
)

Convenience function with a pandas.read_sql-compatible interface.
def read_sql_pandas(
    sql: list[str] | str,
    con: str | ConnectionUrl | dict[str, str] | dict[str, ConnectionUrl],
    index_col: str | None = None,
    protocol: Protocol | None = None,
    partition_on: str | None = None,
    partition_range: tuple[int, int] | None = None,
    partition_num: int | None = None,
    pre_execution_queries: list[str] | str | None = None,
) -> pd.DataFrame:
"""
Run SQL query with pandas.read_sql compatible interface.
Parameters are in the same order as pandas.read_sql for easy migration.
Returns:
pandas DataFrame
"""Usage Example:
# Drop-in replacement for pandas.read_sql
# from pandas import read_sql
from connectorx import read_sql_pandas as read_sql
postgres_url = "postgresql://username:password@server:port/database"
query = "SELECT * FROM lineitem"
df = read_sql(query, postgres_url)

Backend-specific transfer protocols can be selected via the protocol parameter for optimal performance.
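As a rough sketch (based on upstream ConnectorX documentation at the time of writing — verify against your installed version), the commonly documented protocol choices per backend are:

```python
# Commonly documented protocol options per backend (assumed from upstream
# ConnectorX docs; "binary" is typically the default where supported).
PROTOCOLS_BY_BACKEND = {
    "postgres": ["binary", "csv", "cursor", "simple"],
    "mysql": ["binary", "text"],
    "redshift": ["cursor"],
}

# Overriding the protocol is a single keyword argument, e.g.:
# df = cx.read_sql(postgres_url, query, protocol="csv")
```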
When partition_on is specified, ConnectorX automatically splits the value range of the partition column into partition_num equal partitions and executes the partitioned queries in parallel.

For custom partitioning logic, provide a list of queries:
queries = [
"SELECT * FROM lineitem WHERE l_orderkey <= 1000",
"SELECT * FROM lineitem WHERE l_orderkey > 1000 AND l_orderkey <= 2000",
"SELECT * FROM lineitem WHERE l_orderkey > 2000"
]
df = cx.read_sql(postgres_url, queries)

ConnectorX handles common database connection and query errors.
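The automatic range splitting can be sketched in plain Python — an illustration of the idea only, not ConnectorX's actual implementation (which runs in Rust):

```python
def split_into_partition_queries(query, partition_on, partition_range, partition_num):
    """Illustrative sketch: split [lo, hi] into partition_num roughly equal
    ranges and wrap the user query in per-partition WHERE clauses."""
    lo, hi = partition_range
    step = (hi - lo) // partition_num + 1
    queries = []
    for start in range(lo, hi + 1, step):
        end = min(start + step - 1, hi)
        queries.append(
            f"SELECT * FROM ({query}) AS t "
            f"WHERE t.{partition_on} >= {start} AND t.{partition_on} <= {end}"
        )
    return queries

qs = split_into_partition_queries("SELECT * FROM lineitem", "l_orderkey", (1, 1000), 4)
print(len(qs))  # 4
```

Each generated query can then be executed on its own connection, which is what makes the parallel speedup possible.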
For federated queries, additional validation ensures query compatibility across databases.
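As a sketch of the federated form of conn: the database names and connection URLs below are hypothetical, and the final call is commented out because it requires live databases.

```python
# Hypothetical database names and connection URLs for a federated query.
db_map = {
    "db1": "postgresql://username:password@host1:5432/tpch",
    "db2": "mysql://username:password@host2:3306/tpch",
}

# Tables are qualified by the dict keys, so one SQL statement can join
# across both sources.
federated_query = """
SELECT n.n_name, r.r_name
FROM db1.nation AS n
JOIN db2.region AS r ON n.n_regionkey = r.r_regionkey
"""

# Requires live databases, so not executed here:
# import connectorx as cx
# df = cx.read_sql(db_map, federated_query)
```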
Install with Tessl CLI
npx tessl i tessl/pypi-connectorx