or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

ai-ml.mdcatalog.mddata-io.mddataframe-operations.mdexpressions.mdindex.mdsession.mdsql.mdudf.md

ai-ml.mddocs/

0

# AI/ML Functions

1

2

Built-in functions for AI and machine learning workflows including text embeddings, large language model generation, and model inference operations optimized for distributed processing.

3

4

## Capabilities

5

6

### Text Embeddings

7

8

Generate vector embeddings from text data for semantic search and similarity operations.

9

10

```python { .api }

11

def embed_text(

12

text: Expression,

13

*,

14

provider: str | Provider | None = None,

15

model: str | None = None,

16

**options: str,

17

) -> Expression:

18

"""

19

Generate text embeddings using specified model and provider.

20

21

Parameters:

22

- text: Text expression to embed

23

- provider: Provider to use for embedding model (e.g., "sentence_transformers", "openai")

24

- model: Model name/identifier for embedding generation

25

- options: Additional options to pass for the model

26

27

Returns:

28

Expression: Vector embedding expression

29

30

Examples:

31

>>> df.select(embed_text(col("description"), model="all-MiniLM-L6-v2"))

32

>>> df.select(embed_text(col("query"), provider="openai", model="text-embedding-ada-002"))

33

"""

34

```

35

36

### Image Embeddings

37

38

Generate vector embeddings from image data for visual similarity and search operations.

39

40

```python { .api }

41

def embed_image(

42

image: Expression,

43

*,

44

provider: str | Provider | None = None,

45

model: str | None = None,

46

**options: str,

47

) -> Expression:

48

"""

49

Generate image embeddings using specified model and provider.

50

51

Parameters:

52

- image: Image expression to embed

53

- provider: Provider to use for embedding model (e.g., "transformers", "openai")

54

- model: Model name/identifier for image embedding generation

55

- options: Additional options to pass for the model

56

57

Returns:

58

Expression: Vector embedding expression

59

60

Examples:

61

>>> df.select(embed_image(col("image"), provider="transformers", model="clip-vit-base-patch32"))

62

>>> df.select(embed_image(col("photo"), provider="openai", model="clip"))

63

"""

64

```

65

66

### Large Language Model Generation

67

68

Generate text using large language models for various NLP tasks.

69

70

```python { .api }

71

def llm_generate(

72

input_column: Expression,

73

model: str = "facebook/opt-125m",

74

provider: Literal["vllm", "openai"] = "vllm",

75

concurrency: int = 1,

76

batch_size: int | None = None,

77

num_cpus: int | None = None,

78

num_gpus: int | None = None,

79

**generation_config: dict[str, Any],

80

) -> Expression:

81

"""

82

Generate text using large language model with specified provider.

83

84

Parameters:

85

- input_column: Input text expression for generation

86

- model: Model identifier (default: "facebook/opt-125m")

87

- provider: LLM provider ("vllm" or "openai")

88

- concurrency: Number of concurrent model instances

89

- batch_size: Batch size for processing

90

- num_cpus: CPU resources to allocate

91

- num_gpus: GPU resources to allocate

92

- generation_config: Model parameters (temperature, max_tokens, etc.)

93

94

Returns:

95

Expression: Generated text expression

96

97

Examples:

98

>>> df.select(llm_generate(col("question"), model="gpt-4o", provider="openai"))

99

>>> df.select(llm_generate(col("prompt"), model="facebook/opt-125m", provider="vllm", temperature=0.7))

100

"""

101

```

102

103

### PyTorch Integration

104

105

Convert DataFrames to PyTorch datasets for deep learning workflows.

106

107

```python { .api }

108

class DataFrame:

109

def to_torch_map_dataset(

110

self,

111

shard_strategy: Optional[Literal["file"]] = None,

112

world_size: Optional[int] = None,

113

rank: Optional[int] = None,

114

) -> "torch.utils.data.Dataset":

115

"""

116

Convert DataFrame to PyTorch map-style dataset.

117

118

Parameters:

119

- shard_strategy: Strategy for distributed sharding

120

- world_size: Total number of processes for distributed training

121

- rank: Current process rank for distributed training

122

123

Returns:

124

torch.utils.data.Dataset: PyTorch map-style dataset

125

"""

126

127

def to_torch_iter_dataset(

128

self,

129

shard_strategy: Optional[Literal["file"]] = None,

130

world_size: Optional[int] = None,

131

rank: Optional[int] = None,

132

) -> "torch.utils.data.IterableDataset":

133

"""

134

Convert DataFrame to PyTorch iterable dataset.

135

136

Parameters:

137

- shard_strategy: Strategy for distributed sharding

138

- world_size: Total number of processes for distributed training

139

- rank: Current process rank for distributed training

140

141

Returns:

142

torch.utils.data.IterableDataset: PyTorch iterable dataset

143

"""

144

```

