# AI/ML Functions

Built-in functions for AI and machine learning workflows including text embeddings, large language model generation, and model inference operations optimized for distributed processing.

## Capabilities

### Text Embeddings

Generate vector embeddings from text data for semantic search and similarity operations.

```python { .api }
def embed_text(
    text: Expression,
    *,
    provider: str | Provider | None = None,
    model: str | None = None,
    **options: str,
) -> Expression:
    """
    Generate text embeddings using the specified model and provider.

    Parameters:
    - text: Text expression to embed
    - provider: Provider to use for the embedding model (e.g., "sentence_transformers", "openai")
    - model: Model name/identifier for embedding generation
    - options: Additional options to pass to the model

    Returns:
    Expression: Vector embedding expression

    Examples:
    >>> df.select(embed_text(col("description"), model="all-MiniLM-L6-v2"))
    >>> df.select(embed_text(col("query"), provider="openai", model="text-embedding-ada-002"))
    """
```

### Image Embeddings

Generate vector embeddings from image data for visual similarity and search operations.

```python { .api }
def embed_image(
    image: Expression,
    *,
    provider: str | Provider | None = None,
    model: str | None = None,
    **options: str,
) -> Expression:
    """
    Generate image embeddings using the specified model and provider.

    Parameters:
    - image: Image expression to embed
    - provider: Provider to use for the embedding model (e.g., "transformers", "openai")
    - model: Model name/identifier for image embedding generation
    - options: Additional options to pass to the model

    Returns:
    Expression: Vector embedding expression

    Examples:
    >>> df.select(embed_image(col("image"), provider="transformers", model="clip-vit-base-patch32"))
    >>> df.select(embed_image(col("photo"), provider="openai", model="clip"))
    """
```

### Large Language Model Generation

Generate text using large language models for various NLP tasks.

```python { .api }
def llm_generate(
    input_column: Expression,
    model: str = "facebook/opt-125m",
    provider: Literal["vllm", "openai"] = "vllm",
    concurrency: int = 1,
    batch_size: int | None = None,
    num_cpus: int | None = None,
    num_gpus: int | None = None,
    **generation_config: Any,
) -> Expression:
    """
    Generate text using a large language model with the specified provider.

    Parameters:
    - input_column: Input text expression for generation
    - model: Model identifier (default: "facebook/opt-125m")
    - provider: LLM provider ("vllm" or "openai")
    - concurrency: Number of concurrent model instances
    - batch_size: Batch size for processing
    - num_cpus: CPU resources to allocate
    - num_gpus: GPU resources to allocate
    - generation_config: Model parameters (temperature, max_tokens, etc.)

    Returns:
    Expression: Generated text expression

    Examples:
    >>> df.select(llm_generate(col("question"), model="gpt-4o", provider="openai"))
    >>> df.select(llm_generate(col("prompt"), model="facebook/opt-125m", provider="vllm", temperature=0.7))
    """
```
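
The resource parameters control how inference is parallelized across the cluster. As a hedged sketch (assuming a vLLM-capable GPU environment and a DataFrame `df` with a `prompt` column), the call below runs two model replicas with one GPU each, while generation options are forwarded through `**generation_config`:

```python
from daft import col
from daft.functions import llm_generate

# Two model instances served in parallel, one GPU per instance,
# 64 rows sent to each instance per call. Exact scheduling behavior
# depends on the runner configuration.
generated = df.select(
    llm_generate(
        col("prompt"),
        model="facebook/opt-125m",
        provider="vllm",
        concurrency=2,
        num_gpus=1,
        batch_size=64,
        max_tokens=128,    # forwarded via **generation_config
        temperature=0.2,
    ).alias("completion")
)
```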

### PyTorch Integration

Convert DataFrames to PyTorch datasets for deep learning workflows.

```python { .api }
class DataFrame:
    def to_torch_map_dataset(
        self,
        shard_strategy: Optional[Literal["file"]] = None,
        world_size: Optional[int] = None,
        rank: Optional[int] = None,
    ) -> "torch.utils.data.Dataset":
        """
        Convert DataFrame to a PyTorch map-style dataset.

        Parameters:
        - shard_strategy: Strategy for distributed sharding
        - world_size: Total number of processes for distributed training
        - rank: Current process rank for distributed training

        Returns:
        torch.utils.data.Dataset: PyTorch map-style dataset
        """

    def to_torch_iter_dataset(
        self,
        shard_strategy: Optional[Literal["file"]] = None,
        world_size: Optional[int] = None,
        rank: Optional[int] = None,
    ) -> "torch.utils.data.IterableDataset":
        """
        Convert DataFrame to a PyTorch iterable dataset.

        Parameters:
        - shard_strategy: Strategy for distributed sharding
        - world_size: Total number of processes for distributed training
        - rank: Current process rank for distributed training

        Returns:
        torch.utils.data.IterableDataset: PyTorch iterable dataset
        """
```
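
For distributed training, the sharding parameters let each process read a disjoint subset of the input files. A minimal sketch, assuming the script is launched with `torchrun` so that `RANK` and `WORLD_SIZE` are set in the environment:

```python
import os
import daft

# Rank and world size as set by torchrun (assumed launch method).
rank = int(os.environ.get("RANK", 0))
world_size = int(os.environ.get("WORLD_SIZE", 1))

df = daft.read_parquet("s3://bucket/training-data/*.parquet")

# Each process streams only its file-based shard of the input.
shard = df.to_torch_iter_dataset(
    shard_strategy="file",
    world_size=world_size,
    rank=rank,
)
```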

### Device Management

Utility functions for optimal device selection in AI/ML workflows.

```python { .api }
def get_device() -> "torch.device":
    """
    Get the optimal PyTorch device (CUDA GPU if available, otherwise CPU).

    Returns:
    torch.device: Optimal device for computation
    """
```
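
A minimal usage sketch; the import location of `get_device` is not shown above, so it is assumed to be importable alongside the other utilities documented here:

```python
import torch

# get_device() is assumed importable from this library's utilities.
device = get_device()

# Place model and data on the selected device; this falls back to CPU
# transparently on machines without CUDA.
model = torch.nn.Linear(384, 2).to(device)
batch = torch.randn(32, 384, device=device)
logits = model(batch)
```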

### Provider Management

Functions for managing AI/ML providers and models.

```python { .api }
def load_openai(name: str | None = None, **options) -> Provider:
    """Load OpenAI provider with configuration."""

def load_sentence_transformers(name: str | None = None, **options) -> Provider:
    """Load Sentence Transformers provider with configuration."""

def load_transformers(name: str | None = None, **options) -> Provider:
    """Load Hugging Face Transformers provider with configuration."""

def load_lm_studio(name: str | None = None, **options) -> Provider:
    """Load LM Studio provider with configuration."""

def load_provider(provider: str, name: str | None = None, **options) -> Provider:
    """Load provider by name with configuration."""
```
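
Because `embed_text` and `embed_image` accept a `Provider` object as well as a provider string, a configured provider can be loaded once and reused across calls. A sketch, where the `api_key` option name is an assumption for illustration:

```python
from daft import col
from daft.functions import embed_text

# Load a configured provider once (option names are illustrative).
openai_provider = load_openai(api_key="sk-...")

# Reuse the Provider object in place of a provider string.
embedded = df.select(
    embed_text(
        col("description"),
        provider=openai_provider,
        model="text-embedding-ada-002",
    ).alias("embedding")
)
```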

## Usage Examples

### Image Similarity and Visual Search
```python
import daft
from daft import col, lit
from daft.functions import embed_image

# Dataset with image paths or URLs
images_df = daft.from_pydict({
    "image_id": [1, 2, 3, 4],
    "image_path": [
        "s3://bucket/images/product1.jpg",
        "s3://bucket/images/product2.jpg",
        "s3://bucket/images/product3.jpg",
        "s3://bucket/images/query.jpg"
    ]
})

# Load and decode images
decoded_images = images_df.select(
    col("image_id"),
    col("image_path").url.download().image.decode().alias("image")
)

# Generate image embeddings for visual similarity
embedded_images = decoded_images.select(
    col("image_id"),
    col("image"),
    embed_image(col("image"), provider="transformers", model="clip-vit-base-patch32").alias("image_embedding")
)

# Extract the query image's embedding (image_id 4) as a Python value
query_embedding = embedded_images.filter(col("image_id") == 4).to_pydict()["image_embedding"][0]

# Rank the remaining images by cosine distance to the query
similarities = embedded_images.select(
    col("image_id"),
    col("image_embedding").embedding.cosine_distance(lit(query_embedding)).alias("similarity")
).filter(col("image_id") != 4).sort("similarity").collect()
```

### PyTorch Deep Learning Integration
```python
import daft
import torch
from daft import col
from torch.utils.data import DataLoader

# Create DataFrame with image data and labels
training_df = daft.from_pydict({
    "image_path": ["train/cat1.jpg", "train/dog1.jpg", "train/cat2.jpg"],
    "label": [0, 1, 0]  # 0=cat, 1=dog
})

# Load and preprocess images
processed_df = training_df.select(
    col("image_path").url.download().image.decode().image.resize(224, 224).alias("image"),
    col("label")
)

# Convert to PyTorch dataset for training
torch_dataset = processed_df.to_torch_map_dataset()

# Create DataLoader for training
dataloader = DataLoader(torch_dataset, batch_size=32, shuffle=True)

# Use in PyTorch training loop
for batch in dataloader:
    images = batch["image"]
    labels = batch["label"]
    # Training code here...
```

### Text Similarity and Semantic Search
```python
import daft
from daft import col
from daft.functions import embed_text

# Create dataset with text descriptions
documents = daft.from_pydict({
    "doc_id": [1, 2, 3, 4],
    "title": ["Machine Learning", "Deep Learning", "Data Science", "Statistics"],
    "description": [
        "Algorithms that learn from data",
        "Neural networks with multiple layers",
        "Extracting insights from data",
        "Mathematical analysis of data"
    ]
})

# Generate embeddings for semantic search
embedded_docs = documents.select(
    col("doc_id"),
    col("title"),
    col("description"),
    embed_text(col("description"), model="sentence-transformers/all-MiniLM-L6-v2").alias("embedding")
)

# Create query embedding and extract it as a Python value
query_df = daft.from_pydict({"query": ["artificial intelligence algorithms"]})
query_embedded = query_df.select(
    embed_text(col("query"), model="sentence-transformers/all-MiniLM-L6-v2").alias("query_embedding")
)
query_vector = query_embedded.to_pydict()["query_embedding"][0]

# Cosine similarity implemented as a UDF
@daft.func
def cosine_similarity(vec1: list, vec2: list) -> float:
    import numpy as np
    v1, v2 = np.array(vec1), np.array(vec2)
    return float(np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2)))

# Find most similar documents
similarities = embedded_docs.select(
    col("title"),
    cosine_similarity(col("embedding"), query_vector).alias("similarity")
).collect()
```

### Content Generation and Augmentation
```python
import daft
from daft import col
from daft.functions import llm_generate

# Dataset with partial content to augment
content_df = daft.from_pydict({
    "topic": ["Climate Change", "Space Exploration", "Renewable Energy"],
    "outline": [
        "Impact on global temperatures",
        "Mars colonization plans",
        "Solar and wind power efficiency"
    ]
})

# Generate detailed content
augmented_content = content_df.select(
    col("topic"),
    col("outline"),
    llm_generate(
        daft.sql_expr("'Write a detailed paragraph about: ' || outline"),
        model="gpt-3.5-turbo",
        provider="openai",
        max_tokens=200,
        temperature=0.7
    ).alias("detailed_content")
)

# Generate summaries
summaries = content_df.select(
    col("topic"),
    llm_generate(
        daft.sql_expr("'Summarize in one sentence: ' || outline"),
        model="gpt-3.5-turbo",
        provider="openai",
        max_tokens=50
    ).alias("summary")
)
```

### Question Answering System
```python
# FAQ dataset
faq_df = daft.from_pydict({
    "category": ["Technical", "Billing", "Support", "General"],
    "question": [
        "How do I install the software?",
        "What payment methods are accepted?",
        "How to contact customer support?",
        "What is your company mission?"
    ],
    "context": [
        "Installation guide and system requirements",
        "Payment processing and billing information",
        "Support channels and contact information",
        "Company background and mission statement"
    ]
})

# Generate comprehensive answers
qa_responses = faq_df.select(
    col("category"),
    col("question"),
    llm_generate(
        daft.sql_expr("'Context: ' || context || '\nQuestion: ' || question || '\nAnswer:'"),
        model="gpt-4",
        provider="openai",
        max_tokens=150,
        temperature=0.3
    ).alias("answer")
)
```

### Text Classification and Analysis
```python
# Customer feedback dataset
feedback_df = daft.from_pydict({
    "customer_id": [1, 2, 3, 4, 5],
    "feedback": [
        "Great product, very satisfied!",
        "Terrible service, not recommended",
        "Average experience, could be better",
        "Excellent customer support team",
        "Product quality is disappointing"
    ]
})

# Classify sentiment
sentiment_analysis = feedback_df.select(
    col("customer_id"),
    col("feedback"),
    llm_generate(
        daft.sql_expr("'Classify the sentiment of this feedback as Positive, Negative, or Neutral: ' || feedback"),
        model="gpt-3.5-turbo",
        provider="openai",
        max_tokens=10,
        temperature=0.1
    ).alias("sentiment"),
    embed_text(col("feedback"), model="sentence-transformers/all-MiniLM-L6-v2").alias("feedback_embedding")
)

# Extract key themes
theme_extraction = feedback_df.select(
    col("customer_id"),
    col("feedback"),
    llm_generate(
        daft.sql_expr("'Extract 3 key themes from this feedback: ' || feedback"),
        model="gpt-3.5-turbo",
        provider="openai",
        max_tokens=50
    ).alias("themes")
)
```

### Multi-modal Content Processing
```python
# Dataset with mixed content types
content_df = daft.from_pydict({
    "content_id": [1, 2, 3],
    "content_type": ["article", "social_post", "review"],
    "text": [
        "Long-form article about technology trends",
        "Short social media post #trending",
        "Detailed product review with pros and cons"
    ]
})

# Type-specific prompts
headline = llm_generate(
    daft.sql_expr("'Create a compelling headline for this article: ' || text"),
    model="gpt-3.5-turbo",
    provider="openai"
)
hashtags = llm_generate(
    daft.sql_expr("'Suggest 3 relevant hashtags for: ' || text"),
    model="gpt-3.5-turbo",
    provider="openai"
)
rating = llm_generate(
    daft.sql_expr("'Rate this review from 1-5 stars and explain: ' || text"),
    model="gpt-3.5-turbo",
    provider="openai"
)

# Conditional processing based on content type, via nested if_else
processed_content = content_df.select(
    col("content_id"),
    col("content_type"),
    col("text"),
    (col("content_type") == "article").if_else(
        headline,
        (col("content_type") == "social_post").if_else(hashtags, rating)
    ).alias("processed_output")
)
```

### Batch Processing with AI Functions
```python
# Large dataset for batch AI processing
large_dataset = daft.read_parquet("s3://bucket/customer-reviews/*.parquet")

# Efficient batch processing
batch_ai_results = (large_dataset
    .filter(col("review_text").is_not_null())
    .select(
        col("review_id"),
        col("product_id"),
        col("review_text"),
        # Generate embeddings for similarity search
        embed_text(col("review_text"), model="all-MiniLM-L6-v2").alias("review_embedding"),
        # Extract sentiment
        llm_generate(
            daft.sql_expr("'Sentiment (Positive/Negative/Neutral): ' || review_text"),
            model="gpt-3.5-turbo",
            provider="openai",
            max_tokens=10
        ).alias("sentiment"),
        # Summarize long reviews; pass short ones through unchanged
        (col("review_text").str.length() > 500)
        .if_else(
            llm_generate(
                daft.sql_expr("'Summarize this review in 2 sentences: ' || review_text"),
                model="gpt-3.5-turbo",
                provider="openai",
                max_tokens=100
            ),
            col("review_text")
        )
        .alias("summary")
    )
    .repartition(10)  # Distribute processing
    .collect()
)
```

### Custom Model Integration
```python
# Using custom or specialized models (model names are illustrative placeholders)
specialized_df = daft.from_pydict({
    "medical_text": [
        "Patient presents with fever and cough",
        "Symptoms include headache and fatigue",
        "No significant medical history reported"
    ]
})

# Medical text analysis with domain-specific models
medical_analysis = specialized_df.select(
    col("medical_text"),
    embed_text(col("medical_text"), model="clinical-bert-base").alias("clinical_embedding"),
    llm_generate(
        daft.sql_expr("'Extract medical entities from: ' || medical_text"),
        model="biomedical-llm",
        temperature=0.1
    ).alias("medical_entities")
)
```

### Performance Optimization
```python
# Optimize AI function calls for large datasets
optimized_processing = (large_dataset
    .filter(col("text_length") > 10)  # Pre-filter short texts
    .repartition(20)  # Distribute load
    .select(
        col("id"),
        # Batch similar operations together
        embed_text(col("title") + " " + col("content"), model="sentence-transformers/all-MiniLM-L6-v2").alias("combined_embedding")
    )
    .collect()  # Materialize results for reuse
)

# Use the materialized embeddings for multiple downstream tasks
similarity_results = optimized_processing.select(
    col("id"),
    col("combined_embedding")
    # Additional similarity computations would reuse these embeddings
)
```

## AI Provider Configuration

```python { .api }
class Provider:
    """AI model provider interface for custom model integration."""

    def configure(self, **kwargs) -> None:
        """Configure provider with authentication and settings."""

class Embedding:
    """Embedding type for vector representations."""

    def __init__(self, values: List[float]): ...

    def similarity(self, other: "Embedding") -> float:
        """Compute similarity with another embedding."""
```
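
A hedged sketch of how these interfaces compose; the exact option names accepted by `configure()` are assumptions for illustration:

```python
# Load a provider and apply settings (option names are illustrative).
provider = load_openai()
provider.configure(api_key="sk-...", timeout=30)

# Embedding wraps a vector and exposes pairwise similarity.
a = Embedding([0.1, 0.3, 0.5])
b = Embedding([0.2, 0.1, 0.4])
score = a.similarity(b)  # higher scores indicate more similar vectors
```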

## Integration Patterns

### Combining AI Functions with DataFrames
```python
# Chain AI operations with standard DataFrame operations
ai_pipeline = (df
    .filter(col("text").is_not_null())
    .select(
        col("id"),
        col("category"),
        embed_text(col("text"), model="all-MiniLM-L6-v2").alias("embedding")
    )
    .groupby("category")
    .agg(
        col("embedding").agg_list().alias("category_embeddings"),
        col("id").count().alias("doc_count")
    )
    .filter(col("doc_count") > 5)
    .collect()
)
```

### Error Handling and Fallbacks
```python
# Robust AI processing with fallbacks
@daft.func
def safe_llm_generate(text: str, model: str) -> str:
    try:
        # Call the actual LLM API here; this placeholder just echoes input
        return f"Generated content for: {text[:50]}..."
    except Exception:
        # Fall back to a sentinel value instead of failing the whole query
        return "Generation failed"

robust_generation = df.select(
    col("input_text"),
    safe_llm_generate(col("input_text"), "gpt-3.5-turbo").alias("generated")
)
```

AI/ML functions in Daft are designed for scalable, distributed processing of AI workloads, with built-in optimizations for batch processing and resource management.