Apify API client for Python providing access to web scraping and automation platform resources
—
Quality: Pending — does it follow best practices?
Impact: Pending — no eval scenarios have been run.
Request queue operations for managing crawling workflows and Actor communication. Request queues provide distributed, persistent storage for web crawling requests with advanced features like deduplication, prioritization, and request locking.
Individual request queue management with comprehensive request handling capabilities.
class RequestQueueClient:
def get(self) -> dict | None:
"""Get request queue information."""
def update(self, *, name: str | None = None, general_access: StorageGeneralAccess | None = None) -> dict:
"""Update queue configuration.
Args:
name: Queue name
general_access: Storage access level (from apify_shared.consts)
"""
def delete(self) -> None:
"""Delete queue."""
def list_head(self, *, limit: int | None = None) -> dict:
"""Get requests from queue head.
Args:
limit: Maximum number of requests to return
"""
def list_and_lock_head(self, *, lock_secs: int, limit: int | None = None) -> dict:
"""Get and lock requests from head.
Args:
lock_secs: Lock duration in seconds
limit: Maximum number of requests to return
"""
def add_request(self, request: dict, *, forefront: bool | None = None) -> dict:
"""Add single request to queue.
Args:
request: Request object with url, method, headers, etc.
forefront: Whether to add to front of queue
"""
def get_request(self, request_id: str) -> dict | None:
"""Get specific request by ID.
Args:
request_id: Request identifier
"""
def update_request(self, request: dict, *, forefront: bool | None = None) -> dict:
"""Update existing request.
Args:
request: Updated request object
forefront: Whether to move to front of queue
"""
def delete_request(self, request_id: str) -> None:
"""Delete request by ID.
Args:
request_id: Request identifier
"""
def prolong_request_lock(
self,
request_id: str,
*,
forefront: bool | None = None,
lock_secs: int
) -> dict:
"""Extend request lock duration.
Args:
request_id: Request identifier
forefront: Whether to move to front when unlocked
lock_secs: New lock duration in seconds
"""
def delete_request_lock(self, request_id: str, *, forefront: bool | None = None) -> None:
"""Remove request lock.
Args:
request_id: Request identifier
forefront: Whether to move to front of queue
"""
def batch_add_requests(self, requests: list[dict], **kwargs) -> BatchAddRequestsResult:
"""Add multiple requests in batches.
Args:
requests: List of request objects
forefront: Whether to add to front of queue
**kwargs: Additional batch parameters
"""
def batch_delete_requests(self, requests: list[dict]) -> dict:
"""Delete multiple requests.
Args:
requests: List of request objects with IDs
"""
def list_requests(
self,
*,
limit: int | None = None,
exclusive_start_id: str | None = None
) -> dict:
"""List all requests in queue.
Args:
limit: Maximum number of requests
exclusive_start_id: ID to start listing from
"""
def unlock_requests(self) -> dict:
"""Unlock all requests locked by this client."""
class RequestQueueClientAsync:
    """Asynchronous counterpart of RequestQueueClient exposing the identical method set as coroutines."""
class RequestQueueCollectionClient:
def list(self, **kwargs) -> ListPage[dict]:
"""List request queues.
Args:
unnamed (bool, optional): Include unnamed queues
limit (int, optional): Maximum number of items
offset (int, optional): Offset for pagination
desc (bool, optional): Sort in descending order
"""
def get_or_create(self, *, name: str | None = None) -> dict:
"""Get or create request queue.
Args:
name: Queue name
"""
class RequestQueueCollectionClientAsync:
    """Asynchronous counterpart of RequestQueueCollectionClient with the identical method set."""


class BatchAddRequestsResult:
    """Result of batch add requests operation."""

    # Requests the queue accepted.
    added_requests: list[dict]
    # Requests that could not be processed.
    unprocessed_requests: list[dict]
    # Total number of requests handled in the batch.
    processed_requests: int
    # True when the operation hit rate limits.
    was_limit_reached: bool


from apify_client import ApifyClient
client = ApifyClient('your-api-token')

# Create the queue (or fetch it if it already exists) and open a client for it.
queue = client.request_queues().get_or_create(name='crawling-queue')
queue_client = client.request_queue(queue['id'])