145

146

### Device Management

147

148

Utility functions for optimal device selection in AI/ML workflows.

149

150

```python { .api }

151

def get_device() -> "torch.device":

152

"""

153

Get optimal PyTorch device (CUDA GPU if available, otherwise CPU).

154

155

Returns:

156

torch.device: Optimal device for computation

157

"""

158

```

159

160

### Provider Management

161

162

Functions for managing AI/ML providers and models.

163

164

```python { .api }

165

def load_openai(name: str | None = None, **options) -> Provider:

166

"""Load OpenAI provider with configuration."""

167

168

def load_sentence_transformers(name: str | None = None, **options) -> Provider:

169

"""Load Sentence Transformers provider with configuration."""

170

171

def load_transformers(name: str | None = None, **options) -> Provider:

172

"""Load Hugging Face Transformers provider with configuration."""

173

174

def load_lm_studio(name: str | None = None, **options) -> Provider:

175

"""Load LM Studio provider with configuration."""

176

177

def load_provider(provider: str, name: str | None = None, **options) -> Provider:

178

"""Load provider by name with configuration."""

179

```

180

181

## Usage Examples

182

183

### Image Similarity and Visual Search

184

```python

185

import daft

186

from daft import col

187

from daft.functions import embed_image

188

189

# Dataset with image paths or URLs

190

images_df = daft.from_pydict({

191

"image_id": [1, 2, 3, 4],

192

"image_path": [

193

"s3://bucket/images/product1.jpg",

194

"s3://bucket/images/product2.jpg",

195

"s3://bucket/images/product3.jpg",

196

"s3://bucket/images/query.jpg"

197

]

198

})

199

200

# Load and decode images

201

decoded_images = images_df.select(

202

col("image_id"),

203

col("image_path").url.download().image.decode().alias("image")

204

)

205

206

# Generate image embeddings for visual similarity

207

embedded_images = decoded_images.select(

208

col("image_id"),

209

col("image"),

210

embed_image(col("image"), provider="transformers", model="clip-vit-base-patch32").alias("image_embedding")

211

)

212

213

# Compute visual similarity (using cosine distance)

214

query_embedding = embedded_images.filter(col("image_id") == 4).collect()[0]["image_embedding"][0]

215

216

similarities = embedded_images.select(

217

col("image_id"),

218

col("image_embedding").embedding.cosine_distance(lit(query_embedding)).alias("similarity")

219

).filter(col("image_id") != 4).sort("similarity").collect()

220

```

221

222

### PyTorch Deep Learning Integration

223

```python

224

import torch

225

from torch.utils.data import DataLoader

226

227

# Create DataFrame with image data and labels

228

training_df = daft.from_pydict({

229

"image_path": ["train/cat1.jpg", "train/dog1.jpg", "train/cat2.jpg"],

230

"label": [0, 1, 0] # 0=cat, 1=dog

231

})

232

233

# Load and preprocess images

234

processed_df = training_df.select(

235

col("image_path").url.download().image.decode().image.resize(224, 224).alias("image"),

236

col("label")

237

)

238

239

# Convert to PyTorch dataset for training

240

torch_dataset = processed_df.to_torch_map_dataset()

241

242

# Create DataLoader for training

243

dataloader = DataLoader(torch_dataset, batch_size=32, shuffle=True)

244

245

# Use in PyTorch training loop

246

for batch in dataloader:

247

images = batch["image"]

248

labels = batch["label"]

249

# Training code here...

250

```

251

252

### Text Similarity and Semantic Search

253

```python

254

import daft

255

from daft import col

256

from daft.functions import embed_text

257

258

# Create dataset with text descriptions

259

documents = daft.from_pydict({

260

"doc_id": [1, 2, 3, 4],

261

"title": ["Machine Learning", "Deep Learning", "Data Science", "Statistics"],

262

"description": [

263

"Algorithms that learn from data",

264

"Neural networks with multiple layers",

265

"Extracting insights from data",

266

"Mathematical analysis of data"

267

]

268

})

269

270

# Generate embeddings for semantic search

271

embedded_docs = documents.select(

272

col("doc_id"),

273

col("title"),

274

col("description"),

275

embed_text(col("description"), "sentence-transformers/all-MiniLM-L6-v2").alias("embedding")

276

)

277

278

# Create query embedding

279

query_df = daft.from_pydict({"query": ["artificial intelligence algorithms"]})

280

query_embedded = query_df.select(

281

embed_text(col("query"), "sentence-transformers/all-MiniLM-L6-v2").alias("query_embedding")

282

)

283

284

# Compute similarity (cosine similarity would be implemented as UDF)

285

@daft.func

286

def cosine_similarity(vec1: list, vec2: list) -> float:

287

import numpy as np

288

v1, v2 = np.array(vec1), np.array(vec2)

289

return np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))

290

291

# Find most similar documents

292

similarities = embedded_docs.select(

293

col("title"),

294

cosine_similarity(col("embedding"), query_embedded.collect()[0]["query_embedding"][0]).alias("similarity")

295

).collect()

296

```

