tessl/pypi-connectorx

Load data from databases to dataframes, the fastest way.


Query Partitioning

Query partitioning lets ConnectorX split a large SQL query into smaller, parallelizable chunks based on a specified column's value range, enabling parallel data loading across multiple threads.

Capabilities

SQL Query Partitioning

Generate multiple SQL queries by partitioning on a specified column, enabling parallel execution.

def partition_sql(
    conn: str | ConnectionUrl,
    query: str,
    partition_on: str,
    partition_num: int,
    partition_range: tuple[int, int] | None = None,
) -> list[str]:
    """
    Partition a SQL query based on a column for parallel execution.

    Parameters:
    - conn: Database connection string or ConnectionUrl
    - query: Original SQL query to partition
    - partition_on: Column name to use for partitioning (must be numerical)
    - partition_num: Number of partitions to generate
    - partition_range: Optional explicit min/max values for partitioning

    Returns:
    List of partitioned SQL query strings
    """

Usage Examples:

import connectorx as cx

postgres_url = "postgresql://username:password@server:port/database"
base_query = "SELECT * FROM lineitem"

# Automatic range detection
partitioned_queries = cx.partition_sql(
    postgres_url,
    base_query,
    partition_on="l_orderkey",
    partition_num=4
)

# Result: List of 4 queries with WHERE clauses for different ranges
# ["SELECT * FROM (SELECT * FROM lineitem) WHERE l_orderkey >= 1 AND l_orderkey < 250",
#  "SELECT * FROM (SELECT * FROM lineitem) WHERE l_orderkey >= 250 AND l_orderkey < 500",
#  ...]

# Explicit range specification
partitioned_queries = cx.partition_sql(
    postgres_url,
    base_query,
    partition_on="l_orderkey",
    partition_num=3,
    partition_range=(100, 1000)
)

Partitioning Strategy

Automatic Range Detection

When partition_range is not specified, ConnectorX:

  1. Executes: SELECT MIN(partition_column), MAX(partition_column) FROM (original_query)
  2. Calculates equal-sized ranges based on min/max values
  3. Generates queries with appropriate WHERE clauses
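
The min/max probe in step 1 can be sketched as a small query builder. This is illustrative only; the actual probe is generated inside ConnectorX's Rust core and its exact syntax may differ (the alias `t` here is an assumption).

```python
def build_range_probe(query: str, partition_on: str) -> str:
    """Build a MIN/MAX probe query for automatic range detection.

    Sketch of the step-1 probe described above, not ConnectorX's
    exact generated SQL.
    """
    return (
        f"SELECT MIN({partition_on}), MAX({partition_on}) "
        f"FROM ({query}) AS t"
    )
```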

Manual Range Specification

When partition_range is provided:

  1. Uses the specified min/max values directly
  2. Divides the range into partition_num equal parts
  3. Generates partitioned queries within the specified bounds
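
The equal-parts split in both modes comes down to simple range arithmetic. A minimal sketch (ConnectorX's exact boundary handling may differ, particularly for the last partition):

```python
def split_range(lo: int, hi: int, n: int) -> list[tuple[int, int]]:
    """Divide the inclusive range [lo, hi] into n contiguous
    half-open ranges [start, end), widening the last range to
    cover the upper bound."""
    step = (hi - lo) // n or 1
    bounds = [lo + i * step for i in range(n)] + [hi + 1]
    return [(bounds[i], bounds[i + 1]) for i in range(n)]
```

For example, `split_range(100, 1000, 3)` yields `[(100, 400), (400, 700), (700, 1001)]`, matching the explicit-range call shown earlier.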

Query Transformation

The original query is wrapped in a subquery and filtered on the partition column:

-- Original
SELECT * FROM lineitem WHERE l_shipdate > '1995-01-01'

-- Becomes (for partition 1 of 4)  
SELECT * FROM (SELECT * FROM lineitem WHERE l_shipdate > '1995-01-01') 
WHERE l_orderkey >= 1 AND l_orderkey < 1500
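
The transformation above can be sketched as string rewriting. This is a simplified illustration, not the exact SQL ConnectorX emits (some databases, e.g. PostgreSQL, also require an alias on the derived table):

```python
def wrap_partition(query: str, col: str, lo: int, hi: int) -> str:
    """Wrap a query in a subquery and restrict it to one
    half-open partition range [lo, hi)."""
    return (
        f"SELECT * FROM ({query}) "
        f"WHERE {col} >= {lo} AND {col} < {hi}"
    )
```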

Partitioning Requirements

Column Requirements

The partition column must:

  • Be numerical (integer, float, decimal)
  • Not contain NULL values
  • Have reasonable distribution for effective partitioning
  • Be included in the original query's accessible columns

Query Requirements

  • Must be SPJA queries (Select, Project, Join, Aggregate)
  • Complex subqueries and CTEs may not partition correctly
  • Window functions and advanced features may be incompatible

Supported Query Types:

# ✓ Simple SELECT
"SELECT * FROM table"

# ✓ Filtered SELECT  
"SELECT col1, col2 FROM table WHERE condition"

# ✓ Joins
"SELECT t1.*, t2.name FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id"

# ✓ Aggregation (with proper grouping)
"SELECT category, COUNT(*) FROM table GROUP BY category"

# ✗ Complex window functions
"SELECT *, ROW_NUMBER() OVER (PARTITION BY cat ORDER BY date) FROM table"

Integration with read_sql

Partitioned queries can be used directly with read_sql:

# Manual partitioning workflow
partitioned_queries = cx.partition_sql(
    postgres_url,
    "SELECT * FROM lineitem", 
    partition_on="l_orderkey",
    partition_num=4
)

# Execute partitioned queries in parallel
df = cx.read_sql(postgres_url, partitioned_queries)

# Or use automatic partitioning in read_sql
df = cx.read_sql(
    postgres_url,
    "SELECT * FROM lineitem",
    partition_on="l_orderkey", 
    partition_num=4
)

Performance Considerations

Optimal Partition Numbers

  • CPU cores: Generally 1-2x the number of available CPU cores
  • Database connections: Consider database connection limits
  • Data distribution: Ensure partitions contain similar amounts of data
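
The first two guidelines can be combined into a small heuristic. A sketch, assuming you already know your database's connection limit (`max_db_connections` is an illustrative parameter, not part of the ConnectorX API):

```python
import os

def suggest_partition_num(max_db_connections: int) -> int:
    """Heuristic: up to 2x available CPU cores, capped by the
    database's connection limit."""
    cores = os.cpu_count() or 1
    return max(1, min(2 * cores, max_db_connections))
```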

Partition Column Selection

Choose columns that:

  • Have good distribution across the range
  • Are indexed for efficient filtering
  • Result in roughly equal partition sizes
  • Are not heavily skewed

Good partition columns:

  • Primary keys (auto-incrementing IDs)
  • Timestamp/date columns with uniform distribution
  • Well-distributed numerical columns

Poor partition columns:

  • Columns with many duplicate values
  • Heavily skewed distributions
  • Columns with large gaps in values
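
One quick way to check a candidate column is to run `SELECT COUNT(*)` against each partitioned query and compare sizes. A small diagnostic sketch (the function name and workflow are illustrative, not part of ConnectorX):

```python
def partition_skew(counts: list[int]) -> float:
    """Ratio of the largest partition to the average partition size.

    1.0 means perfectly balanced; large values indicate a skewed
    or gap-heavy partition column."""
    avg = sum(counts) / len(counts)
    return max(counts) / avg if avg else float("inf")
```

A skew ratio well above 1 suggests picking a different partition column or supplying an explicit `partition_range`.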

Install with Tessl CLI

npx tessl i tessl/pypi-connectorx
