Apify API client for Python providing access to web scraping and automation platform resources
---
Quality: Pending — Does it follow best practices?
Impact: Pending — No eval scenarios have been run.
Actor run lifecycle management including execution monitoring, result access, and advanced operations like metamorphosis, resurrection, and real-time status tracking.
Individual run management with comprehensive lifecycle control and monitoring capabilities.
class RunClient:
    """Client for a single Actor run: lifecycle control, monitoring, and storage access."""

    def get(self) -> dict | None:
        """Get run information including status, statistics, and metadata."""

    def update(
        self,
        *,
        status_message: str | None = None,
        is_status_message_terminal: bool | None = None,
        general_access: RunGeneralAccess | None = None,
    ) -> dict:
        """Update run configuration.

        Args:
            status_message: Custom status message
            is_status_message_terminal: Whether status message is final
            general_access: Run access level (from apify_shared.consts)
        """

    def delete(self) -> None:
        """Delete run and its associated data."""

    def abort(self, *, gracefully: bool | None = None) -> dict:
        """Abort running Actor run.

        Args:
            gracefully: Whether to allow graceful shutdown
        """

    def wait_for_finish(self, *, wait_secs: int | None = None) -> dict | None:
        """Wait for run completion.

        Args:
            wait_secs: Maximum wait time in seconds
        """

    def metamorph(self, *, target_actor_id: str, **kwargs) -> dict:
        """Transform run into different Actor.

        Args:
            target_actor_id: ID of target Actor
            build: Target build ID
            content_type: Input content type
            **kwargs: Additional metamorphosis parameters
        """

    def resurrect(self, **kwargs) -> dict:
        """Resurrect finished run with same configuration.

        Args:
            build: Build ID to use
            memory: Memory allocation
            timeout: Run timeout
            **kwargs: Additional run parameters
        """

    def reboot(self) -> dict:
        """Reboot running Actor run."""

    def dataset(self) -> DatasetClient:
        """Get default dataset client for this run."""

    def key_value_store(self) -> KeyValueStoreClient:
        """Get default key-value store client for this run."""

    def request_queue(self) -> RequestQueueClient:
        """Get default request queue client for this run."""

    def log(self) -> LogClient:
        """Get log client for this run."""

    def get_streamed_log(self, **kwargs) -> StreamedLog:
        """Get streamed log instance for real-time log monitoring.

        Args:
            **kwargs: Streaming configuration parameters
        """

    def charge(self, event_name: str, **kwargs) -> None:
        """Charge for pay-per-event run.

        Args:
            event_name: Name of billable event
            **kwargs: Event parameters
        """

    def get_status_message_watcher(self, **kwargs) -> StatusMessageWatcher:
        """Get status message watcher for real-time status updates.

        Args:
            **kwargs: Watcher configuration parameters
        """
class RunClientAsync:
    """Async version of RunClient with identical methods."""
class RunCollectionClient:
    """Client for listing Actor runs on the account."""

    def list(
        self,
        *,
        limit: int | None = None,
        offset: int | None = None,
        desc: bool | None = None,
        status: str | None = None,
    ) -> ListPage[dict]:
        """List Actor runs.

        Args:
            limit: Maximum number of runs
            offset: Pagination offset
            desc: Sort in descending order
            status: Filter by run status
        """
class RunCollectionClientAsync:
    """Async version of RunCollectionClient with identical methods."""


class StreamedLog:
    """Real-time log streaming interface."""

    def __iter__(self) -> Iterator[str]:
        """Iterate over log lines in real-time."""

    def close(self) -> None:
        """Close log stream."""
class StatusMessageWatcher:
    """Real-time status message monitoring interface."""

    def __iter__(self) -> Iterator[dict]:
        """Iterate over status message updates."""

    def close(self) -> None:
        """Close status watcher."""


# --- Usage example: basic run monitoring ---
from apify_client import ApifyClient
client = ApifyClient('your-api-token')

# Start Actor and get run
actor = client.actor('john-doe/web-scraper')
run = actor.start(run_input={'startUrls': ['https://example.com']})

# Monitor run progress
run_client = client.run(run['id'])
run_info = run_client.get()
print(f"Run status: {run_info['status']}")
print(f"Started at: {run_info['startedAt']}")

# Wait for completion; wait_for_finish returns None when the timeout is hit
final_run = run_client.wait_for_finish(wait_secs=300)
if final_run:
    print(f"Run finished with status: {final_run['status']}")
    print(f"Usage: {final_run['stats']}")
else:
    print("Run timeout reached")

