or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

agent-framework.mdcore-framework.mddocument-processing.mddocument-stores.mdevaluation.mdindex.mdprompt-building.mdretrieval.mdtext-embeddings.mdtext-generation.md

core-framework.mddocs/

0

# Core Framework

1

2

Essential framework components for building pipelines, managing data flow, creating custom components, and handling serialization. The core framework provides the foundation for all Haystack functionality.

3

4

## Capabilities

5

6

### Pipeline Management

7

8

Create and orchestrate data flows between components using directed acyclic graphs (DAGs).

9

10

```python { .api }

11

class Pipeline:

12

def __init__(

13

self,

14

metadata: Optional[Dict[str, Any]] = None,

15

max_runs_per_component: int = 100,

16

connection_type_validation: bool = True

17

) -> None:

18

"""

19

Initialize a new Pipeline.

20

21

Args:

22

metadata: Arbitrary dictionary to store metadata about this Pipeline

23

max_runs_per_component: Maximum number of times a component can run in a single pipeline execution

24

connection_type_validation: Whether the pipeline will validate the types of the connections

25

"""

26

27

def add_component(self, name: str, instance: Any) -> None:

28

"""

29

Add a component to the pipeline.

30

31

Args:

32

name: Unique name for the component

33

instance: Component instance to add

34

"""

35

36

def connect(self, sender: str, receiver: str) -> None:

37

"""

38

Connect components in the pipeline.

39

40

Args:

41

sender: Output socket in format "component_name.output_name"

42

receiver: Input socket in format "component_name.input_name"

43

"""

44

45

def run(

46

self,

47

data: Dict[str, Any],

48

include_outputs_from: Optional[Set[str]] = None,

49

*,

50

break_point: Optional[Union[Breakpoint, AgentBreakpoint]] = None,

51

pipeline_snapshot: Optional[PipelineSnapshot] = None

52

) -> Dict[str, Any]:

53

"""

54

Run the pipeline with given input data.

55

56

Args:

57

data: Input data for pipeline components

58

include_outputs_from: Set of component names to include outputs from

59

break_point: Breakpoint configuration for debugging

60

pipeline_snapshot: Pipeline snapshot for resuming execution

61

62

Returns:

63

Dictionary containing outputs from all components

64

"""

65

66

def draw(

67

self,

68

*,

69

path: Path,

70

server_url: str = "https://mermaid.ink",

71

params: Optional[Dict] = None

72

) -> None:

73

"""

74

Draw a visual representation of the pipeline.

75

76

Args:

77

path: File path to save the visualization (required)

78

server_url: Mermaid server URL for rendering

79

params: Additional parameters for the rendering

80

"""

81

82

class AsyncPipeline:

83

def __init__(

84

self,

85

metadata: Optional[Dict[str, Any]] = None,

86

max_runs_per_component: int = 100,

87

connection_type_validation: bool = True

88

) -> None:

89

"""Initialize an asynchronous Pipeline."""

90

91

def add_component(self, name: str, instance: Any) -> None:

92

"""Add a component to the async pipeline."""

93

94

def connect(self, sender: str, receiver: str) -> None:

95

"""Connect components in the async pipeline."""

96

97

async def run(

98

self,

99

data: Dict[str, Any],

100

include_outputs_from: Optional[Set[str]] = None,

101

*,

102

break_point: Optional[Union[Breakpoint, AgentBreakpoint]] = None,

103

pipeline_snapshot: Optional[PipelineSnapshot] = None

104

) -> Dict[str, Any]:

105

"""

106

Run the async pipeline with given input data.

107

108

Args:

109

data: Input data for pipeline components

110

include_outputs_from: Set of component names to include outputs from

111

break_point: Breakpoint configuration for debugging

112

pipeline_snapshot: Pipeline snapshot for resuming execution

113

114

Returns:

115

Dictionary containing outputs from all components

116

"""

117

118

class PredefinedPipeline:

119

@classmethod

120

def from_template(cls, template_name: str, **kwargs) -> Pipeline:

121

"""

122

Create a pipeline from a predefined template.

123

124

Args:

125

template_name: Name of the template to use

126

**kwargs: Template-specific configuration

127

128

Returns:

129

Configured Pipeline instance

130

"""

131

```

