CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-pypika

A SQL query builder API for Python that provides a programmatic interface for constructing SQL queries.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/dialects.md

Database Dialects

Database-specific query builders providing specialized features and syntax for different SQL databases. Each dialect extends the base Query functionality with database-specific optimizations, functions, and SQL syntax variations.

Capabilities

MySQL Dialect

MySQL-specific query builder with support for MySQL syntax and features.

class MySQLQuery(Query):
    """MySQL-specific query builder."""
    
    def on_duplicate_key_update(self, **kwargs) -> MySQLQuery:
        """Add ON DUPLICATE KEY UPDATE clause for INSERT statements."""
        
    def on_duplicate_key_ignore(self) -> MySQLQuery:
        """Add ON DUPLICATE KEY IGNORE clause for INSERT statements."""

Usage Examples:

from pypika import MySQLQuery, Table

users = Table('users')

# ON DUPLICATE KEY UPDATE
query = (MySQLQuery.into(users)
         .columns(users.id, users.name, users.email)
         .insert(1, 'John Doe', 'john@example.com')
         .on_duplicate_key_update(
             name='John Doe Updated',
             email='john.updated@example.com'
         ))

# ON DUPLICATE KEY IGNORE
query = (MySQLQuery.into(users)
         .columns(users.name, users.email)
         .insert('Jane Doe', 'jane@example.com')
         .on_duplicate_key_ignore())

# MySQL-specific functions
from pypika.functions import Concat
query = MySQLQuery.from_(users).select(
    Concat(users.first_name, ' ', users.last_name).as_('full_name')
)

PostgreSQL Dialect

PostgreSQL-specific query builder with advanced PostgreSQL features.

class PostgreSQLQuery(Query):
    """PostgreSQL-specific query builder."""
    
    def on_conflict(self, *target_fields) -> OnConflictQueryBuilder:
        """Add ON CONFLICT clause for INSERT statements."""
        
    def returning(self, *terms) -> PostgreSQLQuery:
        """Add RETURNING clause."""
        
    def distinct_on(self, *terms) -> PostgreSQLQuery:
        """Add DISTINCT ON clause."""

class OnConflictQueryBuilder:
    def do_nothing(self) -> PostgreSQLQuery:
        """Add DO NOTHING to ON CONFLICT."""
        
    def do_update(self, **kwargs) -> PostgreSQLQuery:
        """Add DO UPDATE to ON CONFLICT."""

Usage Examples:

from pypika import PostgreSQLQuery, Table

users = Table('users')
orders = Table('orders')

# ON CONFLICT DO NOTHING
query = (PostgreSQLQuery.into(users)
         .columns(users.email, users.name)
         .insert('john@example.com', 'John Doe')
         .on_conflict(users.email)
         .do_nothing())

# ON CONFLICT DO UPDATE
query = (PostgreSQLQuery.into(users)
         .columns(users.email, users.name, users.last_login)
         .insert('john@example.com', 'John Doe', 'NOW()')
         .on_conflict(users.email)
         .do_update(
             name='John Doe Updated',
             last_login='NOW()'
         ))

# RETURNING clause
query = (PostgreSQLQuery.into(users)
         .columns(users.name, users.email)
         .insert('John Doe', 'john@example.com')
         .returning(users.id, users.created_at))

# UPDATE with RETURNING
query = (PostgreSQLQuery.update(users)
         .set(users.last_login, 'NOW()')
         .where(users.id == 1)
         .returning(users.id, users.last_login))

# DISTINCT ON
query = (PostgreSQLQuery.from_(orders)
         .distinct_on(orders.customer_id)
         .select(orders.customer_id, orders.order_date, orders.total)
         .orderby(orders.customer_id, orders.order_date.desc()))

# JSON operations (PostgreSQL specific)
from pypika import JSON
profile = users.profile  # JSON field
query = (PostgreSQLQuery.from_(users)
         .select(
             users.id,
             profile.get_text_value('name').as_('profile_name'),
             profile.get_path_json_value(['address', 'city']).as_('city')
         )
         .where(profile.has_key('email')))

Oracle Dialect

Oracle Database-specific query builder with Oracle SQL features.

class OracleQuery(Query):
    """Oracle Database-specific query builder."""

Usage Examples:

from pypika import OracleQuery, Table
from pypika.pseudocolumns import RowNum, SysDate

users = Table('users')

# Oracle-specific pseudocolumns
query = (OracleQuery.from_(users)
         .select(users.name, users.email, RowNum.as_('row_number'))
         .where(RowNum <= 10))

# Using SYSDATE
query = (OracleQuery.from_(users)
         .select('*')
         .where(users.created_at > SysDate - 30))

# Oracle date formatting
from pypika.functions import ToChar
query = (OracleQuery.from_(users)
         .select(
             users.name,
             ToChar(users.created_at, 'YYYY-MM-DD').as_('created_date')
         ))

Microsoft SQL Server Dialect

MSSQL-specific query builder with SQL Server features.