# Two sample requests: a plain GET and a JSON POST.
get_request = {
    'url': 'https://example.com',
    'method': 'GET',
    'headers': {'User-Agent': 'My Bot 1.0'}
}
post_request = {
    'url': 'https://example.org/api/data',
    'method': 'POST',
    'headers': {'Content-Type': 'application/json'},
    'payload': '{"query": "search term"}'
}

# Enqueue each request and report the ID the platform assigned to it.
for request in (get_request, post_request):
    result = queue_client.add_request(request)
    print(f"Added request: {result['id']}")

# Peek at the head of the queue (FIFO order).
head_requests = queue_client.list_head(limit=10)
print(f"Got {len(head_requests['items'])} requests from queue head")

# Process requests with locking
# Open the queue with a client_key so locks are attributed to this worker.
queue_client = client.request_queue('queue-id', client_key='worker-1')

# Fetch up to 5 requests and lock them for 5 minutes of exclusive processing.
locked_requests = queue_client.list_and_lock_head(lock_secs=300, limit=5)
for request in locked_requests['items']:
    try:
        # Process request
        response = process_request(request)

        # Record the outcome on the request before persisting it.
        # setdefault guards against requests added without userData.
        user_data = request.setdefault('userData', {})
        user_data['processed'] = True
        user_data['response_status'] = response.status_code
        queue_client.update_request(request)

        # Remove the finished request from the queue.
        queue_client.delete_request(request['id'])
    except Exception:
        # Release the lock so another worker can retry this request.
        # (The original example both prolonged and then deleted the lock
        # here, which is contradictory. If processing merely needs more
        # time, call queue_client.prolong_request_lock(request['id'],
        # lock_secs=300) instead of releasing.)
        queue_client.delete_request_lock(request['id'])

# Add large number of requests efficiently
import time

urls = [f'https://example.com/page/{i}' for i in range(1000)]

# Build request objects for every page. enumerate supplies the page index;
# the original referenced `i` here, which is undefined outside the
# comprehension above (comprehension variables do not leak).
batch_requests = [
    {
        'url': url,
        'method': 'GET',
        'userData': {'pageNumber': page_number}
    }
    for page_number, url in enumerate(urls)
]

# Submit in batches of 100 to keep each API payload small.
batch_size = 100
for start in range(0, len(batch_requests), batch_size):
    batch = batch_requests[start:start + batch_size]
    result = queue_client.batch_add_requests(batch)
    print(f"Added {result.processed_requests} requests")
    if result.was_limit_reached:
        # Back off briefly before submitting the next batch.
        print("Rate limit reached, waiting...")
        time.sleep(10)

# Monitor queue status
# Inspect queue metadata.
queue_info = queue_client.get()
print(f"Queue: {queue_info['name']}")
print(f"Total requests: {queue_info['totalRequestCount']}")
print(f"Handled requests: {queue_info['handledRequestCount']}")
print(f"Pending requests: {queue_info['pendingRequestCount']}")

# Page through every request, using the last seen ID as the cursor.
all_requests = []
exclusive_start_id = None
while True:
    page = queue_client.list_requests(
        limit=1000,
        exclusive_start_id=exclusive_start_id
    )
    items = page['items']
    if not items:
        break
    all_requests.extend(items)
    exclusive_start_id = items[-1]['id']
print(f"Retrieved {len(all_requests)} total requests")

# Clean up: delete every request flagged as failed in its userData.
failed_requests = [
    request for request in all_requests
    if request.get('userData', {}).get('failed', False)
]
if failed_requests:
    queue_client.batch_delete_requests(failed_requests)
    print(f"Deleted {len(failed_requests)} failed requests")

# Worker coordination with client keys
worker_id = 'worker-001'
queue_client = client.request_queue('shared-queue', client_key=worker_id)
def worker_loop():
while True:
# Get exclusive access to requests
locked_requests = queue_client.list_and_lock_head(
lock_secs=600, # 10 minute lock
limit=3
)
if not locked_requests['items']:
print("No requests available, waiting...")
time.sleep(30)
continue
for request in locked_requests['items']:
try:
# Process request
result = crawl_page(request['url'])
# Mark as completed
queue_client.delete_request(request['id'])
except Exception as e:
print(f"Processing failed: {e}")
# Release lock for retry by other workers
queue_client.delete_request_lock(request['id'])
# Start worker
worker_loop()Install with Tessl CLI
npx tessl i tessl/pypi-apify-client