CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-apify-client

Apify API client for Python providing access to web scraping and automation platform resources

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/runs.md

Run Management

Manage the lifecycle of Actor runs: monitor execution, access results, and perform advanced operations such as metamorphosis, resurrection, and real-time status tracking.

Capabilities

Run Operations

Individual run management with comprehensive lifecycle control and monitoring capabilities.

class RunClient:
    """Client for a single Actor run.

    Groups the operations available on one run: reading and updating it,
    controlling its lifecycle (abort, metamorph, resurrect, reboot), and
    obtaining clients for its default storages and its log.
    """
    
    def get(self) -> dict | None:
        """Get run information including status, statistics, and metadata.

        Returns:
            The run as a dict, or None (presumably when the run does not
            exist — confirm against the library reference). Callers should
            check for None before indexing the result.
        """
    
    def update(
        self, 
        *, 
        status_message: str | None = None, 
        is_status_message_terminal: bool | None = None,
        general_access: RunGeneralAccess | None = None
    ) -> dict:
        """Update run configuration.

        All arguments are keyword-only and optional.

        Args:
            status_message: Custom status message
            is_status_message_terminal: Whether status message is final
            general_access: Run access level (from apify_shared.consts)

        Returns:
            A dict describing the run (presumably its state after the
            update — confirm).
        """
    
    def delete(self) -> None:
        """Delete run and its associated data."""
    
    def abort(self, *, gracefully: bool | None = None) -> dict:
        """Abort running Actor run.

        Args:
            gracefully: Whether to allow graceful shutdown

        Returns:
            A dict describing the run (presumably the run data after the
            abort request — confirm).
        """
    
    def wait_for_finish(self, *, wait_secs: int | None = None) -> dict | None:
        """Wait for run completion.

        Args:
            wait_secs: Maximum wait time in seconds

        Returns:
            The run as a dict, or None.

        NOTE(review): upstream appears to return the run data even when the
        run has not reached a terminal status within ``wait_secs``, and None
        only when the run does not exist — confirm. Callers should inspect
        the returned ``status`` rather than rely on a None result alone.
        """
    
    def metamorph(self, *, target_actor_id: str, **kwargs) -> dict:
        """Transform run into different Actor.

        Args:
            target_actor_id: ID of target Actor
            build: Target build ID
            content_type: Input content type
            run_input: Input for the target Actor (passed via ``**kwargs``,
                as in the usage example on this page)
            **kwargs: Additional metamorphosis parameters

        Returns:
            A dict describing the run; the usage example reads its ``id``.

        NOTE(review): upstream appears to name the build keyword
        ``target_actor_build`` rather than ``build`` — confirm.
        """
    
    def resurrect(self, **kwargs) -> dict:
        """Resurrect finished run with same configuration.

        Args:
            build: Build ID to use
            memory: Memory allocation
            timeout: Run timeout
            **kwargs: Additional run parameters

        Returns:
            A dict describing the run; the usage example reads its ``id``.

        NOTE(review): upstream appears to name the memory and timeout
        keywords ``memory_mbytes`` and ``timeout_secs`` — confirm against
        the library reference before relying on the names listed above.
        """
    
    def reboot(self) -> dict:
        """Reboot running Actor run."""
    
    def dataset(self) -> DatasetClient:
        """Get default dataset client for this run."""
    
    def key_value_store(self) -> KeyValueStoreClient:
        """Get default key-value store client for this run."""
    
    def request_queue(self) -> RequestQueueClient:
        """Get default request queue client for this run."""
    
    def log(self) -> LogClient:
        """Get log client for this run."""
    
    def get_streamed_log(self, **kwargs) -> StreamedLog:
        """Get streamed log instance for real-time log monitoring.

        Args:
            **kwargs: Streaming configuration parameters

        Returns:
            A StreamedLog for this run.
        """
    
    def charge(self, event_name: str, **kwargs) -> None:
        """Charge for pay-per-event run.

        Args:
            event_name: Name of billable event
            **kwargs: Event parameters

        NOTE(review): upstream appears to accept only ``count`` and
        ``idempotency_key`` besides ``event_name`` — confirm.
        """
    
    def get_status_message_watcher(self, **kwargs) -> StatusMessageWatcher:
        """Get status message watcher for real-time status updates.

        Args:
            **kwargs: Watcher configuration parameters

        Returns:
            A StatusMessageWatcher for this run.
        """

class RunClientAsync:
    """Async version of RunClient with identical methods.

    NOTE(review): the methods are presumably coroutines that must be
    awaited — confirm against the library reference.
    """