132

133

### Component System

134

135

Build custom components and integrate them into pipelines.

136

137

```python { .api }

138

@component

139

def custom_component(input_param: str) -> Dict[str, Any]:

140

"""

141

Decorator to create a Haystack component from a function.

142

143

The decorated function becomes a runnable component that can be added to pipelines.

144

145

Args:

146

input_param: Example input parameter

147

148

Returns:

149

Dictionary with component outputs

150

"""

151

152

class Component:

153

"""Base class for all Haystack components."""

154

155

def run(self, **kwargs) -> Dict[str, Any]:

156

"""

157

Execute the component logic.

158

159

Args:

160

**kwargs: Component input parameters

161

162

Returns:

163

Dictionary containing component outputs

164

"""

165

166

def to_dict(self) -> Dict[str, Any]:

167

"""Serialize component to dictionary."""

168

169

@classmethod

170

def from_dict(cls, data: Dict[str, Any]) -> "Component":

171

"""Deserialize component from dictionary."""

172

173

class SuperComponent:

174

"""Advanced component base class with additional capabilities."""

175

176

def run(self, **kwargs) -> Dict[str, Any]:

177

"""Execute the super component logic."""

178

179

@super_component

180

def advanced_component() -> None:

181

"""Decorator for creating super components."""

182

```

183

184

### Data Classes

185

186

Core data structures that flow between components in pipelines.

187

188

```python { .api }

189

class Document:

190

def __init__(

191

self,

192

id: str = "",

193

content: Optional[str] = None,

194

blob: Optional[ByteStream] = None,

195

meta: Dict[str, Any] = None,

196

score: Optional[float] = None,

197

embedding: Optional[List[float]] = None,

198

sparse_embedding: Optional[SparseEmbedding] = None

199

) -> None:

200

"""

201

Initialize a Document.

202

203

Args:

204

id: Unique document identifier

205

content: Text content of the document

206

blob: Binary data associated with the document

207

meta: Metadata dictionary

208

score: Relevance score (used in retrieval)

209

embedding: Vector embedding of the document

210

sparse_embedding: Sparse vector representation of the document

211

"""

212

213

id: str

214

content: Optional[str]

215

blob: Optional[ByteStream]

216

meta: Dict[str, Any]

217

score: Optional[float]

218

embedding: Optional[List[float]]

219

sparse_embedding: Optional[SparseEmbedding]

220

221

def to_dict(self) -> Dict[str, Any]:

222

"""Convert document to dictionary."""

223

224

@classmethod

225

def from_dict(cls, data: Dict[str, Any]) -> "Document":

226

"""Create document from dictionary."""

227

228

class Answer:

229

"""Protocol for answer types."""

230

query: str

231

data: str

232

meta: Dict[str, Any]

233

234

class GeneratedAnswer:

235

def __init__(

236

self,

237

data: str,

238

query: str = "",

239

documents: List[Document] = None,

240

meta: Dict[str, Any] = None

241

) -> None:

242

"""

243

Initialize a GeneratedAnswer.

244

245

Args:

246

data: Generated answer text

247

query: Original query

248

documents: Source documents used for generation

249

meta: Additional metadata

250

"""

251

252

data: str

253

query: str

254

documents: List[Document]

255

meta: Dict[str, Any]

256

257

class ExtractedAnswer:

258

def __init__(

259

self,

260

query: str,

261

score: Optional[float] = None,

262

data: str = "",

263

document: Optional[Document] = None,

264

context: Optional[str] = None,

265

offsets_in_document: List[Span] = None,

266

offsets_in_context: List[Span] = None,

267

meta: Dict[str, Any] = None

268

) -> None:

269

"""

270

Initialize an ExtractedAnswer.

271

272

Args:

273

query: Original query

274

score: Confidence score

275

data: Extracted answer text

276

document: Source document

277

context: Context around the answer

278

offsets_in_document: Character offsets in original document

279

offsets_in_context: Character offsets in context

280

meta: Additional metadata

281

"""

282

283

query: str

284

score: Optional[float]

285

data: str

286

document: Optional[Document]

287

context: Optional[str]

288

offsets_in_document: List[Span]

289

offsets_in_context: List[Span]

290

meta: Dict[str, Any]

291

```

