CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-apify-client

Apify API client for Python providing access to web scraping and automation platform resources

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

request-queues.mddocs/

Request Queue Management

Request queue operations for managing crawling workflows and Actor communication. Request queues provide distributed, persistent storage for web crawling requests with advanced features like deduplication, prioritization, and request locking.

Capabilities

Request Queue Operations

Individual request queue management with comprehensive request handling capabilities.

class RequestQueueClient:
    def get(self) -> dict | None:
        """Get request queue information."""
    
    def update(self, *, name: str | None = None, general_access: StorageGeneralAccess | None = None) -> dict:
        """Update queue configuration.
        
        Args:
            name: Queue name
            general_access: Storage access level (from apify_shared.consts)
        """
    
    def delete(self) -> None:
        """Delete queue."""
    
    def list_head(self, *, limit: int | None = None) -> dict:
        """Get requests from queue head.
        
        Args:
            limit: Maximum number of requests to return
        """
    
    def list_and_lock_head(self, *, lock_secs: int, limit: int | None = None) -> dict:
        """Get and lock requests from head.
        
        Args:
            lock_secs: Lock duration in seconds
            limit: Maximum number of requests to return
        """
    
    def add_request(self, request: dict, *, forefront: bool | None = None) -> dict:
        """Add single request to queue.
        
        Args:
            request: Request object with url, method, headers, etc.
            forefront: Whether to add to front of queue
        """
    
    def get_request(self, request_id: str) -> dict | None:
        """Get specific request by ID.
        
        Args:
            request_id: Request identifier
        """
    
    def update_request(self, request: dict, *, forefront: bool | None = None) -> dict:
        """Update existing request.
        
        Args:
            request: Updated request object
            forefront: Whether to move to front of queue
        """
    
    def delete_request(self, request_id: str) -> None:
        """Delete request by ID.
        
        Args:
            request_id: Request identifier
        """
    
    def prolong_request_lock(
        self, 
        request_id: str, 
        *, 
        forefront: bool | None = None, 
        lock_secs: int
    ) -> dict:
        """Extend request lock duration.
        
        Args:
            request_id: Request identifier
            forefront: Whether to move to front when unlocked
            lock_secs: New lock duration in seconds
        """
    
    def delete_request_lock(self, request_id: str, *, forefront: bool | None = None) -> None:
        """Remove request lock.
        
        Args:
            request_id: Request identifier
            forefront: Whether to move to front of queue
        """
    
    def batch_add_requests(self, requests: list[dict], **kwargs) -> BatchAddRequestsResult:
        """Add multiple requests in batches.
        
        Args:
            requests: List of request objects
            forefront: Whether to add to front of queue
            **kwargs: Additional batch parameters
        """
    
    def batch_delete_requests(self, requests: list[dict]) -> dict:
        """Delete multiple requests.
        
        Args:
            requests: List of request objects with IDs
        """
    
    def list_requests(
        self, 
        *, 
        limit: int | None = None, 
        exclusive_start_id: str | None = None
    ) -> dict:
        """List all requests in queue.
        
        Args:
            limit: Maximum number of requests
            exclusive_start_id: ID to start listing from
        """
    
    def unlock_requests(self) -> dict:
        """Unlock all requests locked by this client."""

class RequestQueueClientAsync:
    """Async version of RequestQueueClient with identical methods."""

class RequestQueueCollectionClient:
    def list(self, **kwargs) -> ListPage[dict]:
        """List request queues.
        
        Args:
            unnamed (bool, optional): Include unnamed queues
            limit (int, optional): Maximum number of items
            offset (int, optional): Offset for pagination
            desc (bool, optional): Sort in descending order
        """
    
    def get_or_create(self, *, name: str | None = None) -> dict:
        """Get or create request queue.
        
        Args:
            name: Queue name
        """

class RequestQueueCollectionClientAsync:
    """Async version of RequestQueueCollectionClient with identical methods."""

Batch Operations Result Types

class BatchAddRequestsResult:
    """Result of batch add requests operation."""
    
    added_requests: list[dict]
    """Successfully added requests."""
    
    unprocessed_requests: list[dict] 
    """Requests that could not be processed."""
    
    processed_requests: int
    """Total number of processed requests."""
    
    was_limit_reached: bool
    """Whether the operation hit rate limits."""

Usage Examples

