tessl/pypi-connectorx

Load data from databases to dataframes, the fastest way.

docs/data-loading.md

Data Loading

Primary functionality for executing SQL queries and loading data into various dataframe formats. ConnectorX's data loading provides high-performance, zero-copy data transfer from databases to Python dataframes with support for multiple output formats and parallel processing.

Capabilities

Primary Data Loading Function

The main function for loading data from databases into dataframes with extensive customization options.

def read_sql(
    conn: str | ConnectionUrl | dict[str, str] | dict[str, ConnectionUrl],
    query: list[str] | str,
    *,
    return_type: Literal["pandas", "polars", "arrow", "modin", "dask", "arrow_stream"] = "pandas",
    protocol: Protocol | None = None,
    partition_on: str | None = None,
    partition_range: tuple[int, int] | None = None,
    partition_num: int | None = None,
    index_col: str | None = None,
    strategy: str | None = None,
    pre_execution_query: list[str] | str | None = None,
    batch_size: int = 10000,
    **kwargs
) -> pd.DataFrame | mpd.DataFrame | dd.DataFrame | pl.DataFrame | pa.Table | pa.RecordBatchReader:
    """
    Run SQL query and download data from database into a dataframe.

    Parameters:
    - conn: Connection string, ConnectionUrl, or dict for federated queries
    - query: SQL query string or list of SQL queries
    - return_type: Output format ("pandas", "polars", "arrow", "modin", "dask", "arrow_stream")
    - protocol: Backend-specific transfer protocol ("csv", "binary", "cursor", "simple", "text")
    - partition_on: Column name for partitioning the result for parallel processing
    - partition_range: Value range tuple for the partition column
    - partition_num: Number of partitions to generate for parallel processing
    - index_col: Index column to set (pandas/modin/dask only)
    - strategy: Strategy for federated query rewriting
    - pre_execution_query: SQL queries to execute before main query
    - batch_size: Maximum batch size for arrow_stream return type

    Returns:
    DataFrame in the specified format
    """

Usage Examples:

import connectorx as cx

# Basic usage - single threaded
postgres_url = "postgresql://username:password@server:port/database"
query = "SELECT * FROM lineitem"
df = cx.read_sql(postgres_url, query)

# Parallel loading with automatic partitioning
df = cx.read_sql(
    postgres_url, 
    query,
    partition_on="l_orderkey",
    partition_num=10
)

# Explicit partition range
df = cx.read_sql(
    postgres_url,
    query,
    partition_on="l_orderkey", 
    partition_range=(1, 1000),
    partition_num=4
)

# Multiple return types
arrow_table = cx.read_sql(postgres_url, query, return_type="arrow")
polars_df = cx.read_sql(postgres_url, query, return_type="polars") 
modin_df = cx.read_sql(postgres_url, query, return_type="modin")

# Pre-execution queries for configuration
df = cx.read_sql(
    postgres_url,
    query,
    pre_execution_query="SET work_mem = '1GB'"
)

# Arrow streaming for large datasets
record_reader = cx.read_sql(
    postgres_url,
    query,
    return_type="arrow_stream",
    batch_size=50000
)
# Consume the stream batch by batch instead of materializing everything at once
for batch in record_reader:  # each item is a pyarrow.RecordBatch
    print(batch.num_rows)

Pandas-Compatible Function

Convenience function with a pandas.read_sql-compatible interface.

def read_sql_pandas(
    sql: list[str] | str,
    con: str | ConnectionUrl | dict[str, str] | dict[str, ConnectionUrl],
    index_col: str | None = None,
    protocol: Protocol | None = None,
    partition_on: str | None = None,
    partition_range: tuple[int, int] | None = None,
    partition_num: int | None = None,
    pre_execution_queries: list[str] | str | None = None,
) -> pd.DataFrame:
    """
    Run SQL query with pandas.read_sql compatible interface.
    
    Parameters are in the same order as pandas.read_sql for easy migration.
    
    Returns:
    pandas DataFrame
    """

Usage Example:

# Drop-in replacement for pandas.read_sql
# from pandas import read_sql
from connectorx import read_sql_pandas as read_sql

postgres_url = "postgresql://username:password@server:port/database"
query = "SELECT * FROM lineitem"
df = read_sql(query, postgres_url)

Output Format Support

DataFrame Libraries

  • pandas: Default format, full compatibility with pandas ecosystem
  • polars: High-performance DataFrame library with lazy evaluation
  • arrow: PyArrow tables for interoperability and columnar processing
  • modin: Distributed pandas for large datasets
  • dask: Distributed computing with familiar pandas interface
  • arrow_stream: PyArrow RecordBatchReader for streaming large datasets

Protocol Options

Backend-specific protocols for optimal performance:

  • binary: Default high-performance protocol (most databases)
  • cursor: Server-side cursors (Redshift default)
  • text: Text-based transfer (ClickHouse default)
  • csv: CSV format transfer
  • simple: Simple query protocol
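When `protocol` is left as `None`, ConnectorX picks the backend's default from the list above. The mapping can be made explicit with a small helper; note that `default_protocol` is a hypothetical function written for illustration, not part of the ConnectorX API:

```python
# Hypothetical helper (not part of ConnectorX): derive the documented default
# protocol from the connection string's scheme, falling back to "binary",
# which most backends use.
def default_protocol(conn: str) -> str:
    scheme = conn.split("://", 1)[0]
    defaults = {
        "redshift": "cursor",    # server-side cursors
        "clickhouse": "text",    # text-based transfer
    }
    return defaults.get(scheme, "binary")

# The chosen protocol can then be passed explicitly, e.g.:
# cx.read_sql(conn, query, protocol=default_protocol(conn))
```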

Parallel Processing

Automatic Partitioning

When partition_on is specified, ConnectorX automatically:

  1. Determines the min/max values of the partition column
  2. Splits the range into partition_num equal partitions
  3. Executes queries in parallel across multiple threads
  4. Combines results into a single dataframe
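ConnectorX performs this splitting internally in Rust; the range-splitting step (step 2) can be sketched in pure Python for intuition only — this is not the actual implementation:

```python
def split_partition_range(lo: int, hi: int, n: int) -> list[tuple[int, int]]:
    """Split the inclusive range [lo, hi] into up to n contiguous chunks,
    mirroring step 2 of the automatic partitioning described above."""
    size = -(-(hi - lo + 1) // n)  # ceiling division: rows per partition
    bounds = []
    start = lo
    while start <= hi:
        end = min(start + size - 1, hi)
        bounds.append((start, end))
        start = end + 1
    return bounds

# Each (start, end) pair then becomes one
# "... WHERE col >= start AND col <= end" query run on its own thread.
```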

Manual Query Lists

For custom partitioning logic, provide a list of queries:

queries = [
    "SELECT * FROM lineitem WHERE l_orderkey <= 1000",
    "SELECT * FROM lineitem WHERE l_orderkey > 1000 AND l_orderkey <= 2000",
    "SELECT * FROM lineitem WHERE l_orderkey > 2000"
]
df = cx.read_sql(postgres_url, queries)

Error Handling

ConnectorX handles common database connection and query errors:

  • Connection errors: Database unreachable, authentication failures
  • Query errors: Invalid SQL syntax, missing tables/columns
  • Partition errors: Invalid partition column, insufficient partitions
  • Memory errors: Automatic optimization for large result sets

For federated queries, additional validation ensures query compatibility across databases.
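These errors surface as Python exceptions at the call site. A defensive wrapper for transient connection failures might look like the sketch below; it assumes ConnectorX raises RuntimeError for most backend failures (check the behavior of your installed version), and `load` stands in for `cx.read_sql` so the pattern can be exercised without a live database:

```python
# Sketch only: retry a loader a few times before giving up.
# `load` is any callable with the shape load(conn, query) -> dataframe,
# e.g. connectorx.read_sql.
def load_with_retry(load, conn, query, attempts=3):
    last_err = None
    for _ in range(attempts):
        try:
            return load(conn, query)
        except RuntimeError as err:  # assumed error type for connection/query failures
            last_err = err           # transient failure: try again
    raise last_err
```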

Install with Tessl CLI

npx tessl i tessl/pypi-connectorx
