or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

ab-testing.md, analytics-insights.md, data-ingestion.md, index.md, monitoring-management.md, query-suggestions.md, recommendations.md, search-operations.md

docs/data-ingestion.md

0

# Data Ingestion

1

2

Scalable data-pipeline services for importing, transforming, and synchronizing large datasets from various sources into Algolia indices. Use them to build automated workflows that keep your search data up to date.

3

4

## Capabilities

5

6

### Ingestion Client

7

8

```python { .api }

9

class IngestionClient:
    """Asynchronous client for the Algolia Ingestion API.

    Exposes coroutines for managing sources, transformations, tasks and runs
    (documented below). Every argument of the constructor is optional in the
    signature. NOTE(review): presumably either ``app_id``/``api_key`` or a
    prepared ``config``/``transporter`` must be supplied. Also confirm whether
    this client version requires a region. Verify against the implementation.
    """

    def __init__(
        self,
        app_id: Optional[str] = None,  # Algolia application ID
        api_key: Optional[str] = None,  # API key used to authenticate requests
        transporter: Optional[Transporter] = None,  # custom transport; NOTE(review): presumably built internally when omitted
        config: Optional[IngestionConfig] = None  # client configuration; NOTE(review): confirm precedence over app_id/api_key
    ) -> None: ...

    async def close(self) -> None:
        """Close the client. This is a coroutine and must be awaited.

        NOTE(review): presumably releases the underlying transporter/HTTP
        session. Confirm, and call it once the client is no longer needed.
        """
        ...

    async def set_client_api_key(self, api_key: str) -> None:
        """Replace the API key held by this client. This is a coroutine and must be awaited.

        NOTE(review): presumably affects only requests issued after the call.
        Confirm.
        """
        ...

20

```

21

22

### Source Management

23

24

Configure and manage data sources for ingestion pipelines.

25

26

```python { .api }

27

async def list_sources(
    self,
    request_options: Optional[Union[dict, RequestOptions]] = None
) -> ListSourcesResponse:
    """
    List all configured data sources. This is a coroutine and must be awaited.

    Parameters:
    - request_options: Additional request options, as a plain dict or a RequestOptions

    Returns:
    ListSourcesResponse with source configurations
    """

37

38

async def create_source(
    self,
    source_create: Union[SourceCreate, dict],
    request_options: Optional[Union[dict, RequestOptions]] = None
) -> Source:
    """
    Create a new data source. This is a coroutine and must be awaited.

    Parameters:
    - source_create: Source configuration, either a SourceCreate model or an
      equivalent plain dict (type, name, input, optional authentication ID)
    - request_options: Additional request options

    Returns:
    Source object with the created source's details; its source_id is what a
    task references as its source
    """

53

54

async def get_source(
    self,
    source_id: str,
    request_options: Optional[Union[dict, RequestOptions]] = None
) -> Source:
    """
    Get details of a specific data source. This is a coroutine and must be awaited.

    Parameters:
    - source_id: Source identifier (Source.source_id)
    - request_options: Additional request options

    Returns:
    Source object with source details
    """

69

70

async def update_source(
    self,
    source_id: str,
    source_update: Union[SourceUpdate, dict],
    request_options: Optional[Union[dict, RequestOptions]] = None
) -> Source:
    """
    Update an existing data source. This is a coroutine and must be awaited.

    Parameters:
    - source_id: Source identifier (Source.source_id)
    - source_update: Updated source configuration, as a SourceUpdate model or
      plain dict. NOTE(review): SourceUpdate's schema is not shown in this
      document. Confirm whether omitted fields are left unchanged.
    - request_options: Additional request options

    Returns:
    Source object with updated details
    """

87

88

async def delete_source(
    self,
    source_id: str,
    request_options: Optional[Union[dict, RequestOptions]] = None
) -> DeleteSourceResponse:
    """
    Delete a data source. This is a coroutine and must be awaited.

    Parameters:
    - source_id: Source identifier (Source.source_id)
    - request_options: Additional request options

    Returns:
    DeleteSourceResponse with deletion confirmation
    """

103

```

104

105

### Transformation Management

106

107

Create, inspect, update, and delete the transformations that reshape source records before they are indexed.

108

109

