Apify API client for Python providing access to web scraping and automation platform resources
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Task creation and management for reusable Actor configurations and scheduled executions. Tasks provide a way to save Actor configurations and inputs for repeated use.
class TaskClient:
def get(self) -> dict | None:
"""Get task information."""
def update(self, **kwargs) -> dict:
"""Update task configuration.
Args:
name (str, optional): Task name
actor_id (str, optional): Associated Actor ID
input (dict, optional): Default task input
options (dict, optional): Task execution options
**kwargs: Additional task parameters
"""
def delete(self) -> None:
"""Delete task."""
def start(self, **kwargs) -> dict:
"""Start task and return run object.
Args:
build (str, optional): Build to use
memory (int, optional): Memory allocation
timeout (int, optional): Timeout in seconds
**kwargs: Additional run parameters
"""
def call(self, **kwargs) -> dict | None:
"""Start task and wait for completion.
Args:
wait_secs (int, optional): Maximum wait time
**kwargs: Parameters passed to start()
"""
def get_input(self) -> dict | None:
"""Get default task input."""
def update_input(self, *, task_input: dict) -> dict:
"""Update default task input.
Args:
task_input: New input configuration
"""
def runs(self) -> RunCollectionClient:
"""Get task runs collection."""
def last_run(self, **kwargs) -> RunClient:
"""Get last task run client.
Args:
status (str, optional): Filter by run status
origin (str, optional): Filter by run origin
"""
def webhooks(self) -> WebhookCollectionClient:
"""Get task webhooks collection."""
class TaskClientAsync:
"""Async version of TaskClient with identical methods."""
class TaskCollectionClient:
def list(
self,
*,
limit: int | None = None,
offset: int | None = None,
desc: bool | None = None
) -> ListPage[dict]:
"""List tasks.
Args:
limit: Maximum number of tasks
offset: Pagination offset
desc: Sort in descending order
"""
def create(self, **kwargs) -> dict:
"""Create new task.
Args:
actor_id (str): Actor ID to create task for
name (str): Task name
input (dict, optional): Default input
options (dict, optional): Task options
**kwargs: Additional task configuration
"""
class TaskCollectionClientAsync:
"""Async version of TaskCollectionClient with identical methods."""from apify_client import ApifyClient
client = ApifyClient('your-api-token')
# Create reusable task
task = client.tasks().create(
actor_id='web-scraper-actor-id',
name='Daily Website Scraping',
input={
'startUrls': ['https://example.com/products'],
'maxPages': 100,
'outputFormat': 'json'
},
options={
'build': 'latest',
'memory': 2048,
'timeout': 3600
}
)
# Execute task
task_client = client.task(task['id'])
run = task_client.call()
print(f"Task execution completed: {run['status']}")Install with Tessl CLI
npx tessl i tessl/pypi-apify-client