Basic Request Queue Operations

from apify_client import ApifyClient

client = ApifyClient('your-api-token')

# Create or get request queue
queue = client.request_queues().get_or_create(name='crawling-queue')
queue_client = client.request_queue(queue['id'])

# Add requests to queue
requests = [
    {
        'url': 'https://example.com',
        'method': 'GET',
        'headers': {'User-Agent': 'My Bot 1.0'}
    },
    {
        'url': 'https://example.org/api/data',
        'method': 'POST',
        'headers': {'Content-Type': 'application/json'},
        'payload': '{"query": "search term"}'
    }
]

for request in requests:
    result = queue_client.add_request(request)
    print(f"Added request: {result['id']}")

# Get requests from head (FIFO)
head_requests = queue_client.list_head(limit=10)
print(f"Got {len(head_requests['items'])} requests from queue head")

Advanced Request Processing

# Process requests with locking
queue_client = client.request_queue('queue-id', client_key='worker-1')

# Get and lock requests for processing
locked_requests = queue_client.list_and_lock_head(lock_secs=300, limit=5)

for request in locked_requests['items']:
    try:
        # Process request
        response = process_request(request)
        
        # Update request with results
        request['userData']['processed'] = True
        request['userData']['response_status'] = response.status_code
        queue_client.update_request(request)
        
        # Remove processed request
        queue_client.delete_request(request['id'])
        
    except Exception as e:
        # Extend lock if processing takes longer
        queue_client.prolong_request_lock(request['id'], lock_secs=300)
        
        # Or unlock to allow retry by other workers
        queue_client.delete_request_lock(request['id'])

Batch Request Management

# Add large number of requests efficiently
urls = [f'https://example.com/page/{i}' for i in range(1000)]

batch_requests = []
for url in urls:
    batch_requests.append({
        'url': url,
        'method': 'GET',
        'userData': {'pageNumber': i}
    })

# Add in batches
batch_size = 100
for i in range(0, len(batch_requests), batch_size):
    batch = batch_requests[i:i + batch_size]
    result = queue_client.batch_add_requests(batch)
    
    print(f"Added {result.processed_requests} requests")
    if result.was_limit_reached:
        print("Rate limit reached, waiting...")
        time.sleep(10)

Queue Monitoring and Management

# Monitor queue status
queue_info = queue_client.get()
print(f"Queue: {queue_info['name']}")
print(f"Total requests: {queue_info['totalRequestCount']}")
print(f"Handled requests: {queue_info['handledRequestCount']}")
print(f"Pending requests: {queue_info['pendingRequestCount']}")

# List all requests with pagination
all_requests = []
exclusive_start_id = None

while True:
    batch = queue_client.list_requests(
        limit=1000, 
        exclusive_start_id=exclusive_start_id
    )
    
    if not batch['items']:
        break
        
    all_requests.extend(batch['items'])
    exclusive_start_id = batch['items'][-1]['id']

print(f"Retrieved {len(all_requests)} total requests")

# Clean up: delete failed requests
failed_requests = [
    req for req in all_requests 
    if req.get('userData', {}).get('failed', False)
]

if failed_requests:
    queue_client.batch_delete_requests(failed_requests)
    print(f"Deleted {len(failed_requests)} failed requests")

Multi-Worker Coordination

# Worker coordination with client keys
worker_id = 'worker-001'
queue_client = client.request_queue('shared-queue', client_key=worker_id)

def worker_loop():
    while True:
        # Get exclusive access to requests
        locked_requests = queue_client.list_and_lock_head(
            lock_secs=600,  # 10 minute lock
            limit=3
        )
        
        if not locked_requests['items']:
            print("No requests available, waiting...")
            time.sleep(30)
            continue
            
        for request in locked_requests['items']:
            try:
                # Process request
                result = crawl_page(request['url'])
                
                # Mark as completed
                queue_client.delete_request(request['id'])
                
            except Exception as e:
                print(f"Processing failed: {e}")
                # Release lock for retry by other workers
                queue_client.delete_request_lock(request['id'])

# Start worker
worker_loop()

Install with Tessl CLI

npx tessl i tessl/pypi-apify-client

docs

actors.md

builds.md

index.md

logging.md

request-queues.md

runs.md

schedules.md

storage.md

store.md

tasks.md

users.md

webhooks.md

tile.json