```python { .api }

110

async def list_transformations(
    self,
    request_options: Optional[Union[dict, RequestOptions]] = None
) -> ListTransformationsResponse:
    """
    List all data transformations. This is a coroutine and must be awaited.

    Parameters:
    - request_options: Additional request options, as a plain dict or a RequestOptions

    Returns:
    ListTransformationsResponse with transformation configurations
    """

120

121

async def create_transformation(
    self,
    transformation_create: Union[TransformationCreate, dict],
    request_options: Optional[Union[dict, RequestOptions]] = None
) -> Transformation:
    """
    Create a new data transformation. This is a coroutine and must be awaited.

    Parameters:
    - transformation_create: Transformation configuration, as a
      TransformationCreate model or an equivalent plain dict (name, code,
      optional description)
    - request_options: Additional request options

    Returns:
    Transformation object with the created transformation's details; its
    transformation_id can be attached to a task
    """

136

137

async def get_transformation(
    self,
    transformation_id: str,
    request_options: Optional[Union[dict, RequestOptions]] = None
) -> Transformation:
    """
    Get details of a specific transformation. This is a coroutine and must be awaited.

    Parameters:
    - transformation_id: Transformation identifier (Transformation.transformation_id)
    - request_options: Additional request options

    Returns:
    Transformation object with transformation details
    """

152

153

async def update_transformation(
    self,
    transformation_id: str,
    transformation_update: Union[TransformationUpdate, dict],
    request_options: Optional[Union[dict, RequestOptions]] = None
) -> Transformation:
    """
    Update an existing transformation. This is a coroutine and must be awaited.

    Parameters:
    - transformation_id: Transformation identifier (Transformation.transformation_id)
    - transformation_update: Updated transformation configuration, as a
      TransformationUpdate model or plain dict. NOTE(review):
      TransformationUpdate's schema is not shown in this document. Confirm
      whether omitted fields are left unchanged.
    - request_options: Additional request options

    Returns:
    Transformation object with updated details
    """

170

171

async def delete_transformation(
    self,
    transformation_id: str,
    request_options: Optional[Union[dict, RequestOptions]] = None
) -> dict:
    """
    Delete a transformation. This is a coroutine and must be awaited.

    Parameters:
    - transformation_id: Transformation identifier (Transformation.transformation_id)
    - request_options: Additional request options

    Returns:
    Deletion confirmation response. NOTE(review): annotated as a plain dict,
    unlike delete_source, which returns DeleteSourceResponse. Confirm the
    actual return type.
    """

186

```

187

188

### Task and Run Management

189

190

Monitor and control ingestion tasks and runs.

191

192

```python { .api }

193

async def list_tasks(
    self,
    request_options: Optional[Union[dict, RequestOptions]] = None
) -> ListTasksResponse:
    """
    List all ingestion tasks. This is a coroutine and must be awaited.

    Parameters:
    - request_options: Additional request options, as a plain dict or a RequestOptions

    Returns:
    ListTasksResponse with task configurations
    """

203

204

async def create_task(
    self,
    task_create: Union[TaskCreate, dict],
    request_options: Optional[Union[dict, RequestOptions]] = None
) -> Task:
    """
    Create a new ingestion task. This is a coroutine and must be awaited.

    A task links a source to a destination, optionally through a
    transformation, and carries a trigger. For example,
    {"type": "schedule", "cron": "0 2 * * *"} runs it daily at 2 AM.

    Parameters:
    - task_create: Task configuration, as a TaskCreate model or an equivalent
      plain dict
    - request_options: Additional request options

    Returns:
    Task object with the created task's details; pass its task_id to run_task
    """

219

220

async def run_task(
    self,
    task_id: str,
    request_options: Optional[Union[dict, RequestOptions]] = None
) -> RunResponse:
    """
    Execute an ingestion task immediately, independent of its trigger. This is
    a coroutine and must be awaited.

    Parameters:
    - task_id: Task identifier (Task.task_id)
    - request_options: Additional request options

    Returns:
    RunResponse with execution details; its run_id can be passed to get_run
    to check the run's status
    """

235

236

async def get_run(
    self,
    run_id: str,
    request_options: Optional[Union[dict, RequestOptions]] = None
) -> Run:
    """
    Get details of a specific task run. This is a coroutine and must be awaited.

    Parameters:
    - run_id: Run identifier (e.g. RunResponse.run_id returned by run_task)
    - request_options: Additional request options

    Returns:
    Run object with run details and status
    """

251

```

252

253

## Usage Example

254

255