297

298

### Content Generation and Augmentation

299

```python

300

from daft.functions import llm_generate

301

302

# Dataset with partial content to augment

303

content_df = daft.from_pydict({

304

"topic": ["Climate Change", "Space Exploration", "Renewable Energy"],

305

"outline": [

306

"Impact on global temperatures",

307

"Mars colonization plans",

308

"Solar and wind power efficiency"

309

]

310

})

311

312

# Generate detailed content

313

augmented_content = content_df.select(

314

col("topic"),

315

col("outline"),

316

llm_generate(

317

daft.sql_expr("'Write a detailed paragraph about: ' || outline"),

318

"gpt-3.5-turbo",

319

max_tokens=200,

320

temperature=0.7

321

).alias("detailed_content")

322

)

323

324

# Generate summaries

325

summaries = content_df.select(

326

col("topic"),

327

llm_generate(

328

daft.sql_expr("'Summarize in one sentence: ' || outline"),

329

"gpt-3.5-turbo",

330

max_tokens=50

331

).alias("summary")

332

)

333

```

334

335

### Question Answering System

336

```python

337

# FAQ dataset

338

faq_df = daft.from_pydict({

339

"category": ["Technical", "Billing", "Support", "General"],

340

"question": [

341

"How do I install the software?",

342

"What payment methods are accepted?",

343

"How to contact customer support?",

344

"What is your company mission?"

345

],

346

"context": [

347

"Installation guide and system requirements",

348

"Payment processing and billing information",

349

"Support channels and contact information",

350

"Company background and mission statement"

351

]

352

})

353

354

# Generate comprehensive answers

355

qa_responses = faq_df.select(

356

col("category"),

357

col("question"),

358

llm_generate(

359

daft.sql_expr("'Context: ' || context || '\nQuestion: ' || question || '\nAnswer:'"),

360

"gpt-4",

361

max_tokens=150,

362

temperature=0.3

363

).alias("answer")

364

)

365

```

366

367

### Text Classification and Analysis

368

```python

369

# Customer feedback dataset

370

feedback_df = daft.from_pydict({

371

"customer_id": [1, 2, 3, 4, 5],

372

"feedback": [

373

"Great product, very satisfied!",

374

"Terrible service, not recommended",

375

"Average experience, could be better",

376

"Excellent customer support team",

377

"Product quality is disappointing"

378

]

379

})

380

381

# Classify sentiment

382

sentiment_analysis = feedback_df.select(

383

col("customer_id"),

384

col("feedback"),

385

llm_generate(

386

daft.sql_expr("'Classify the sentiment of this feedback as Positive, Negative, or Neutral: ' || feedback"),

387

"gpt-3.5-turbo",

388

max_tokens=10,

389

temperature=0.1

390

).alias("sentiment"),

391

embed_text(col("feedback"), "sentence-transformers/all-MiniLM-L6-v2").alias("feedback_embedding")

392

)

393

394

# Extract key themes

395

theme_extraction = feedback_df.select(

396

col("customer_id"),

397

col("feedback"),

398

llm_generate(

399

daft.sql_expr("'Extract 3 key themes from this feedback: ' || feedback"),

400

"gpt-3.5-turbo",

401

max_tokens=50

402

).alias("themes")

403

)

404

```

405

406

### Multi-modal Content Processing

407

```python

408

# Dataset with mixed content types

409

content_df = daft.from_pydict({

410

"content_id": [1, 2, 3],

411

"content_type": ["article", "social_post", "review"],

412

"text": [

413

"Long-form article about technology trends",

414

"Short social media post #trending",

415

"Detailed product review with pros and cons"

416

]

417

})

418

419

# Generate type-specific processing

420

processed_content = content_df.select(

421

col("content_id"),

422

col("content_type"),

423

col("text"),

424

# Conditional processing based on content type

425

daft.when(col("content_type") == "article")

426

.then(llm_generate(

427

daft.sql_expr("'Create a compelling headline for this article: ' || text"),

428

"gpt-3.5-turbo"

429

))

430

.when(col("content_type") == "social_post")

431

.then(llm_generate(

432

daft.sql_expr("'Suggest 3 relevant hashtags for: ' || text"),

433

"gpt-3.5-turbo"

434

))

435

.otherwise(llm_generate(

436

daft.sql_expr("'Rate this review from 1-5 stars and explain: ' || text"),

437

"gpt-3.5-turbo"

438

))

439

.alias("processed_output")

440

)

441

```