class RunCollectionClient:
    """Client for the collection of Actor runs; currently documents listing only."""

    def list(
        self, 
        *, 
        limit: int | None = None, 
        offset: int | None = None, 
        desc: bool | None = None,
        status: str | None = None
    ) -> ListPage[dict]:
        """List Actor runs.

        All arguments are keyword-only and optional.

        Args:
            limit: Maximum number of runs
            offset: Pagination offset
            desc: Sort in descending order
            status: Filter by run status

        Returns:
            A ListPage of run dicts; the usage example on this page iterates
            its ``items`` and reads each run's ``id``.
        """

class RunCollectionClientAsync:
    """Async version of RunCollectionClient with identical methods.

    NOTE(review): the methods are presumably coroutines that must be
    awaited — confirm against the library reference.
    """

Real-time Monitoring Types

class StreamedLog:
    """Real-time log streaming interface.

    Iterate over an instance to receive log lines. Release it with
    ``close()`` or by using the instance in a ``with`` statement, as the
    usage example on this page does.
    """

    def __enter__(self) -> "StreamedLog":
        """Enter a ``with`` block; the stream itself is the ``as`` target."""

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Leave the ``with`` block.

        Presumably releases the stream the same way ``close()`` does —
        confirm against the library reference.
        """

    def __iter__(self) -> Iterator[str]:
        """Iterate over log lines in real-time."""

    def close(self) -> None:
        """Close log stream."""

class StatusMessageWatcher:
    """Real-time status message monitoring interface.

    Iterate over an instance to receive status updates as dicts. Release it
    with ``close()`` or by using the instance in a ``with`` statement, as the
    usage example on this page does.
    """

    def __enter__(self) -> "StatusMessageWatcher":
        """Enter a ``with`` block; the watcher itself is the ``as`` target."""

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Leave the ``with`` block.

        Presumably stops the watcher the same way ``close()`` does —
        confirm against the library reference.
        """

    def __iter__(self) -> Iterator[dict]:
        """Iterate over status message updates."""

    def close(self) -> None:
        """Close status watcher."""

Usage Examples

Basic Run Management

from apify_client import ApifyClient

client = ApifyClient('your-api-token')

# Start Actor and get run
actor = client.actor('john-doe/web-scraper')
run = actor.start(run_input={'startUrls': ['https://example.com']})

# Monitor run progress
run_client = client.run(run['id'])
run_info = run_client.get()

# get() is typed `dict | None`, so guard before indexing the result
if run_info is not None:
    print(f"Run status: {run_info['status']}")
    print(f"Started at: {run_info['startedAt']}")

# Wait for completion
final_run = run_client.wait_for_finish(wait_secs=300)

# A returned run is only "finished" once its status is terminal; a missing
# result or a still-running status both mean the wait expired first.
terminal_statuses = {'SUCCEEDED', 'FAILED', 'ABORTED', 'TIMED-OUT'}
if final_run and final_run['status'] in terminal_statuses:
    print(f"Run finished with status: {final_run['status']}")
    print(f"Usage: {final_run['stats']}")
else:
    print("Run timeout reached")

Advanced Run Operations

# Metamorphosis - transform running Actor into different Actor
run_client = client.run('current-run-id')

# Transform to data processing Actor
new_run = run_client.metamorph(
    target_actor_id='data-processor/clean-data',
    run_input={'source': 'transformed_data'},
)

print(f"Metamorphosed to new run: {new_run['id']}")

# Resurrect failed run
failed_run = client.run('failed-run-id')
# Keyword names follow the upstream client signature: memory is given in
# megabytes (memory_mbytes) and the timeout in seconds (timeout_secs).
resurrected = failed_run.resurrect(
    memory_mbytes=4096,  # Increase memory
    timeout_secs=7200,  # Increase timeout
)

print(f"Resurrected run: {resurrected['id']}")

Real-time Monitoring

import time
from datetime import datetime

# Follow the log of a live run while it is being produced
run_client = client.run('active-run-id')

# NOTE(review): upstream StreamedLog appears to redirect log lines to a
# logger rather than yield them to the caller — confirm that iterating the
# stream as shown here is supported.
with run_client.get_streamed_log() as log_stream:
    for log_line in log_stream:
        timestamp = datetime.now().strftime('%H:%M:%S')
        print(f"[{timestamp}] {log_line.strip()}")

        # Keep reading until the first line that mentions an error
        if 'ERROR' not in log_line:
            continue
        print("Error detected, investigating...")
        break