```python

256

import asyncio

from algoliasearch.ingestion.client import IngestionClient


async def main() -> None:
    """Create a CSV source, a transformation and a scheduled task, then run the task once.

    Every IngestionClient method is a coroutine, so this example runs inside an
    event loop. A bare top-level ``await`` is a SyntaxError in a regular
    Python script and only works in async-aware REPLs or notebooks.
    """
    # Initialize client.
    # NOTE(review): depending on the client version, a region argument may also
    # be required. Confirm.
    client = IngestionClient("YOUR_APP_ID", "YOUR_API_KEY")
    try:
        # Create a CSV data source
        source_response = await client.create_source({
            "type": "csv",
            "name": "Product Catalog CSV",
            "input": {
                "url": "https://example.com/products.csv",
                "format": {
                    "delimiter": ",",
                    "encoding": "utf-8",
                },
            },
        })

        # Create a transformation that maps CSV columns to record attributes
        transformation_response = await client.create_transformation({
            "name": "Product Processing",
            "code": """
function transform(record) {
  return {
    objectID: record.id,
    name: record.product_name,
    price: parseFloat(record.price),
    category: record.category,
    description: record.description
  };
}
""",
        })

        # Create an ingestion task that runs on a schedule
        task_response = await client.create_task({
            "sourceID": source_response.source_id,
            # NOTE(review): confirm whether destinationID must be the ID of a
            # destination resource rather than an index name.
            "destinationID": "products_index",
            "transformationID": transformation_response.transformation_id,
            "trigger": {
                "type": "schedule",
                "cron": "0 2 * * *",  # Daily at 2 AM
            },
        })

        # Run the task immediately
        run_response = await client.run_task(task_response.task_id)
        print(f"Task run started: {run_response.run_id}")
    finally:
        # close() is a coroutine. Await it even when a call above fails so the
        # client is not leaked.
        await client.close()


if __name__ == "__main__":
    asyncio.run(main())

304

```

305

306

## Types

307

308

```python { .api }

309

class Source(BaseModel):
    """A configured data source, as returned by create_source, get_source and update_source."""

    source_id: str  # identifier of the source; referenced by tasks as their source ID
    type: str  # source kind, e.g. "csv" in the usage example
    name: str  # human-readable label
    input: dict  # type-specific settings (e.g. url/format for CSV); full schema not shown here
    authentication_id: Optional[str] = None  # NOTE(review): presumably references stored credentials. Confirm.

315

316

class SourceCreate(BaseModel):
    """Payload for create_source. It mirrors Source without the server-side source_id."""

    type: str  # source kind, e.g. "csv"
    name: str  # human-readable label
    input: dict  # type-specific settings (e.g. {"url": ..., "format": {...}} for CSV)
    authentication_id: Optional[str] = None  # NOTE(review): presumably references stored credentials. Confirm.

321

322

class Transformation(BaseModel):
    """A stored record transformation, as returned by the transformation methods."""

    transformation_id: str  # identifier; can be attached to a task as its transformation ID
    name: str  # human-readable label
    code: str  # transformation source; the usage example supplies a JavaScript transform(record) function
    description: Optional[str] = None  # optional free-text description

327

328

class TransformationCreate(BaseModel):
    """Payload for create_transformation. It mirrors Transformation without the transformation_id."""

    name: str  # human-readable label
    code: str  # transformation source, e.g. a JavaScript transform(record) function
    description: Optional[str] = None  # optional free-text description

332

333

class Task(BaseModel):
    """An ingestion task linking a source to a destination, as returned by create_task."""

    task_id: str  # identifier; pass to run_task
    source_id: str  # Source.source_id the task reads from
    destination_id: str  # destination the task writes to
    transformation_id: Optional[str] = None  # optional Transformation.transformation_id applied to records
    trigger: dict  # when the task runs, e.g. {"type": "schedule", "cron": "0 2 * * *"}
    enabled: bool  # whether the task is enabled

340

341

class TaskCreate(BaseModel):
    """Payload for create_task. It mirrors Task without the task_id."""

    source_id: str  # Source.source_id to read from
    destination_id: str  # destination to write to
    transformation_id: Optional[str] = None  # optional Transformation.transformation_id to apply
    trigger: dict  # when the task runs, e.g. {"type": "schedule", "cron": "0 2 * * *"}
    enabled: Optional[bool] = True  # defaults to True, so new tasks are enabled unless told otherwise

347

348

class Run(BaseModel):
    """A single execution of a task, as returned by get_run."""

    run_id: str  # run identifier (as returned in RunResponse by run_task)
    task_id: str  # Task.task_id this run belongs to
    status: str  # run state; the set of possible values is not documented here
    created_at: str  # NOTE(review): presumably an ISO-8601 timestamp. Confirm.
    started_at: Optional[str] = None  # None until the run starts (presumably). Confirm.
    finished_at: Optional[str] = None  # None until the run finishes (presumably). Confirm.
    progress: Optional[dict] = None  # progress details; schema not documented here

356

```