class MSSQLQuery(Query):
    """Microsoft SQL Server-specific query builder."""
    
    def top(self, limit) -> MSSQLQuery:
        """Add TOP clause."""

Usage Examples:

from pypika import MSSQLQuery, Table

users = Table('users')
orders = Table('orders')

# TOP clause (SQL Server style)
query = (MSSQLQuery.from_(users)
         .top(10)
         .select('*')
         .orderby(users.created_at.desc()))

# WITH clause and CTE
user_stats = (MSSQLQuery.from_(orders)
              .select(orders.user_id, Count('*').as_('order_count'))
              .groupby(orders.user_id))

query = (MSSQLQuery.with_(user_stats, 'user_stats')
         .from_(users)
         .join('user_stats').on(users.id == 'user_stats.user_id')
         .select(users.name, 'user_stats.order_count'))

ClickHouse Dialect

Yandex ClickHouse-specific query builder with ClickHouse optimizations.

class ClickHouseQuery(Query):
    """ClickHouse-specific query builder."""

Usage Examples:

from pypika import ClickHouseQuery, Table

events = Table('events')

# ClickHouse-specific functions
from pypika.clickhouse.dates_and_times import ToYYYYMM, AddDays
from pypika.clickhouse.array import HasAny, Length
from pypika.clickhouse.type_conversion import ToString, ToInt64

# Date/time functions
query = (ClickHouseQuery.from_(events)
         .select(
             ToYYYYMM(events.created_at).as_('year_month'),
             AddDays(events.created_at, 30).as_('plus_30_days')
         ))

# Array functions
query = (ClickHouseQuery.from_(events)
         .select(
             events.user_id,
             Length(events.tags).as_('tag_count')
         )
         .where(HasAny(events.tags, ['important', 'urgent'])))

# Type conversion functions
query = (ClickHouseQuery.from_(events)
         .select(
             ToString(events.user_id).as_('user_id_str'),
             ToInt64(events.score).as_('score_int')
         ))

# Conditional functions
from pypika.clickhouse.condition import If, MultiIf
query = (ClickHouseQuery.from_(events)
         .select(
             events.user_id,
             If(events.score > 100, 'high', 'low').as_('score_category'),
             MultiIf(
                 events.score > 200, 'very_high',
                 events.score > 100, 'high',
                 events.score > 50, 'medium',
                 'low'
             ).as_('detailed_category')
         ))

SQLite Dialect

SQLite-specific query builder with SQLite features and limitations.

class SQLLiteQuery(Query):
    """SQLite-specific query builder."""

Usage Examples:

from pypika import SQLLiteQuery, Table

users = Table('users')

# SQLite-specific date handling
from pypika.functions import Date, DateTime
query = (SQLLiteQuery.from_(users)
         .select(
             users.name,
             Date(users.created_at).as_('created_date'),
             DateTime(users.created_at, '+1 month').as_('next_month')
         ))

# SQLite LIMIT/OFFSET
query = (SQLLiteQuery.from_(users)
         .select('*')
         .orderby(users.id)
         .limit(20)
         .offset(100))

# SQLite aggregate functions
from pypika.functions import Count, Sum, Avg
query = (SQLLiteQuery.from_(users)
         .select(
             Count('*').as_('total_users'),
             Avg(users.age).as_('avg_age')
         ))

Redshift Dialect

Amazon Redshift-specific query builder with Redshift optimizations.

class RedshiftQuery(Query):
    """Amazon Redshift-specific query builder."""

Usage Examples:

from pypika import RedshiftQuery, Table

events = Table('events')
users = Table('users')

# Redshift-specific window functions
from pypika.analytics import RowNumber, Rank
query = (RedshiftQuery.from_(events)
         .select(
             events.user_id,
             events.event_time,
             RowNumber().over(events.user_id).orderby(events.event_time).as_('event_sequence'),
             Rank().over().orderby(events.score.desc()).as_('global_rank')
         ))

# Redshift date functions
from pypika.functions import DateDiff, DateAdd
query = (RedshiftQuery.from_(users)
         .select(
             users.id,
             DateDiff('day', users.created_at, 'CURRENT_DATE').as_('account_age_days'),
             DateAdd('month', 1, users.created_at).as_('one_month_later')
         ))

Vertica Dialect

Vertica-specific query builder with Vertica analytical features.

class VerticaQuery(Query):
    """Vertica-specific query builder."""

Usage Examples:

from pypika import VerticaQuery, Table

sales = Table('sales')
products = Table('products')

# Vertica analytical functions
from pypika.analytics import Sum, Avg, FirstValue, LastValue
query = (VerticaQuery.from_(sales)
         .select(
             sales.product_id,
             sales.sale_date,
             sales.amount,
             Sum(sales.amount).over(sales.product_id).orderby(sales.sale_date).as_('running_total'),
             Avg(sales.amount).over().orderby(sales.sale_date).rows_between(-6, 0).as_('7_day_avg'),
             FirstValue(sales.amount).over(sales.product_id).orderby(sales.sale_date).as_('first_sale')
         ))