# Follow status messages until the run reports one of these final states
final_states = ('SUCCEEDED', 'FAILED', 'ABORTED')
with run_client.get_status_message_watcher() as status_watcher:
    for status_update in status_watcher:
        print(f"Status: {status_update['status']}")
        print(f"Message: {status_update.get('statusMessage', 'No message')}")

        if status_update['status'] in final_states:
            break

Data Access and Processing

# Access run's default storage
run_client = client.run('completed-run-id')

# Get results from default dataset
dataset = run_client.dataset()
items = dataset.list_items()

print(f"Run produced {items.count} items")

# Process results
for item in items.items:
    # Process each scraped item
    process_scraped_data(item)

# Access metadata from key-value store
kvs = run_client.key_value_store()
input_data = kvs.get_record('INPUT')
output_data = kvs.get_record('OUTPUT')

print(f"Input: {input_data}")
print(f"Output: {output_data}")

# Get screenshot if available
screenshot = kvs.get_record_as_bytes('SCREENSHOT')
if screenshot:
    # The client returns a record dict, not raw bytes; the binary payload
    # is stored under its 'value' key.
    with open('run_screenshot.png', 'wb') as f:
        f.write(screenshot['value'])

Run Analytics and Billing

# Monitor multiple runs
runs = client.runs().list(limit=100)

total_compute_units = 0
total_data_transfer = 0

for run in runs.items:
    run_client = client.run(run['id'])
    run_details = run_client.get()

    # get() may return None, and a run may carry no stats yet
    if run_details and run_details.get('stats'):
        stats = run_details['stats']
        total_compute_units += stats.get('computeUnits', 0)
        total_data_transfer += stats.get('dataTransfer', 0)

        print(f"Run {run['id']}: {stats.get('computeUnits', 0)} CU")

print(f"Total compute units: {total_compute_units}")
print(f"Total data transfer: {total_data_transfer} MB")

# Charge for pay-per-event run
pay_per_event_run = client.run('pay-per-event-run-id')

# Charge for custom events. The quantity is always passed as `count`;
# the event name alone identifies what is being billed.
pay_per_event_run.charge('api_call', count=150)
pay_per_event_run.charge('data_extraction', count=500)
pay_per_event_run.charge('image_processing', count=25)

Error Handling and Recovery

# Graceful run management with error handling
def manage_run_lifecycle(actor_id, run_input):
    """Start an Actor run, wait up to 30 minutes, and recover from timeout or failure.

    Args:
        actor_id: Identifier of the Actor to start.
        run_input: Input handed to the Actor as ``run_input``.

    Returns:
        The run dict once it reaches a terminal status other than FAILED;
        the resurrected run dict when the first attempt ended as FAILED
        (that run has only just been restarted); or None when the wait
        expired (the run is then aborted gracefully) or an error occurred.
    """
    terminal_statuses = {'SUCCEEDED', 'FAILED', 'ABORTED', 'TIMED-OUT'}
    client = ApifyClient('your-api-token')
    # Bound before the try block so the error handler can tell whether a
    # run was actually created before something went wrong.
    run_client = None

    try:
        # Start run
        run = client.actor(actor_id).start(run_input=run_input)
        run_client = client.run(run['id'])

        # Monitor with timeout
        result = run_client.wait_for_finish(wait_secs=1800)  # 30 minutes

        # Both a missing result and a still-running status mean the wait
        # expired before the run finished.
        if not result or result.get('status') not in terminal_statuses:
            print("Run timeout, aborting...")
            run_client.abort(gracefully=True)
            return None

        if result['status'] == 'FAILED':
            # Try resurrection with more resources
            print("Run failed, attempting resurrection...")
            return run_client.resurrect(
                memory_mbytes=8192,
                timeout_secs=3600,
            )

        return result

    except Exception as e:
        print(f"Run management error: {e}")
        # Best-effort cleanup: only possible when a run was created, and a
        # failure here must not mask the original error path.
        if run_client is not None:
            try:
                run_client.abort()
            except Exception as abort_error:
                print(f"Could not abort run during cleanup: {abort_error}")
        return None

# Use the function
result = manage_run_lifecycle('my-actor', {'url': 'https://example.com'})
if result and result.get('status') == 'SUCCEEDED':
    print(f"Run completed successfully: {result['id']}")
elif result:
    # The helper can also hand back a run that did not succeed, e.g. one
    # that was just resurrected and is still in progress.
    print(f"Run {result['id']} returned with status: {result.get('status')}")

Install with Tessl CLI

npx tessl i tessl/pypi-apify-client

docs

actors.md

builds.md

index.md

logging.md

request-queues.md

runs.md

schedules.md

storage.md

store.md

tasks.md

users.md

webhooks.md

tile.json