A fully-featured and blazing-fast Python API client to interact with Algolia's search-as-a-service platform.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Scalable data pipeline services for importing, transforming, and synchronizing large datasets from various sources into Algolia indices. Build automated workflows to keep your search data up to date.
class IngestionClient:
    """Client for Algolia's Ingestion API (constructor and lifecycle methods)."""

    def __init__(
        self,
        app_id: Optional[str] = None,
        api_key: Optional[str] = None,
        transporter: Optional[Transporter] = None,
        config: Optional[IngestionConfig] = None,
    ) -> None:
        """
        Create an ingestion client.

        Parameters:
        - app_id: Algolia application ID (optional, defaults to None)
        - api_key: Algolia API key (optional, defaults to None)
        - transporter: Optional pre-built Transporter instance
        - config: Optional IngestionConfig instance
        """

    async def close(self) -> None:
        """Close the client."""

    async def set_client_api_key(self, api_key: str) -> None:
        """
        Set the API key used by this client.

        Parameters:
        - api_key: The new API key
        """


# Configure and manage data sources for ingestion pipelines.
async def list_sources(
    self,
    request_options: Optional[Union[dict, RequestOptions]] = None,
) -> ListSourcesResponse:
    """
    List all configured data sources.

    Parameters:
    - request_options: Additional request options (dict or RequestOptions;
      defaults to None)

    Returns:
    ListSourcesResponse with source configurations
    """
async def create_source(
    self,
    source_create: Union[SourceCreate, dict],
    request_options: Optional[Union[dict, RequestOptions]] = None,
) -> Source:
    """
    Register a new data source.

    Parameters:
    - source_create: Definition of the source to create, given either as a
      SourceCreate model or as a plain dict
    - request_options: Optional per-request settings (dict or RequestOptions)

    Returns:
    The Source that was created
    """
async def get_source(
    self,
    source_id: str,
    request_options: Optional[Union[dict, RequestOptions]] = None,
) -> Source:
    """
    Fetch a single data source by its identifier.

    Parameters:
    - source_id: Identifier of the source to fetch
    - request_options: Optional per-request settings (dict or RequestOptions)

    Returns:
    The matching Source
    """
async def update_source(
    self,
    source_id: str,
    source_update: Union[SourceUpdate, dict],
    request_options: Optional[Union[dict, RequestOptions]] = None,
) -> Source:
    """
    Modify an existing data source.

    Parameters:
    - source_id: Identifier of the source to modify
    - source_update: Changes to apply, given either as a SourceUpdate model or
      as a plain dict
    - request_options: Optional per-request settings (dict or RequestOptions)

    Returns:
    The Source as it stands after the update
    """
async def delete_source(
    self,
    source_id: str,
    request_options: Optional[Union[dict, RequestOptions]] = None,
) -> DeleteSourceResponse:
    """
    Remove a data source.

    Parameters:
    - source_id: Identifier of the source to remove
    - request_options: Optional per-request settings (dict or RequestOptions)

    Returns:
    A DeleteSourceResponse confirming the deletion
    """


# Configure data transformations and processing workflows.
async def list_transformations(
    self,
    request_options: Optional[Union[dict, RequestOptions]] = None,
) -> ListTransformationsResponse:
    """
    List all data transformations.

    Parameters:
    - request_options: Additional request options (dict or RequestOptions;
      defaults to None)

    Returns:
    ListTransformationsResponse with transformation configurations
    """
async def create_transformation(
    self,
    transformation_create: Union[TransformationCreate, dict],
    request_options: Optional[Union[dict, RequestOptions]] = None,
) -> Transformation:
    """
    Register a new data transformation.

    Parameters:
    - transformation_create: Definition of the transformation, given either as
      a TransformationCreate model or as a plain dict
    - request_options: Optional per-request settings (dict or RequestOptions)

    Returns:
    The Transformation that was created
    """
async def get_transformation(
    self,
    transformation_id: str,
    request_options: Optional[Union[dict, RequestOptions]] = None,
) -> Transformation:
    """
    Fetch a single transformation by its identifier.

    Parameters:
    - transformation_id: Identifier of the transformation to fetch
    - request_options: Optional per-request settings (dict or RequestOptions)

    Returns:
    The matching Transformation
    """
async def update_transformation(
    self,
    transformation_id: str,
    transformation_update: Union[TransformationUpdate, dict],
    request_options: Optional[Union[dict, RequestOptions]] = None,
) -> Transformation:
    """
    Modify an existing transformation.

    Parameters:
    - transformation_id: Identifier of the transformation to modify
    - transformation_update: Changes to apply, given either as a
      TransformationUpdate model or as a plain dict
    - request_options: Optional per-request settings (dict or RequestOptions)

    Returns:
    The Transformation as it stands after the update
    """
async def delete_transformation(
    self,
    transformation_id: str,
    request_options: Optional[Union[dict, RequestOptions]] = None,
) -> dict:
    """
    Remove a transformation.

    Parameters:
    - transformation_id: Identifier of the transformation to remove
    - request_options: Optional per-request settings (dict or RequestOptions)

    Returns:
    The deletion confirmation, as a plain dict
    """