# Vertica time series functions
from pypika.functions import TimestampAdd
query = (VerticaQuery.from_(sales)
         .select(
             sales.product_id,
             TimestampAdd('hour', 1, sales.sale_timestamp).as_('plus_one_hour')
         ))

Snowflake Dialect

Snowflake-specific query builder with cloud data warehouse optimizations.

class SnowflakeQuery(Query):
    """Snowflake-specific query builder."""

Usage Examples:

from pypika import SnowflakeQuery, Table

users = Table('users')
orders = Table('orders')

# Basic Snowflake query
query = (SnowflakeQuery.from_(users)
         .select(users.id, users.name, users.email)
         .where(users.active == True))

# Snowflake-specific date functions
from pypika.functions import DateDiff, DateAdd
query = (SnowflakeQuery.from_(users)
         .select(
             users.id,
             DateDiff('day', users.created_at, 'CURRENT_DATE').as_('account_age_days'),
             DateAdd('month', 1, users.created_at).as_('one_month_later')
         ))

# Window functions with Snowflake syntax
from pypika.analytics import RowNumber, Avg
query = (SnowflakeQuery.from_(orders)
         .select(
             orders.customer_id,
             orders.order_date,
             orders.amount,
             RowNumber().over(orders.customer_id).orderby(orders.order_date).as_('order_sequence'),
             Avg(orders.amount).over().orderby(orders.order_date).rows_between(-2, 0).as_('3_order_avg')
         ))

Dialect Selection

Choosing the appropriate dialect based on your database system.

Usage Examples:

from pypika import (
    MySQLQuery, PostgreSQLQuery, OracleQuery, MSSQLQuery,
    ClickHouseQuery, RedshiftQuery, SQLLiteQuery, VerticaQuery,
    Dialects
)

# Dialect-specific table creation
def create_user_table(dialect='postgresql'):
    users = Table('users')
    
    if dialect == 'postgresql':
        return (PostgreSQLQuery.create_table(users)
                .columns(
                    Column('id', 'SERIAL', primary_key=True),
                    Column('email', 'VARCHAR(255)', nullable=False, unique=True),
                    Column('profile', 'JSONB'),
                    Column('created_at', 'TIMESTAMP', default='NOW()')
                ))
    
    elif dialect == 'mysql':
        return (MySQLQuery.create_table(users)
                .columns(
                    Column('id', 'INT AUTO_INCREMENT', primary_key=True),
                    Column('email', 'VARCHAR(255)', nullable=False, unique=True),
                    Column('profile', 'JSON'),
                    Column('created_at', 'TIMESTAMP', default='CURRENT_TIMESTAMP')
                ))
    
    elif dialect == 'sqlite':
        return (SQLLiteQuery.create_table(users)
                .columns(
                    Column('id', 'INTEGER', primary_key=True, autoincrement=True),
                    Column('email', 'TEXT', nullable=False, unique=True),
                    Column('profile', 'TEXT'),
                    Column('created_at', 'DATETIME', default='CURRENT_TIMESTAMP')
                ))

# Dynamic dialect selection
def get_query_class(database_type):
    dialect_map = {
        'postgresql': PostgreSQLQuery,
        'mysql': MySQLQuery,
        'oracle': OracleQuery,
        'mssql': MSSQLQuery,
        'clickhouse': ClickHouseQuery,
        'redshift': RedshiftQuery,
        'sqlite': SQLLiteQuery,
        'vertica': VerticaQuery
    }
    return dialect_map.get(database_type, Query)

# Usage with dynamic selection
database_type = 'postgresql'  # From configuration
QueryClass = get_query_class(database_type)
users = Table('users')

query = QueryClass.from_(users).select('*').where(users.active == True)

Dialect Enumerations

class Dialects:
    VERTICA: str
    CLICKHOUSE: str
    ORACLE: str
    MSSQL: str
    MYSQL: str
    POSTGRESQL: str
    REDSHIFT: str
    SQLLITE: str
    SNOWFLAKE: str

Usage Examples:

from pypika import Dialects, Interval

# Using dialect enumeration
current_dialect = Dialects.POSTGRESQL

# Dialect-specific interval formatting
if current_dialect == Dialects.POSTGRESQL:
    interval = Interval(days=30, dialect=Dialects.POSTGRESQL)
elif current_dialect == Dialects.MYSQL:
    interval = Interval(days=30, dialect=Dialects.MYSQL)

# Dialect-aware function selection
def get_date_function(dialect):
    if dialect == Dialects.POSTGRESQL:
        return 'NOW()'
    elif dialect == Dialects.MYSQL:
        return 'NOW()'
    elif dialect == Dialects.ORACLE:
        return 'SYSDATE'
    elif dialect == Dialects.MSSQL:
        return 'GETDATE()'
    else:
        return 'CURRENT_TIMESTAMP'

Install with Tessl CLI

npx tessl i tessl/pypi-pypika

docs

analytics.md

core-queries.md

dialects.md

functions.md

index.md

parameters-types.md

tables-schema.md

terms-expressions.md

tile.json