# Core Framework

Essential framework components for building pipelines, managing data flow, creating custom components, and handling serialization. The core framework provides the foundation for all Haystack functionality.

## Capabilities

### Pipeline Management

Create and orchestrate data flows between components using directed acyclic graphs (DAGs).

```python { .api }
class Pipeline:
    def __init__(
        self,
        metadata: Optional[Dict[str, Any]] = None,
        max_runs_per_component: int = 100,
        connection_type_validation: bool = True
    ) -> None:
        """
        Initialize a new Pipeline.

        Args:
            metadata: Arbitrary dictionary to store metadata about this Pipeline
            max_runs_per_component: Maximum number of times a component can run in a single pipeline execution
            connection_type_validation: Whether the pipeline will validate the types of the connections
        """

    def add_component(self, name: str, instance: Any) -> None:
        """
        Add a component to the pipeline.

        Args:
            name: Unique name for the component
            instance: Component instance to add
        """

    def connect(self, sender: str, receiver: str) -> None:
        """
        Connect components in the pipeline.

        Args:
            sender: Output socket in format "component_name.output_name"
            receiver: Input socket in format "component_name.input_name"
        """

    def run(
        self,
        data: Dict[str, Any],
        include_outputs_from: Optional[Set[str]] = None,
        *,
        break_point: Optional[Union[Breakpoint, AgentBreakpoint]] = None,
        pipeline_snapshot: Optional[PipelineSnapshot] = None
    ) -> Dict[str, Any]:
        """
        Run the pipeline with given input data.

        Args:
            data: Input data for pipeline components
            include_outputs_from: Set of component names to include outputs from
            break_point: Breakpoint configuration for debugging
            pipeline_snapshot: Pipeline snapshot for resuming execution

        Returns:
            Dictionary containing outputs from all components
        """

    def draw(
        self,
        *,
        path: Path,
        server_url: str = "https://mermaid.ink",
        params: Optional[Dict] = None
    ) -> None:
        """
        Draw a visual representation of the pipeline.

        Args:
            path: File path to save the visualization (required)
            server_url: Mermaid server URL for rendering
            params: Additional parameters for the rendering
        """
class AsyncPipeline:
    def __init__(
        self,
        metadata: Optional[Dict[str, Any]] = None,
        max_runs_per_component: int = 100,
        connection_type_validation: bool = True
    ) -> None:
        """Initialize an asynchronous Pipeline."""

    def add_component(self, name: str, instance: Any) -> None:
        """Add a component to the async pipeline."""

    def connect(self, sender: str, receiver: str) -> None:
        """Connect components in the async pipeline."""

    async def run(
        self,
        data: Dict[str, Any],
        include_outputs_from: Optional[Set[str]] = None,
        *,
        break_point: Optional[Union[Breakpoint, AgentBreakpoint]] = None,
        pipeline_snapshot: Optional[PipelineSnapshot] = None
    ) -> Dict[str, Any]:
        """
        Run the async pipeline with given input data.

        Args:
            data: Input data for pipeline components
            include_outputs_from: Set of component names to include outputs from
            break_point: Breakpoint configuration for debugging
            pipeline_snapshot: Pipeline snapshot for resuming execution

        Returns:
            Dictionary containing outputs from all components
        """
class PredefinedPipeline:
    @classmethod
    def from_template(cls, template_name: str, **kwargs) -> Pipeline:
        """
        Create a pipeline from a predefined template.

        Args:
            template_name: Name of the template to use
            **kwargs: Template-specific configuration

        Returns:
            Configured Pipeline instance
        """
```

### Component System

Build custom components and integrate them into pipelines.

```python { .api }
@component
def custom_component(input_param: str) -> Dict[str, Any]:
    """
    Decorator to create a Haystack component from a function.

    The decorated function becomes a runnable component that can be added to pipelines.

    Args:
        input_param: Example input parameter

    Returns:
        Dictionary with component outputs
    """
class Component:
    """Base class for all Haystack components."""

    def run(self, **kwargs) -> Dict[str, Any]:
        """
        Execute the component logic.

        Args:
            **kwargs: Component input parameters

        Returns:
            Dictionary containing component outputs
        """

    def to_dict(self) -> Dict[str, Any]:
        """Serialize component to dictionary."""

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Component":
        """Deserialize component from dictionary."""
class SuperComponent:
    """Advanced component base class with additional capabilities."""

    def run(self, **kwargs) -> Dict[str, Any]:
        """Execute the super component logic."""
@super_component
def advanced_component() -> None:
    """Decorator for creating super components."""
```

### Data Classes

Core data structures that flow between components in pipelines.