442

443

### Batch Processing with AI Functions

444

```python

445

# Large dataset for batch AI processing

446

large_dataset = daft.read_parquet("s3://bucket/customer-reviews/*.parquet")

447

448

# Efficient batch processing

449

batch_ai_results = (large_dataset

450

.filter(col("review_text").is_not_null())

451

.select(

452

col("review_id"),

453

col("product_id"),

454

col("review_text"),

455

# Generate embeddings for similarity search

456

embed_text(col("review_text"), "all-MiniLM-L6-v2").alias("review_embedding"),

457

# Extract sentiment

458

llm_generate(

459

daft.sql_expr("'Sentiment (Positive/Negative/Neutral):' || review_text"),

460

"gpt-3.5-turbo",

461

max_tokens=10

462

).alias("sentiment"),

463

# Generate summary for long reviews

464

daft.when(daft.col("review_text").str_length() > 500)

465

.then(llm_generate(

466

daft.sql_expr("'Summarize this review in 2 sentences: ' || review_text"),

467

"gpt-3.5-turbo",

468

max_tokens=100

469

))

470

.otherwise(col("review_text"))

471

.alias("summary")

472

)

473

.repartition(10) # Distribute processing

474

.collect()

475

)

476

```

477

478

### Custom Model Integration

479

```python

480

# Using custom or specialized models

481

specialized_df = daft.from_pydict({

482

"medical_text": [

483

"Patient presents with fever and cough",

484

"Symptoms include headache and fatigue",

485

"No significant medical history reported"

486

]

487

})

488

489

# Medical text analysis with domain-specific model

490

medical_analysis = specialized_df.select(

491

col("medical_text"),

492

embed_text(col("medical_text"), "clinical-bert-base").alias("clinical_embedding"),

493

llm_generate(

494

daft.sql_expr("'Extract medical entities from: ' || medical_text"),

495

"biomedical-llm",

496

temperature=0.1

497

).alias("medical_entities")

498

)

499

```

500

501

### Performance Optimization

502

```python

503

# Optimize AI function calls for large datasets

504

optimized_processing = (large_dataset

505

.filter(col("text_length") > 10) # Pre-filter short texts

506

.repartition(20) # Distribute load

507

.select(

508

col("id"),

509

# Batch similar operations together

510

embed_text(col("title") + " " + col("content"), "sentence-transformers/all-MiniLM-L6-v2").alias("combined_embedding")

511

)

512

.cache() # Cache results for reuse

513

)

514

515

# Use cached embeddings for multiple downstream tasks

516

similarity_results = optimized_processing.select(

517

col("id"),

518

col("combined_embedding")

519

# Additional similarity computations would use cached embeddings

520

)

521

```

522

523

## AI Provider Configuration

524

525

```python { .api }

526

class Provider:

527

"""AI model provider interface for custom model integration."""

528

529

def configure(self, **kwargs) -> None:

530

"""Configure provider with authentication and settings."""

531

532

class Embedding:

533

"""Embedding type for vector representations."""

534

535

def __init__(self, values: List[float]): ...

536

537

def similarity(self, other: "Embedding") -> float:

538

"""Compute similarity with another embedding."""

539

```

540

541

## Integration Patterns

542

543

### Combining AI Functions with DataFrames

544

```python

545

# Chain AI operations with standard DataFrame operations

546

ai_pipeline = (df

547

.filter(col("text").is_not_null())

548

.select(

549

col("id"),

550

embed_text(col("text"), "all-MiniLM-L6-v2").alias("embedding")

551

)

552

.groupby("category")

553

.agg(

554

col("embedding").list().alias("category_embeddings"),

555

col("id").count().alias("doc_count")

556

)

557

.filter(col("doc_count") > 5)

558

.collect()

559

)

560

```

561

562

### Error Handling and Fallbacks

563

```python

564

# Robust AI processing with fallbacks

565

@daft.func

566

def safe_llm_generate(text: str, model: str) -> str:

567

try:

568

# This would integrate with actual LLM API

569

return f"Generated content for: {text[:50]}..."

570

except Exception as e:

571

return "Generation failed"

572

573

robust_generation = df.select(

574

col("input_text"),

575

safe_llm_generate(col("input_text"), "gpt-3.5-turbo").alias("generated")

576

)

577

```

578

579

AI/ML functions in Daft are designed for scalable, distributed processing of AI workloads with built-in optimizations for batch processing and resource management.