# Metamorphosis - transform running Actor into different Actor
run_client = client.run('current-run-id')

# Transform to data processing Actor
new_run = run_client.metamorph(
    target_actor_id='data-processor/clean-data',
    run_input={'source': 'transformed_data'},
)
print(f"Metamorphosed to new run: {new_run['id']}")

# Resurrect failed run with more generous resources
failed_run = client.run('failed-run-id')
resurrected = failed_run.resurrect(
    memory=4096,   # Increase memory
    timeout=7200,  # Increase timeout
)
print(f"Resurrected run: {resurrected['id']}")

import time
from datetime import datetime

# Real-time log monitoring
run_client = client.run('active-run-id')

# Stream logs in real-time
with run_client.get_streamed_log() as log_stream:
    for log_line in log_stream:
        timestamp = datetime.now().strftime('%H:%M:%S')
        print(f"[{timestamp}] {log_line.strip()}")
        # Break on certain conditions
        if 'ERROR' in log_line:
            print("Error detected, investigating...")
            break

# Monitor status messages until the run reaches a terminal state
with run_client.get_status_message_watcher() as status_watcher:
    for status_update in status_watcher:
        print(f"Status: {status_update['status']}")
        print(f"Message: {status_update.get('statusMessage', 'No message')}")
        if status_update['status'] in ['SUCCEEDED', 'FAILED', 'ABORTED']:
            break

# Access run's default storage
run_client = client.run('completed-run-id')

# Get results from default dataset
dataset = run_client.dataset()
items = dataset.list_items()
print(f"Run produced {items.count} items")

# Process results
for item in items.items:
    # Process each scraped item (process_scraped_data is user-defined)
    process_scraped_data(item)

# Access metadata from key-value store
kvs = run_client.key_value_store()
input_data = kvs.get_record('INPUT')
output_data = kvs.get_record('OUTPUT')
print(f"Input: {input_data}")
print(f"Output: {output_data}")

# Get screenshot if available
screenshot = kvs.get_record_as_bytes('SCREENSHOT')
if screenshot:
    with open('run_screenshot.png', 'wb') as f:
        f.write(screenshot)

# Monitor multiple runs
runs = client.runs().list(limit=100)

# Aggregate usage statistics across runs
total_compute_units = 0
total_data_transfer = 0
for run in runs.items:
    run_client = client.run(run['id'])
    run_details = run_client.get()
    if run_details and run_details.get('stats'):
        stats = run_details['stats']
        total_compute_units += stats.get('computeUnits', 0)
        total_data_transfer += stats.get('dataTransfer', 0)
        print(f"Run {run['id']}: {stats.get('computeUnits', 0)} CU")

print(f"Total compute units: {total_compute_units}")
print(f"Total data transfer: {total_data_transfer} MB")

# Charge for pay-per-event run
pay_per_event_run = client.run('pay-per-event-run-id')

# Charge for custom events
pay_per_event_run.charge('api_call', count=150)
pay_per_event_run.charge('data_extraction', items=500)
pay_per_event_run.charge('image_processing', images=25)

# Graceful run management with error handling
def manage_run_lifecycle(actor_id, run_input):
    """Start a run, wait for completion, and recover from timeouts and failures.

    Args:
        actor_id: ID or name of the Actor to start.
        run_input: Input dict passed to the Actor run.

    Returns:
        The finished (or resurrected) run dict, or None on timeout/error.
    """
    client = ApifyClient('your-api-token')
    run_client = None  # set only after the run starts, so cleanup can check it
    try:
        # Start run
        run = client.actor(actor_id).start(run_input=run_input)
        run_client = client.run(run['id'])

        # Monitor with timeout
        result = run_client.wait_for_finish(wait_secs=1800)  # 30 minutes
        if not result:
            # Timeout reached, abort gracefully
            print("Run timeout, aborting...")
            run_client.abort(gracefully=True)
            return None

        if result['status'] == 'FAILED':
            # Try resurrection with more resources
            print("Run failed, attempting resurrection...")
            return run_client.resurrect(
                memory=8192,
                timeout=3600,
            )

        return result
    except Exception as e:
        print(f"Run management error: {e}")
        # Best-effort cleanup: abort only if a run was actually started,
        # and ignore secondary failures from the abort call itself.
        if run_client is not None:
            try:
                run_client.abort()
            except Exception:
                pass
        return None


# Use the function
result = manage_run_lifecycle('my-actor', {'url': 'https://example.com'})
if result:
    print(f"Run completed successfully: {result['id']}")

# Install with Tessl CLI
npx tessl i tessl/pypi-apify-client