```python { .api }
class Document:
    def __init__(
        self,
        id: str = "",
        content: Optional[str] = None,
        blob: Optional[ByteStream] = None,
        # NOTE: default of None requires an Optional annotation (PEP 484)
        meta: Optional[Dict[str, Any]] = None,
        score: Optional[float] = None,
        embedding: Optional[List[float]] = None,
        sparse_embedding: Optional[SparseEmbedding] = None
    ) -> None:
        """
        Initialize a Document.

        Args:
            id: Unique document identifier
            content: Text content of the document
            blob: Binary data associated with the document
            meta: Metadata dictionary
            score: Relevance score (used in retrieval)
            embedding: Vector embedding of the document
            sparse_embedding: Sparse vector representation of the document
        """

    id: str
    content: Optional[str]
    blob: Optional[ByteStream]
    meta: Dict[str, Any]
    score: Optional[float]
    embedding: Optional[List[float]]
    sparse_embedding: Optional[SparseEmbedding]

    def to_dict(self) -> Dict[str, Any]:
        """Convert document to dictionary."""

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Document":
        """Create document from dictionary."""
class Answer:
    """Protocol for answer types."""
    query: str
    data: str
    meta: Dict[str, Any]
class GeneratedAnswer:
    def __init__(
        self,
        data: str,
        query: str = "",
        # NOTE: defaults of None require Optional annotations (PEP 484)
        documents: Optional[List[Document]] = None,
        meta: Optional[Dict[str, Any]] = None
    ) -> None:
        """
        Initialize a GeneratedAnswer.

        Args:
            data: Generated answer text
            query: Original query
            documents: Source documents used for generation
            meta: Additional metadata
        """

    data: str
    query: str
    documents: List[Document]
    meta: Dict[str, Any]
class ExtractedAnswer:
    def __init__(
        self,
        query: str,
        score: Optional[float] = None,
        data: str = "",
        document: Optional[Document] = None,
        context: Optional[str] = None,
        # NOTE: defaults of None require Optional annotations (PEP 484)
        offsets_in_document: Optional[List[Span]] = None,
        offsets_in_context: Optional[List[Span]] = None,
        meta: Optional[Dict[str, Any]] = None
    ) -> None:
        """
        Initialize an ExtractedAnswer.

        Args:
            query: Original query
            score: Confidence score
            data: Extracted answer text
            document: Source document
            context: Context around the answer
            offsets_in_document: Character offsets in original document
            offsets_in_context: Character offsets in context
            meta: Additional metadata
        """

    query: str
    score: Optional[float]
    data: str
    document: Optional[Document]
    context: Optional[str]
    offsets_in_document: List[Span]
    offsets_in_context: List[Span]
    meta: Dict[str, Any]
```

### Serialization

Handle serialization and deserialization of components and data structures.

```python { .api }
def default_to_dict(obj: Any) -> Dict[str, Any]:
    """
    Default serialization function for Haystack objects.

    Args:
        obj: Object to serialize

    Returns:
        Dictionary representation of the object
    """
def default_from_dict(data: Dict[str, Any]) -> Any:
    """
    Default deserialization function for Haystack objects.

    Args:
        data: Dictionary to deserialize

    Returns:
        Deserialized object
    """
```

### Error Handling

Handle component and pipeline execution errors.

```python { .api }
class ComponentError(Exception):
    """Exception raised when a component encounters an error."""

    # NOTE: default of None requires an Optional annotation (PEP 484)
    def __init__(self, message: str, component_name: Optional[str] = None) -> None:
        """
        Initialize ComponentError.

        Args:
            message: Error description
            component_name: Name of the component that raised the error
        """
class DeserializationError(Exception):
    """Exception raised during object deserialization."""

    # NOTE: default of None requires an Optional annotation (PEP 484)
    def __init__(self, message: str, data: Optional[Dict[str, Any]] = None) -> None:
        """
        Initialize DeserializationError.

        Args:
            message: Error description
            data: Data that failed to deserialize
        """
```

## Usage Examples

### Creating a Custom Component

```python
from typing import Dict

from haystack import component


@component
def text_processor(text: str, operation: str = "upper") -> Dict[str, str]:
    """Process text with specified operation."""
    if operation == "upper":
        processed_text = text.upper()
    elif operation == "lower":
        processed_text = text.lower()
    else:
        processed_text = text

    return {"processed_text": processed_text}


# Use in pipeline
from haystack import Pipeline

pipeline = Pipeline()
pipeline.add_component("processor", text_processor)

result = pipeline.run({"processor": {"text": "Hello World", "operation": "upper"}})
print(result["processor"]["processed_text"])  # "HELLO WORLD"
```

### Building a Simple Pipeline

```python
from haystack import Pipeline
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator

# Create pipeline
pipeline = Pipeline()

# Add components
pipeline.add_component("prompt_builder", PromptBuilder(template="Answer: {{question}}"))
pipeline.add_component("generator", OpenAIGenerator())

# Connect components
pipeline.connect("prompt_builder.prompt", "generator.prompt")

# Run pipeline
result = pipeline.run({
    "prompt_builder": {"question": "What is Python?"}
})

print(result["generator"]["replies"][0])
```

## Types

```python { .api }
from typing import Protocol, Dict, Any, List, Optional, Set, Union
from dataclasses import dataclass
from pathlib import Path


class Span:
    start: int
    end: int


class ByteStream:
    """Binary data stream for document content."""
    data: bytes


class SparseEmbedding:
    """Sparse vector representation with indices and values."""
    indices: List[int]
    values: List[float]


class Breakpoint:
    """Breakpoint configuration for pipeline debugging."""
    pass


class AgentBreakpoint:
    """Agent-specific breakpoint configuration."""
    pass


class PipelineSnapshot:
    """Pipeline execution snapshot for resuming."""
    pass


@dataclass
class ComponentInfo:
    name: str
    type: str
    inputs: Dict[str, Any]
    outputs: Dict[str, Any]
```