292

293

### Serialization

294

295

Handle serialization and deserialization of components and data structures.

296

297

```python { .api }

298

def default_to_dict(obj: Any) -> Dict[str, Any]:

299

"""

300

Default serialization function for Haystack objects.

301

302

Args:

303

obj: Object to serialize

304

305

Returns:

306

Dictionary representation of the object

307

"""

308

309

def default_from_dict(data: Dict[str, Any]) -> Any:

310

"""

311

Default deserialization function for Haystack objects.

312

313

Args:

314

data: Dictionary to deserialize

315

316

Returns:

317

Deserialized object

318

"""

319

```

320

321

### Error Handling

322

323

Handle component and pipeline execution errors.

324

325

```python { .api }

326

class ComponentError(Exception):

327

"""Exception raised when a component encounters an error."""

328

329

def __init__(self, message: str, component_name: str = None) -> None:

330

"""

331

Initialize ComponentError.

332

333

Args:

334

message: Error description

335

component_name: Name of the component that raised the error

336

"""

337

338

class DeserializationError(Exception):

339

"""Exception raised during object deserialization."""

340

341

def __init__(self, message: str, data: Dict[str, Any] = None) -> None:

342

"""

343

Initialize DeserializationError.

344

345

Args:

346

message: Error description

347

data: Data that failed to deserialize

348

"""

349

```

350

351

## Usage Examples

352

353

### Creating a Custom Component

354

355

```python

356

from haystack import component

357

358

@component

359

def text_processor(text: str, operation: str = "upper") -> Dict[str, str]:

360

"""Process text with specified operation."""

361

if operation == "upper":

362

processed_text = text.upper()

363

elif operation == "lower":

364

processed_text = text.lower()

365

else:

366

processed_text = text

367

368

return {"processed_text": processed_text}

369

370

# Use in pipeline

371

from haystack import Pipeline

372

373

pipeline = Pipeline()

374

pipeline.add_component("processor", text_processor)

375

376

result = pipeline.run({"processor": {"text": "Hello World", "operation": "upper"}})

377

print(result["processor"]["processed_text"]) # "HELLO WORLD"

378

```

379

380

### Building a Simple Pipeline

381

382

```python

383

from haystack import Pipeline

384

from haystack.components.builders import PromptBuilder

385

from haystack.components.generators import OpenAIGenerator

386

387

# Create pipeline

388

pipeline = Pipeline()

389

390

# Add components

391

pipeline.add_component("prompt_builder", PromptBuilder(template="Answer: {{question}}"))

392

pipeline.add_component("generator", OpenAIGenerator())

393

394

# Connect components

395

pipeline.connect("prompt_builder.prompt", "generator.prompt")

396

397

# Run pipeline

398

result = pipeline.run({

399

"prompt_builder": {"question": "What is Python?"}

400

})

401

402

print(result["generator"]["replies"][0])

403

```

404

405

## Types

406

407

```python { .api }

408

from typing import Protocol, Dict, Any, List, Optional, Set, Union

409

from dataclasses import dataclass

410

from pathlib import Path

411

412

class Span:

413

start: int

414

end: int

415

416

class ByteStream:

417

"""Binary data stream for document content."""

418

data: bytes

419

420

class SparseEmbedding:

421

"""Sparse vector representation with indices and values."""

422

indices: List[int]

423

values: List[float]

424

425

class Breakpoint:

426

"""Breakpoint configuration for pipeline debugging."""

427

pass

428

429

class AgentBreakpoint:

430

"""Agent-specific breakpoint configuration."""

431

pass

432

433

class PipelineSnapshot:

434

"""Pipeline execution snapshot for resuming."""

435

pass

436

437

@dataclass

438

class ComponentInfo:

439

name: str

440

type: str

441

inputs: Dict[str, Any]

442

outputs: Dict[str, Any]

443

```