CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-chdb

chDB is an in-process SQL OLAP Engine powered by ClickHouse

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

udf.mddocs/

User-Defined Functions

Python UDF (User Defined Function) support enabling custom Python functions to be used within SQL queries. UDFs are registered using decorators and executed as external processes with automatic type handling and configuration management.

Capabilities

UDF Decorator

Register Python functions as SQL UDFs with automatic configuration generation.

def chdb_udf(return_type: str = "String"):
    """
    Decorator for registering Python functions as chDB UDFs.
    
    Parameters:
    - return_type: SQL return type ("String", "Int32", "Float64", etc.)
                   Must be valid ClickHouse data type
    
    Returns:
    Function decorator that registers the function as UDF
    
    Notes:
    - Functions must be stateless (no UDAFs supported)
    - All input parameters treated as strings (TabSeparated format)
    - Function called once per input row
    - Must import all required modules within function
    - Uses same Python interpreter as calling script
    """

UDF Generation Function

Programmatically generate UDF configurations and scripts.

def generate_udf(func_name: str, args: list, return_type: str, udf_body: str):
    """
    Generate UDF configuration and Python script files.
    
    Parameters:
    - func_name: Name of the UDF function
    - args: List of argument names
    - return_type: SQL return type string
    - udf_body: Python function implementation code
    
    Side Effects:
    - Creates {func_name}.py executable script in UDF path
    - Updates/creates udf_config.xml with function registration
    - Sets up automatic cleanup on process exit
    """

Usage Examples

Basic UDF Definition

from chdb.udf import chdb_udf
from chdb import query

# Simple arithmetic UDF
@chdb_udf()
def add_numbers(a, b):
    return str(int(a) + int(b))

# Use in SQL query
result = query("SELECT add_numbers('10', '20') as sum_result")
print(result)  # Returns: 30

UDF with Different Return Types

from chdb.udf import chdb_udf
from chdb import query

# UDF returning integer
@chdb_udf(return_type="Int32")
def multiply_int(x, y):
    return int(x) * int(y)

# UDF returning float
@chdb_udf(return_type="Float64")
def calculate_average(a, b, c):
    values = [float(a), float(b), float(c)]
    return sum(values) / len(values)

# UDF returning string (default)
@chdb_udf()
def format_name(first, last):
    return f"{first.title()} {last.upper()}"

# Use all UDFs in queries
result1 = query("SELECT multiply_int('5', '7') as product")
result2 = query("SELECT calculate_average('10.5', '20.3', '15.7') as avg")
result3 = query("SELECT format_name('john', 'doe') as formatted_name")

print("Product:", result1)
print("Average:", result2)
print("Formatted:", result3)

UDF with Complex Logic

from chdb.udf import chdb_udf
from chdb import query

@chdb_udf()
def validate_email(email):
    import re
    
    # Email validation pattern
    pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    
    if re.match(pattern, email):
        return "valid"
    else:
        return "invalid"

@chdb_udf()
def extract_domain(email):
    import re
    
    match = re.search(r'@([a-zA-Z0-9.-]+)', email)
    if match:
        return match.group(1)
    else:
        return "unknown"

# Use UDFs with data processing
result = query("""
    SELECT 
        email,
        validate_email(email) as is_valid,
        extract_domain(email) as domain
    FROM (
        SELECT 'user@example.com' as email
        UNION ALL
        SELECT 'invalid.email'
        UNION ALL
        SELECT 'admin@company.org'
    )
""", "Pretty")

print(result)

UDF with JSON Processing

from chdb.udf import chdb_udf
from chdb import query

@chdb_udf()
def parse_json_field(json_str, field_name):
    import json
    
    try:
        data = json.loads(json_str)
        return str(data.get(field_name, ""))
    except:
        return "error"

@chdb_udf(return_type="Int32")
def count_json_keys(json_str):
    import json
    
    try:
        data = json.loads(json_str)
        if isinstance(data, dict):
            return len(data)
        else:
            return 0
    except:
        return -1

# Query with JSON UDFs
result = query("""
    SELECT 
        json_data,
        parse_json_field(json_data, 'name') as name,
        parse_json_field(json_data, 'age') as age,
        count_json_keys(json_data) as key_count
    FROM (
        SELECT '{"name": "Alice", "age": 30, "city": "NYC"}' as json_data
        UNION ALL
        SELECT '{"name": "Bob", "age": 25}'
        UNION ALL
        SELECT 'invalid json'
    )
""", "Pretty")

print(result)

UDF with Data Transformations

from chdb.udf import chdb_udf
from chdb import query

