tessl i github:sickn33/antigravity-awesome-skills --skill clickhouse-io

ClickHouse database patterns, query optimization, analytics, and data engineering best practices for high-performance analytical workloads.
ClickHouse-specific patterns for high-performance analytics and data engineering.
ClickHouse is a column-oriented database management system (DBMS) for online analytical processing (OLAP). It's optimized for fast analytical queries on large datasets.
Key Features:
- Column-oriented storage with aggressive compression
- Vectorized, parallel query execution
- MergeTree engine family for sorted, partitioned data
- Materialized views for real-time aggregation
- Approximate aggregate functions (uniq, quantile) for cheap analytics at scale
CREATE TABLE markets_analytics (
date Date,
market_id String,
market_name String,
volume UInt64,
trades UInt32,
unique_traders UInt32,
avg_trade_size Float64,
created_at DateTime
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, market_id)
SETTINGS index_granularity = 8192;

-- For data that may have duplicates (e.g., from multiple sources)
CREATE TABLE user_events (
event_id String,
user_id String,
event_type String,
timestamp DateTime,
properties String
) ENGINE = ReplacingMergeTree(timestamp)  -- timestamp acts as the version column: the newest row per key survives merges
PARTITION BY toYYYYMM(timestamp)
ORDER BY (user_id, event_id);             -- rows sharing this key are deduplicated during background merges
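ReplacingMergeTree only deduplicates during background merges, so a read that needs exact results right away can force deduplication at query time with FINAL. A minimal sketch against the table above (the user id is a placeholder):

-- FINAL collapses not-yet-merged duplicates at read time, at some CPU cost
SELECT *
FROM user_events FINAL
WHERE user_id = 'user-123'
ORDER BY timestamp DESC
LIMIT 100;
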
-- For maintaining aggregated metrics
CREATE TABLE market_stats_hourly (
hour DateTime,
market_id String,
total_volume AggregateFunction(sum, UInt64),
total_trades AggregateFunction(count),  -- countState() takes no argument, so no argument type here
unique_users AggregateFunction(uniq, String)
) ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(hour)
ORDER BY (hour, market_id);
-- Query aggregated data
SELECT
hour,
market_id,
sumMerge(total_volume) AS volume,
countMerge(total_trades) AS trades,
uniqMerge(unique_users) AS users
FROM market_stats_hourly
WHERE hour >= toStartOfHour(now() - INTERVAL 24 HOUR)
GROUP BY hour, market_id
ORDER BY hour DESC;

-- ✅ GOOD: Filter on the primary-key (ORDER BY) columns so ClickHouse can skip granules
SELECT *
FROM markets_analytics
WHERE date >= '2025-01-01'
AND market_id = 'market-123'
AND volume > 1000
ORDER BY date DESC
LIMIT 100;
-- ❌ BAD: No filter on the primary-key columns (date, market_id) and a leading-wildcard LIKE,
-- so ClickHouse has to scan every part
SELECT *
FROM markets_analytics
WHERE volume > 1000
AND market_name LIKE '%election%';
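A quick way to confirm whether a filter can use the primary index is EXPLAIN with indexes = 1, which reports how many parts and granules survive partition pruning and primary-key analysis. A minimal check against the markets_analytics table above:

EXPLAIN indexes = 1
SELECT count()
FROM markets_analytics
WHERE date >= '2025-01-01'
AND market_id = 'market-123';
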
-- ✅ GOOD: Use ClickHouse-specific aggregation functions
SELECT
toStartOfDay(created_at) AS day,
market_id,
sum(volume) AS total_volume,
count() AS total_trades,
uniq(trader_id) AS unique_traders,
avg(trade_size) AS avg_size
FROM trades
WHERE created_at >= today() - INTERVAL 7 DAY
GROUP BY day, market_id
ORDER BY day DESC, total_volume DESC;
-- ✅ Use quantile() for percentiles (approximate, but far cheaper than quantileExact() on large data)
SELECT
quantile(0.50)(trade_size) AS median,
quantile(0.95)(trade_size) AS p95,
quantile(0.99)(trade_size) AS p99
FROM trades
WHERE created_at >= now() - INTERVAL 1 HOUR;
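When several percentiles are needed at once, the plural quantiles() form computes all requested levels in a single pass instead of one scan per level; a sketch over the same hypothetical trades table:

-- Returns an array with all three levels from one pass over the data
SELECT quantiles(0.50, 0.95, 0.99)(trade_size) AS size_p50_p95_p99
FROM trades
WHERE created_at >= now() - INTERVAL 1 HOUR;
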
-- Calculate running totals
SELECT
date,
market_id,
volume,
sum(volume) OVER (
PARTITION BY market_id
ORDER BY date
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS cumulative_volume
FROM markets_analytics
WHERE date >= today() - INTERVAL 30 DAY
ORDER BY market_id, date;

// Node.js client setup (using the 'clickhouse' npm package)
import { ClickHouse } from 'clickhouse'
const clickhouse = new ClickHouse({
url: process.env.CLICKHOUSE_URL,
port: 8123,
basicAuth: {
username: process.env.CLICKHOUSE_USER,
password: process.env.CLICKHOUSE_PASSWORD
}
})
// ✅ Batch insert (efficient)
async function bulkInsertTrades(trades: Trade[]) {
// NOTE: values are interpolated directly for brevity; in production, escape
// inputs or use your client's parameterized / JSON insert support
const values = trades.map(trade => `(
'${trade.id}',
'${trade.market_id}',
'${trade.user_id}',
${trade.amount},
'${trade.timestamp.toISOString()}'
)`).join(',')
await clickhouse.query(`
INSERT INTO trades (id, market_id, user_id, amount, timestamp)
VALUES ${values}
`).toPromise()
}
// ❌ Individual inserts (slow)
async function insertTrade(trade: Trade) {
// Don't do this in a loop!
await clickhouse.query(`
INSERT INTO trades VALUES ('${trade.id}', ...)
`).toPromise()
}

// For continuous data ingestion
async function streamInserts() {
// NOTE: the exact streaming API differs between Node ClickHouse clients; the
// 'clickhouse' package expects the INSERT statement itself as the target
const stream = clickhouse.insert('INSERT INTO trades').stream()
for await (const batch of dataSource) {
stream.write(batch)
}
await stream.end()
}

-- Create materialized view for hourly stats
CREATE MATERIALIZED VIEW market_stats_hourly_mv
TO market_stats_hourly
AS SELECT
toStartOfHour(timestamp) AS hour,
market_id,
sumState(amount) AS total_volume,
countState() AS total_trades,
uniqState(user_id) AS unique_users
FROM trades
GROUP BY hour, market_id;
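One caveat: a materialized view only processes rows inserted after it is created, so existing history has to be backfilled once by hand. A sketch of that backfill, assuming the trades table already holds the historical rows and that amount matches the UInt64 aggregate column:

-- One-off backfill of pre-existing rows into the aggregate target table
INSERT INTO market_stats_hourly
SELECT
toStartOfHour(timestamp) AS hour,
market_id,
sumState(amount) AS total_volume,
countState() AS total_trades,
uniqState(user_id) AS unique_users
FROM trades
GROUP BY hour, market_id;
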
-- Query the materialized view
SELECT
hour,
market_id,
sumMerge(total_volume) AS volume,
countMerge(total_trades) AS trades,
uniqMerge(unique_users) AS users
FROM market_stats_hourly
WHERE hour >= now() - INTERVAL 24 HOUR
GROUP BY hour, market_id;

-- Check slow queries
SELECT
query_id,
user,
query,
query_duration_ms,
read_rows,
read_bytes,
memory_usage
FROM system.query_log
WHERE type = 'QueryFinish'
AND query_duration_ms > 1000
AND event_time >= now() - INTERVAL 1 HOUR
ORDER BY query_duration_ms DESC
LIMIT 10;
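For statements that are still running (and therefore not yet in query_log), system.processes gives a live view:

-- Currently executing queries, longest-running first
SELECT
query_id,
user,
elapsed,
read_rows,
memory_usage,
query
FROM system.processes
ORDER BY elapsed DESC;
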
-- Check table sizes
SELECT
database,
table,
formatReadableSize(sum(bytes)) AS size,
sum(rows) AS rows,
max(modification_time) AS latest_modification
FROM system.parts
WHERE active
GROUP BY database, table
ORDER BY sum(bytes) DESC;
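A related health check is the number of active parts per table; a large count usually means inserts arrive in batches too small for the MergeTree engine to merge comfortably:

-- Many small active parts usually indicates under-batched inserts
SELECT
database,
table,
count() AS active_parts
FROM system.parts
WHERE active
GROUP BY database, table
ORDER BY active_parts DESC
LIMIT 10;
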
-- Daily active users
SELECT
toDate(timestamp) AS date,
uniq(user_id) AS daily_active_users
FROM events
WHERE timestamp >= today() - INTERVAL 30 DAY
GROUP BY date
ORDER BY date;
-- Retention analysis
SELECT
signup_date,
countIf(days_since_signup = 0) AS day_0,
countIf(days_since_signup = 1) AS day_1,
countIf(days_since_signup = 7) AS day_7,
countIf(days_since_signup = 30) AS day_30
FROM (
SELECT
user_id,
toDate(timestamp) AS activity_date,
min(activity_date) OVER (PARTITION BY user_id) AS signup_date,
dateDiff('day', signup_date, activity_date) AS days_since_signup
FROM events
GROUP BY user_id, activity_date
)
GROUP BY signup_date
ORDER BY signup_date DESC;
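ClickHouse also ships a retention() aggregate function that evaluates an ordered list of per-user conditions; a sketch with placeholder calendar dates standing in for a cohort's day 0, day 1, and day 7:

-- r[1] counts users matching the first condition; r[N] additionally requires r[1]
SELECT
sum(r[1]) AS day_0,
sum(r[2]) AS day_1,
sum(r[3]) AS day_7
FROM (
SELECT
user_id,
retention(
toDate(timestamp) = toDate('2025-01-01'),
toDate(timestamp) = toDate('2025-01-02'),
toDate(timestamp) = toDate('2025-01-08')
) AS r
FROM events
GROUP BY user_id
);
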
-- Conversion funnel
SELECT
uniqIf(session_id, event_type = 'viewed_market') AS viewed,
uniqIf(session_id, event_type = 'clicked_trade') AS clicked,
uniqIf(session_id, event_type = 'completed_trade') AS completed,
round(clicked / viewed * 100, 2) AS view_to_click_rate,
round(completed / clicked * 100, 2) AS click_to_completion_rate
FROM events
WHERE event_date = today();
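For ordered funnels, the windowFunnel() aggregate function reports how far each session progressed through a sequence of events within a time window (one hour in this sketch, reusing the same event names and the timestamp column assumed elsewhere):

-- Furthest funnel step reached per session, then sessions per step
SELECT
level,
count() AS sessions
FROM (
SELECT
session_id,
windowFunnel(3600)(
timestamp,
event_type = 'viewed_market',
event_type = 'clicked_trade',
event_type = 'completed_trade'
) AS level
FROM events
WHERE toDate(timestamp) = today()
GROUP BY session_id
)
GROUP BY level
ORDER BY level;
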
-- User cohorts by signup month
SELECT
toStartOfMonth(signup_date) AS cohort,
toStartOfMonth(activity_date) AS month,
dateDiff('month', cohort, month) AS months_since_signup,
count(DISTINCT user_id) AS active_users
FROM (
SELECT
user_id,
min(toDate(timestamp)) OVER (PARTITION BY user_id) AS signup_date,
toDate(timestamp) AS activity_date
FROM events
)
GROUP BY cohort, month, months_since_signup
ORDER BY cohort, months_since_signup;

// Extract, Transform, Load
async function etlPipeline() {
// 1. Extract from source
const rawData = await extractFromPostgres()
// 2. Transform
const transformed = rawData.map(row => ({
date: new Date(row.created_at).toISOString().split('T')[0],
market_id: row.market_slug,
volume: parseFloat(row.total_volume),
trades: parseInt(row.trade_count)
}))
// 3. Load to ClickHouse
await bulkInsertToClickHouse(transformed)
}
// Run periodically
setInterval(etlPipeline, 60 * 60 * 1000) // Every hour

// Listen to PostgreSQL changes and sync to ClickHouse
import { Client } from 'pg'
const pgClient = new Client({ connectionString: process.env.DATABASE_URL })
await pgClient.connect()
pgClient.query('LISTEN market_updates')
pgClient.on('notification', async (msg) => {
const update = JSON.parse(msg.payload ?? '{}')
// With the 'clickhouse' package, insert() takes an INSERT statement plus rows
await clickhouse.insert('INSERT INTO market_updates', [
{
market_id: update.id,
event_type: update.operation, // INSERT, UPDATE, DELETE
timestamp: new Date(),
data: JSON.stringify(update.new_data)
}
]).toPromise()
})

Remember: ClickHouse excels at analytical workloads. Design tables for your query patterns, batch your inserts, and leverage materialized views for real-time aggregations.