0
# Data Ingestion
1
2
The Ingestion client provides scalable data pipeline services for importing, transforming, and synchronizing large datasets from various sources into Algolia indices. Use it to build automated workflows that keep your search data up to date.
3
4
## Capabilities
5
6
### Ingestion Client
7
8
```python { .api }
9
class IngestionClient:
10
def __init__(
11
self,
12
app_id: Optional[str] = None,
13
api_key: Optional[str] = None,
14
transporter: Optional[Transporter] = None,
15
config: Optional[IngestionConfig] = None
16
) -> None: ...
17
18
async def close(self) -> None: ...
19
async def set_client_api_key(self, api_key: str) -> None: ...
20
```
21
22
### Source Management
23
24
Configure and manage data sources for ingestion pipelines.
25
26
```python { .api }
27
async def list_sources(
28
self,
29
request_options: Optional[Union[dict, RequestOptions]] = None
30
) -> ListSourcesResponse:
31
"""
32
List all configured data sources.
33
34
Returns:
35
ListSourcesResponse with source configurations
36
"""
37
38
async def create_source(
39
self,
40
source_create: Union[SourceCreate, dict],
41
request_options: Optional[Union[dict, RequestOptions]] = None
42
) -> Source:
43
"""
44
Create a new data source.
45
46
Parameters:
47
- source_create: Source configuration
48
- request_options: Additional request options
49
50
Returns:
51
Source object with created source details
52
"""
53
54
async def get_source(
55
self,
56
source_id: str,
57
request_options: Optional[Union[dict, RequestOptions]] = None
58
) -> Source:
59
"""
60
Get details of a specific data source.
61
62
Parameters:
63
- source_id: Source identifier
64
- request_options: Additional request options
65
66
Returns:
67
Source object with source details
68
"""
69
70
async def update_source(
71
self,
72
source_id: str,
73
source_update: Union[SourceUpdate, dict],
74
request_options: Optional[Union[dict, RequestOptions]] = None
75
) -> Source:
76
"""
77
Update an existing data source.
78
79
Parameters:
80
- source_id: Source identifier
81
- source_update: Updated source configuration
82
- request_options: Additional request options
83
84
Returns:
85
Source object with updated details
86
"""
87
88
async def delete_source(
89
self,
90
source_id: str,
91
request_options: Optional[Union[dict, RequestOptions]] = None
92
) -> DeleteSourceResponse:
93
"""
94
Delete a data source.
95
96
Parameters:
97
- source_id: Source identifier
98
- request_options: Additional request options
99
100
Returns:
101
DeleteSourceResponse with deletion confirmation
102
"""
103
```
104
105
### Transformation Management
106
107
List, create, inspect, update, and delete the data transformations used by ingestion tasks.
108
109
```python { .api }
110
async def list_transformations(
111
self,
112
request_options: Optional[Union[dict, RequestOptions]] = None
113
) -> ListTransformationsResponse:
114
"""
115
List all data transformations.
116
117
Returns:
118
ListTransformationsResponse with transformation configurations
119
"""
120
121
async def create_transformation(
122
self,
123
transformation_create: Union[TransformationCreate, dict],
124
request_options: Optional[Union[dict, RequestOptions]] = None
125
) -> Transformation:
126
"""
127
Create a new data transformation.
128
129
Parameters:
130
- transformation_create: Transformation configuration
131
- request_options: Additional request options
132
133
Returns:
134
Transformation object with created transformation details
135
"""
136
137
async def get_transformation(
138
self,
139
transformation_id: str,
140
request_options: Optional[Union[dict, RequestOptions]] = None
141
) -> Transformation:
142
"""
143
Get details of a specific transformation.
144
145
Parameters:
146
- transformation_id: Transformation identifier
147
- request_options: Additional request options
148
149
Returns:
150
Transformation object with transformation details
151
"""
152
153
async def update_transformation(
154
self,
155
transformation_id: str,
156
transformation_update: Union[TransformationUpdate, dict],
157
request_options: Optional[Union[dict, RequestOptions]] = None
158
) -> Transformation:
159
"""
160
Update an existing transformation.
161
162
Parameters:
163
- transformation_id: Transformation identifier
164
- transformation_update: Updated transformation configuration
165
- request_options: Additional request options
166
167
Returns:
168
Transformation object with updated details
169
"""
170
171
async def delete_transformation(
172
self,
173
transformation_id: str,
174
request_options: Optional[Union[dict, RequestOptions]] = None
175
) -> dict:
176
"""
177
Delete a transformation.
178
179
Parameters:
180
- transformation_id: Transformation identifier
181
- request_options: Additional request options
182
183
Returns:
184
Deletion confirmation response
185
"""
186
```
187
188
### Task and Run Management
189
190
List and create ingestion tasks, trigger task runs, and inspect the details and status of individual runs.
191
192
```python { .api }
193
async def list_tasks(
194
self,
195
request_options: Optional[Union[dict, RequestOptions]] = None
196
) -> ListTasksResponse:
197
"""
198
List all ingestion tasks.
199
200
Returns:
201
ListTasksResponse with task configurations
202
"""
203
204
async def create_task(
205
self,
206
task_create: Union[TaskCreate, dict],
207
request_options: Optional[Union[dict, RequestOptions]] = None
208
) -> Task:
209
"""
210
Create a new ingestion task.
211
212
Parameters:
213
- task_create: Task configuration
214
- request_options: Additional request options
215
216
Returns:
217
Task object with created task details
218
"""
219
220
async def run_task(
221
self,
222
task_id: str,
223
request_options: Optional[Union[dict, RequestOptions]] = None
224
) -> RunResponse:
225
"""
226
Execute an ingestion task.
227
228
Parameters:
229
- task_id: Task identifier
230
- request_options: Additional request options
231
232
Returns:
233
RunResponse with execution details
234
"""
235
236
async def get_run(
237
self,
238
run_id: str,
239
request_options: Optional[Union[dict, RequestOptions]] = None
240
) -> Run:
241
"""
242
Get details of a specific task run.
243
244
Parameters:
245
- run_id: Run identifier
246
- request_options: Additional request options
247
248
Returns:
249
Run object with run details and status
250
"""
251
```
252
253
## Usage Example
254
255
```python
256
from algoliasearch.ingestion.client import IngestionClient
257
258
# Initialize client (the `await` calls below must run inside an async function, e.g. one started with asyncio.run)
259
client = IngestionClient("YOUR_APP_ID", "YOUR_API_KEY")
260
261
# Create a CSV data source
262
source_response = await client.create_source({
263
"type": "csv",
264
"name": "Product Catalog CSV",
265
"input": {
266
"url": "https://example.com/products.csv",
267
"format": {
268
"delimiter": ",",
269
"encoding": "utf-8"
270
}
271
}
272
})
273
274
# Create a transformation
275
transformation_response = await client.create_transformation({
276
"name": "Product Processing",
277
"code": """
278
function transform(record) {
279
return {
280
objectID: record.id,
281
name: record.product_name,
282
price: parseFloat(record.price),
283
category: record.category,
284
description: record.description
285
};
286
}
287
"""
288
})
289
290
# Create an ingestion task that runs on a schedule
291
task_response = await client.create_task({
292
"sourceID": source_response.source_id,
293
"destinationID": "products_index",
294
"transformationID": transformation_response.transformation_id,
295
"trigger": {
296
"type": "schedule",
297
"cron": "0 2 * * *" # Daily at 2 AM
298
}
299
})
300
301
# Run the task immediately
302
run_response = await client.run_task(task_response.task_id)
303
print(f"Task run started: {run_response.run_id}")
304
```
305
306
## Types
307
308
```python { .api }
309
class Source(BaseModel):
310
source_id: str
311
type: str
312
name: str
313
input: dict
314
authentication_id: Optional[str] = None
315
316
class SourceCreate(BaseModel):
317
type: str
318
name: str
319
input: dict
320
authentication_id: Optional[str] = None
321
322
class Transformation(BaseModel):
323
transformation_id: str
324
name: str
325
code: str
326
description: Optional[str] = None
327
328
class TransformationCreate(BaseModel):
329
name: str
330
code: str
331
description: Optional[str] = None
332
333
class Task(BaseModel):
334
task_id: str
335
source_id: str
336
destination_id: str
337
transformation_id: Optional[str] = None
338
trigger: dict
339
enabled: bool
340
341
class TaskCreate(BaseModel):
342
source_id: str
343
destination_id: str
344
transformation_id: Optional[str] = None
345
trigger: dict
346
enabled: Optional[bool] = True
347
348
class Run(BaseModel):
349
run_id: str
350
task_id: str
351
status: str
352
created_at: str
353
started_at: Optional[str] = None
354
finished_at: Optional[str] = None
355
progress: Optional[dict] = None
356
```