# Monitor and control ingestion tasks and runs.
async def list_tasks(
    self,
    request_options: Optional[Union[dict, RequestOptions]] = None,
) -> ListTasksResponse:
    """
    List all ingestion tasks.

    Parameters:
    - request_options: Additional request options (dict or RequestOptions;
      defaults to None)

    Returns:
    ListTasksResponse with task configurations
    """
async def create_task(
    self,
    task_create: Union[TaskCreate, dict],
    request_options: Optional[Union[dict, RequestOptions]] = None,
) -> Task:
    """
    Define a new ingestion task.

    Parameters:
    - task_create: Definition of the task, given either as a TaskCreate model
      or as a plain dict
    - request_options: Optional per-request settings (dict or RequestOptions)

    Returns:
    The Task that was created
    """
async def run_task(
    self,
    task_id: str,
    request_options: Optional[Union[dict, RequestOptions]] = None,
) -> RunResponse:
    """
    Trigger a run of an ingestion task.

    Parameters:
    - task_id: Identifier of the task to run
    - request_options: Optional per-request settings (dict or RequestOptions)

    Returns:
    A RunResponse describing the triggered run
    """
async def get_run(
    self,
    run_id: str,
    request_options: Optional[Union[dict, RequestOptions]] = None,
) -> Run:
    """
    Look up a single run of an ingestion task.

    Parameters:
    - run_id: Identifier of the run to inspect
    - request_options: Optional per-request settings (dict or RequestOptions)

    Returns:
    The Run, including its status
    """


from algoliasearch.ingestion.client import IngestionClient
import asyncio


async def main() -> None:
    """Walk through a minimal ingestion pipeline.

    Creates a CSV data source and a record transformation, wires them into a
    scheduled task, and then triggers one run of that task immediately.

    The calls live inside a coroutine because ``await`` is only valid inside
    ``async def`` in a regular Python script. The client is closed in
    ``finally`` so it is released even if one of the calls raises.
    """
    # Initialize client
    client = IngestionClient("YOUR_APP_ID", "YOUR_API_KEY")
    try:
        # Create a CSV data source
        source_response = await client.create_source({
            "type": "csv",
            "name": "Product Catalog CSV",
            "input": {
                "url": "https://example.com/products.csv",
                "format": {
                    "delimiter": ",",
                    "encoding": "utf-8",
                },
            },
        })

        # Create a transformation
        transformation_response = await client.create_transformation({
            "name": "Product Processing",
            "code": """
function transform(record) {
  return {
    objectID: record.id,
    name: record.product_name,
    price: parseFloat(record.price),
    category: record.category,
    description: record.description
  };
}
""",
        })

        # Create and run an ingestion task
        task_response = await client.create_task({
            "sourceID": source_response.source_id,
            # NOTE(review): the API may expect the ID of a destination resource
            # here rather than an index name -- confirm before use.
            "destinationID": "products_index",
            "transformationID": transformation_response.transformation_id,
            "trigger": {
                "type": "schedule",
                "cron": "0 2 * * *",  # Daily at 2 AM
            },
        })

        # Run the task immediately
        run_response = await client.run_task(task_response.task_id)
        print(f"Task run started: {run_response.run_id}")
    finally:
        # Close the client even when one of the calls above raises.
        await client.close()


if __name__ == "__main__":
    asyncio.run(main())


class Source(BaseModel):
    """A configured data source, as returned by create_source/get_source."""

    source_id: str
    type: str  # e.g. "csv"
    name: str
    input: dict  # source-specific input settings (URL, format, ...)
    authentication_id: Optional[str] = None
class SourceCreate(BaseModel):
    """Payload describing a data source to create (argument of create_source)."""

    type: str  # e.g. "csv"
    name: str
    input: dict  # source-specific input settings (URL, format, ...)
    authentication_id: Optional[str] = None
class Transformation(BaseModel):
    """A stored data transformation, as returned by the transformation methods."""

    transformation_id: str
    name: str
    code: str  # transformation source; the usage example passes a JavaScript function as a string
    description: Optional[str] = None
class TransformationCreate(BaseModel):
    """Payload describing a transformation to create (argument of create_transformation)."""

    name: str
    code: str  # transformation source code, passed as a string
    description: Optional[str] = None
class Task(BaseModel):
    """An ingestion task linking a source to a destination, optionally through a transformation."""

    task_id: str
    source_id: str
    destination_id: str
    transformation_id: Optional[str] = None  # None when no transformation is attached
    trigger: dict  # e.g. {"type": "schedule", "cron": "0 2 * * *"}
    enabled: bool
class TaskCreate(BaseModel):
    """Payload describing a task to create (argument of create_task)."""

    source_id: str
    destination_id: str
    transformation_id: Optional[str] = None
    trigger: dict  # e.g. {"type": "schedule", "cron": "0 2 * * *"}
    # Defaults to True; note the Optional annotation also admits None.
    enabled: Optional[bool] = True
class Run(BaseModel):
    """A single execution of an ingestion task (returned by get_run)."""

    run_id: str
    task_id: str
    status: str
    # Timestamps are plain strings here; their exact format is not shown.
    created_at: str
    started_at: Optional[str] = None
    finished_at: Optional[str] = None
    progress: Optional[dict] = None


# Install with Tessl CLI
npx tessl i tessl/pypi-algoliasearch