@chdb_udf()
def normalize_phone(phone):
    import re
    
    # Remove all non-digits
    digits = re.sub(r'\D', '', phone)
    
    # Format as (XXX) XXX-XXXX if US number
    if len(digits) == 10:
        return f"({digits[:3]}) {digits[3:6]}-{digits[6:]}"
    elif len(digits) == 11 and digits[0] == '1':
        return f"({digits[1:4]}) {digits[4:7]}-{digits[7:]}"
    else:
        return "invalid"

@chdb_udf()
def categorize_age(age_str):
    try:
        age = int(age_str)
        if age < 18:
            return "minor"
        elif age < 65:
            return "adult"
        else:
            return "senior"
    except:
        return "unknown"

# Apply transformations to dataset
result = query("""
    SELECT 
        name,
        normalize_phone(phone) as formatted_phone,
        categorize_age(age) as age_category
    FROM (
        SELECT 'Alice' as name, '555-123-4567' as phone, '32' as age
        UNION ALL
        SELECT 'Bob', '(555) 987 6543', '17'
        UNION ALL
        SELECT 'Charlie', '15559876543', '67'
    )
""", "Pretty")

print(result)

UDF with External Libraries

from chdb.udf import chdb_udf
from chdb import query

@chdb_udf(return_type="Float64")
def calculate_distance(lat1, lon1, lat2, lon2):
    import math
    
    # Convert to radians
    lat1, lon1, lat2, lon2 = map(math.radians, [float(lat1), float(lon1), float(lat2), float(lon2)])
    
    # Haversine formula
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
    c = 2 * math.asin(math.sqrt(a))
    
    # Earth radius in kilometers
    r = 6371
    
    return c * r

@chdb_udf()
def hash_string(input_str):
    import hashlib
    
    return hashlib.md5(input_str.encode()).hexdigest()

# Use UDFs with geographic and crypto functions
result = query("""
    SELECT 
        city1,
        city2,
        calculate_distance(lat1, lon1, lat2, lon2) as distance_km,
        hash_string(CONCAT(city1, '-', city2)) as route_hash
    FROM (
        SELECT 
            'New York' as city1, '40.7128' as lat1, '-74.0060' as lon1,
            'Los Angeles' as city2, '34.0522' as lat2, '-118.2437' as lon2
        UNION ALL
        SELECT 
            'Chicago', '41.8781', '-87.6298',
            'Houston', '29.7604', '-95.3698'
    )
""", "Pretty")

print(result)

Error Handling with UDFs

from chdb.udf import chdb_udf
from chdb import query, ChdbError

@chdb_udf()
def safe_divide(a, b):
    try:
        num = float(a)
        den = float(b)
        
        if den == 0:
            return "division_by_zero"
        
        return str(num / den)
    except ValueError:
        return "invalid_input"
    except Exception:
        return "error"

try:
    result = query("""
        SELECT 
            a, b,
            safe_divide(a, b) as result
        FROM (
            SELECT '10' as a, '2' as b
            UNION ALL
            SELECT '15', '0'
            UNION ALL
            SELECT 'abc', '5'
        )
    """, "Pretty")
    
    print(result)
    
except ChdbError as e:
    print(f"Query with UDF failed: {e}")

UDF Path Management

from chdb.udf import chdb_udf
from chdb import query
import tempfile
import os

# Custom UDF path
custom_udf_path = tempfile.mkdtemp()

@chdb_udf()
def custom_function(x):
    return f"processed_{x}"

# Use UDF with custom path
result = query(
    "SELECT custom_function('test') as output",
    udf_path=custom_udf_path
)

print(result)

# Cleanup custom UDF path when done
import shutil
shutil.rmtree(custom_udf_path)

Important UDF Guidelines

Function Requirements

  • Stateless: Functions must be stateless (no global state)
  • String inputs: All parameters received as strings
  • Self-contained: Import all required modules within the function
  • Error handling: Handle exceptions gracefully to avoid query failures

Performance Considerations

  • Process overhead: Each UDF call spawns a Python process
  • Serialization: Data converted between TabSeparated format
  • Best for: Complex logic that can't be expressed in SQL
  • Avoid for: Simple operations that SQL can handle efficiently

Type System

  • Input types: Always strings (converted from TabSeparated)
  • Return types: Must match declared return_type
  • Valid return types: Any ClickHouse data type
  • Type conversion: Automatic conversion between Python and SQL types

Install with Tessl CLI

npx tessl i tessl/pypi-chdb

docs

dataframe.md

dbapi.md

index.md

query-functions.md

sessions.md

udf.md

utils.md

tile.json