# Query Operations

SQL querying for Delta tables through the Apache DataFusion integration, providing full SQL support and high performance for analytical queries.

## Capabilities

### QueryBuilder Class

```python { .api }
class QueryBuilder:
    def __init__(self) -> None: ...

    def register(self, table_name: str, delta_table: DeltaTable) -> QueryBuilder: ...

    def execute(self, sql: str) -> RecordBatchReader: ...
```

SQL query engine for Delta tables using Apache DataFusion.
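
Because `register` returns the builder, registrations can be chained. A brief sketch based on the signatures above (the table names and paths are placeholders):

```python
from deltalake import DeltaTable, QueryBuilder

# Each register call returns the same builder, allowing a fluent style.
qb = (
    QueryBuilder()
    .register("events", DeltaTable("path/to/events"))
    .register("users", DeltaTable("path/to/users"))
)
reader = qb.execute("SELECT COUNT(*) AS n FROM events")
```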

### Query Results

```python { .api }
# RecordBatchReader methods for processing results
class RecordBatchReader:
    def read_next_batch(self) -> RecordBatch | None: ...

    def read_all(self) -> list[RecordBatch]: ...

    def schema(self) -> ArrowSchema: ...

    def __iter__(self) -> Iterator[RecordBatch]: ...
```

Streaming interface for query results. Batches can be consumed lazily via iteration or `read_next_batch`, or materialized all at once with `read_all`.
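
A minimal sketch of both consumption styles, assuming `reader` came from `QueryBuilder.execute` (a streaming reader is typically exhausted after a single pass, so pick one style per query):

```python
# Lazy: pull one batch at a time to keep peak memory bounded.
while (batch := reader.read_next_batch()) is not None:
    print(batch.num_rows)

# Eager: materialize every batch up front (fine for small results).
batches = reader.read_all()
```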

## Usage Examples

### Basic Query Operations

```python
from deltalake import DeltaTable, QueryBuilder

# Load tables
customers_table = DeltaTable("path/to/customers")
orders_table = DeltaTable("path/to/orders")

# Create query builder
qb = QueryBuilder()

# Register tables for querying
qb.register("customers", customers_table)
qb.register("orders", orders_table)

# Execute simple query
result = qb.execute("SELECT * FROM customers WHERE age > 25")

# Process results
for batch in result:
    df = batch.to_pandas()
    print(f"Batch with {len(df)} rows:")
    print(df.head())
```

### Complex SQL Queries

```python
import pandas as pd

# Join query
join_sql = """
SELECT
    c.customer_id,
    c.name,
    c.email,
    COUNT(o.order_id) as order_count,
    SUM(o.total_amount) as total_spent
FROM customers c
LEFT JOIN orders o ON c.customer_id = o.customer_id
WHERE c.registration_date >= '2023-01-01'
GROUP BY c.customer_id, c.name, c.email
HAVING COUNT(o.order_id) > 0
ORDER BY total_spent DESC
LIMIT 100
"""

result = qb.execute(join_sql)

# Convert all results to pandas
all_batches = result.read_all()
combined_df = pd.concat([batch.to_pandas() for batch in all_batches], ignore_index=True)
print("Top customers by spending:")
print(combined_df.head(10))
```

### Aggregation Queries

```python
# Monthly sales aggregation
monthly_sales_sql = """
SELECT
    DATE_TRUNC('month', order_date) as month,
    COUNT(*) as order_count,
    SUM(total_amount) as total_revenue,
    AVG(total_amount) as avg_order_value,
    COUNT(DISTINCT customer_id) as unique_customers
FROM orders
WHERE order_date >= '2023-01-01'
GROUP BY DATE_TRUNC('month', order_date)
ORDER BY month
"""

result = qb.execute(monthly_sales_sql)
monthly_df = pd.concat([batch.to_pandas() for batch in result], ignore_index=True)

print("Monthly sales summary:")
print(monthly_df)
```

### Window Functions

```python
# Ranking and window functions
ranking_sql = """
SELECT
    customer_id,
    order_date,
    total_amount,
    ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY order_date DESC) as order_rank,
    SUM(total_amount) OVER (PARTITION BY customer_id) as customer_total,
    LAG(total_amount, 1) OVER (PARTITION BY customer_id ORDER BY order_date) as previous_order_amount
FROM orders
WHERE order_date >= '2023-01-01'
ORDER BY customer_id, order_date DESC
"""

result = qb.execute(ranking_sql)
ranking_df = pd.concat([batch.to_pandas() for batch in result], ignore_index=True)

print("Customer order analysis:")
print(ranking_df.head(20))
```

### Time Series Analysis

```python
# Daily sales trend
daily_trend_sql = """
SELECT
    order_date,
    COUNT(*) as orders,
    SUM(total_amount) as revenue,
    AVG(total_amount) as avg_order_value,
    SUM(COUNT(*)) OVER (ORDER BY order_date ROWS UNBOUNDED PRECEDING) as cumulative_orders,
    SUM(SUM(total_amount)) OVER (ORDER BY order_date ROWS UNBOUNDED PRECEDING) as cumulative_revenue
FROM orders
WHERE order_date BETWEEN '2023-01-01' AND '2023-12-31'
GROUP BY order_date
ORDER BY order_date
"""

result = qb.execute(daily_trend_sql)
trend_df = pd.concat([batch.to_pandas() for batch in result], ignore_index=True)

print("Daily sales trend:")
print(trend_df.head())
```

### Subqueries and CTEs

```python
# Common Table Expression (CTE) example
cte_sql = """
WITH customer_stats AS (
    SELECT
        customer_id,
        COUNT(*) as order_count,
        SUM(total_amount) as total_spent,
        AVG(total_amount) as avg_order_value,
        MIN(order_date) as first_order_date,
        MAX(order_date) as last_order_date
    FROM orders
    GROUP BY customer_id
),
customer_segments AS (
    SELECT
        customer_id,
        order_count,
        total_spent,
        CASE
            WHEN total_spent >= 1000 THEN 'High Value'
            WHEN total_spent >= 500 THEN 'Medium Value'
            ELSE 'Low Value'
        END as customer_segment
    FROM customer_stats
)
SELECT
    cs.customer_segment,
    COUNT(*) as customer_count,
    AVG(cs.total_spent) as avg_total_spent,
    AVG(cs.order_count) as avg_order_count
FROM customer_segments cs
GROUP BY cs.customer_segment
ORDER BY avg_total_spent DESC
"""

result = qb.execute(cte_sql)
segments_df = pd.concat([batch.to_pandas() for batch in result], ignore_index=True)

print("Customer segmentation:")
print(segments_df)
```

### Working with Multiple Tables

```python
# Register additional tables
products_table = DeltaTable("path/to/products")
order_items_table = DeltaTable("path/to/order_items")

qb.register("products", products_table)
qb.register("order_items", order_items_table)

# Complex multi-table query
multi_table_sql = """
SELECT
    p.category,
    p.product_name,
    SUM(oi.quantity) as total_quantity_sold,
    SUM(oi.quantity * oi.unit_price) as total_revenue,
    COUNT(DISTINCT o.customer_id) as unique_customers,
    AVG(oi.unit_price) as avg_unit_price
FROM products p
JOIN order_items oi ON p.product_id = oi.product_id
JOIN orders o ON oi.order_id = o.order_id
WHERE o.order_date >= '2023-01-01'
GROUP BY p.category, p.product_name
HAVING SUM(oi.quantity) >= 10
ORDER BY total_revenue DESC
LIMIT 50
"""

result = qb.execute(multi_table_sql)
products_df = pd.concat([batch.to_pandas() for batch in result], ignore_index=True)

print("Top products by revenue:")
print(products_df.head())
```

### Streaming Query Processing

```python
import uuid

def process_large_query_in_batches(sql: str, batch_processor=None):
    """Process large query results in batches to manage memory."""
    result = qb.execute(sql)

    total_rows = 0
    batch_count = 0

    for batch in result:
        batch_count += 1
        batch_rows = batch.num_rows
        total_rows += batch_rows

        print(f"Processing batch {batch_count} with {batch_rows} rows")

        if batch_processor:
            # Custom processing function
            batch_processor(batch)
        else:
            # Default: convert to pandas and show summary
            df = batch.to_pandas()
            print(f"  Batch summary: {df.describe()}")

    print(f"Processed {total_rows} total rows in {batch_count} batches")

# Example batch processor: write each batch to its own CSV file
def save_batch_to_csv(batch):
    df = batch.to_pandas()
    filename = f"output_batch_{uuid.uuid4().hex}.csv"  # unique name per batch
    df.to_csv(filename, index=False)
    print(f"  Saved batch to {filename}")

# Use streaming processing
large_query_sql = "SELECT * FROM orders WHERE order_date >= '2020-01-01'"
process_large_query_in_batches(large_query_sql, save_batch_to_csv)
```

### Query Performance Optimization

```python
# Performance tips and optimized queries

# 1. Use column selection to reduce data transfer
optimized_sql = """
SELECT customer_id, total_amount, order_date
FROM orders
WHERE order_date >= '2023-01-01'
"""

# 2. Use partition pruning when possible
partition_pruned_sql = """
SELECT * FROM orders
WHERE year = '2023' AND month = '01' -- Assuming partitioned by year/month
"""

# 3. Use LIMIT for exploration
exploration_sql = """
SELECT * FROM customers
ORDER BY registration_date DESC
LIMIT 1000
"""

# 4. Use appropriate data types in filters
typed_filter_sql = """
SELECT * FROM orders
WHERE total_amount > CAST(100.0 AS DOUBLE)
  AND order_date >= DATE '2023-01-01'
"""

# Execute optimized queries
for description, sql in [
    ("Optimized column selection", optimized_sql),
    ("Partition pruning", partition_pruned_sql),
    ("Limited exploration", exploration_sql),
    ("Typed filters", typed_filter_sql),
]:
    print(f"\n{description}:")
    result = qb.execute(sql)
    first_batch = result.read_next_batch()
    if first_batch:
        print(f"  Returned {first_batch.num_rows} rows")
```

### Error Handling and Debugging

```python
def safe_query_execution(sql: str):
    """Execute query with proper error handling."""
    try:
        print(f"Executing query: {sql[:100]}...")
        result = qb.execute(sql)

        # Get schema information
        schema_info = result.schema()
        print(f"Result schema: {schema_info}")

        # Process results
        row_count = 0
        for batch in result:
            row_count += batch.num_rows

        print(f"Query completed successfully. Total rows: {row_count}")
        return row_count

    except Exception as e:
        print(f"Query failed: {e}")
        print(f"Query: {sql}")
        return None

# Test queries with error handling
test_queries = [
    "SELECT COUNT(*) FROM customers",
    "SELECT * FROM non_existent_table",   # This will fail
    "SELECT invalid_column FROM orders",  # This will fail
    "SELECT customer_id, COUNT(*) FROM orders GROUP BY customer_id LIMIT 10",
]

for query in test_queries:
    safe_query_execution(query)
    print("-" * 50)
```