Firebase Admin Python SDK enables server-side Python developers to integrate Firebase services into their applications from privileged environments.
—
Firebase Cloud Functions integration for enqueueing tasks in Cloud Task queues associated with callable functions. Enables server-side task scheduling and background processing.
Get task queue instances for enqueueing background tasks to Cloud Functions.
def task_queue(function_name, extension_id=None, app=None):
"""
Get a TaskQueue instance for the specified function.
Args:
function_name: Name of the Cloud Function to target
extension_id: Extension ID for extension functions (optional)
app: Firebase app instance (optional)
Returns:
TaskQueue: Task queue instance for the specified function
"""Enqueue and manage tasks for execution by Cloud Functions.
class TaskQueue:
"""Task queue for a specific Cloud Function."""
def enqueue(self, task_data, opts=None):
"""
Enqueue a task for execution by the Cloud Function.
Args:
task_data: JSON-serializable data to pass to the function
opts: TaskOptions instance for task configuration (optional)
Returns:
str: Task ID for the enqueued task
"""
def delete(self, task_id):
"""
Delete a task from the queue before it executes.
Args:
task_id: ID of the task to delete
"""
class TaskOptions:
"""Configuration options for task enqueueing."""
def __init__(self, schedule_time=None, dispatch_deadline=None, task_id=None):
"""
Initialize task options.
Args:
schedule_time: When to execute the task (datetime, optional)
dispatch_deadline: Maximum execution time (timedelta, optional)
task_id: Custom task ID (string, optional)
"""from firebase_admin import functions
import datetime
# Get task queue for a function
queue = functions.task_queue('processOrder')
# Enqueue a task with data
task_data = {
'order_id': 'order_123',
'user_id': 'user_456',
'items': [
{'product_id': 'prod_1', 'quantity': 2},
{'product_id': 'prod_2', 'quantity': 1}
]
}
task_id = queue.enqueue(task_data)
print(f'Enqueued task: {task_id}')from firebase_admin import functions
from datetime import datetime, timedelta
# Schedule task for future execution
queue = functions.task_queue('sendReminders')
# Execute task in 1 hour
schedule_time = datetime.utcnow() + timedelta(hours=1)
opts = functions.TaskOptions(schedule_time=schedule_time)
task_data = {
'reminder_type': 'payment_due',
'user_id': 'user_789',
'due_date': '2024-01-15'
}
task_id = queue.enqueue(task_data, opts=opts)
print(f'Scheduled task: {task_id} for {schedule_time}')from firebase_admin import functions
from datetime import datetime, timedelta
queue = functions.task_queue('processLargeFile')
# Configure task with custom options
opts = functions.TaskOptions(
schedule_time=datetime.utcnow() + timedelta(minutes=30),
dispatch_deadline=timedelta(minutes=10), # Max 10 minutes to complete
task_id='file_process_custom_id_123' # Custom task ID
)
task_data = {
'file_url': 'gs://my-bucket/large-file.csv',
'processing_options': {
'format': 'csv',
'delimiter': ',',
'headers': True
}
}
task_id = queue.enqueue(task_data, opts=opts)
print(f'Configured task: {task_id}')# For Firebase Extension functions
queue = functions.task_queue(
function_name='processImage',
extension_id='firebase/storage-resize-images'
)
task_data = {
'image_path': 'uploads/photo.jpg',
'sizes': [100, 300, 600]
}
task_id = queue.enqueue(task_data)# Enqueue task and potentially cancel it
queue = functions.task_queue('longRunningProcess')
task_data = {'data_to_process': 'large_dataset_id'}
task_id = queue.enqueue(task_data)
# Later, if needed, delete the task before it executes
try:
queue.delete(task_id)
print(f'Cancelled task: {task_id}')
except Exception as e:
print(f'Could not cancel task: {e}')
# Task may have already started or completed# Enqueue multiple related tasks
queue = functions.task_queue('processUserData')
users_to_process = ['user1', 'user2', 'user3', 'user4']
task_ids = []
for user_id in users_to_process:
task_data = {
'user_id': user_id,
'operation': 'update_profile',
'batch_id': 'batch_001'
}
task_id = queue.enqueue(task_data)
task_ids.append(task_id)
print(f'Enqueued {len(task_ids)} tasks for batch processing')from firebase_admin import functions
from firebase_admin.exceptions import FirebaseError
try:
queue = functions.task_queue('nonexistentFunction')
task_id = queue.enqueue({'data': 'test'})
except FirebaseError as e:
print(f'Failed to enqueue task: {e}')
except Exception as e:
print(f'Unexpected error: {e}')
# Handle task deletion errors
try:
queue.delete('invalid_task_id')
except FirebaseError as e:
if 'NOT_FOUND' in str(e):
print('Task not found or already completed')
else:
print(f'Error deleting task: {e}')The Cloud Functions that receive these tasks should be implemented to handle the enqueued data:
// Example Cloud Function (Node.js)
const functions = require('firebase-functions');
exports.processOrder = functions.tasks.taskQueue({
retryConfig: {
maxAttempts: 3,
minBackoffSeconds: 60,
},
rateLimits: {
maxConcurrentDispatches: 10,
},
}).onDispatch(async (data) => {
const { order_id, user_id, items } = data;
// Process the order
console.log(`Processing order ${order_id} for user ${user_id}`);
// Your order processing logic here
for (const item of items) {
console.log(`Processing item: ${item.product_id}, quantity: ${item.quantity}`);
}
return { success: true, processed_at: new Date().toISOString() };
});Task queues can be configured with various options in the Cloud Function:
// Advanced task queue configuration
exports.processLargeFile = functions.tasks.taskQueue({
retryConfig: {
maxAttempts: 5,
minBackoffSeconds: 120,
maxBackoffSeconds: 3600,
maxDoublings: 3,
},
rateLimits: {
maxConcurrentDispatches: 5,
maxDispatchesPerSecond: 1,
},
invoker: ['service-account@project.iam.gserviceaccount.com'],
}).onDispatch(async (data) => {
// Handle large file processing
const { file_url, processing_options } = data;
// Processing logic here
});Monitor task queue performance through Firebase Console:
import logging
from firebase_admin import functions
# Enable detailed logging
logging.basicConfig(level=logging.DEBUG)
# Enqueue task with debugging info
queue = functions.task_queue('debugFunction')
task_data = {
'debug_info': {
'enqueued_at': datetime.utcnow().isoformat(),
'source': 'admin_script',
'version': '1.0'
},
'actual_data': {'key': 'value'}
}
task_id = queue.enqueue(task_data)
logging.info(f'Debug task enqueued: {task_id}')class TaskQueue:
"""Task queue for a specific Cloud Function."""
def enqueue(self, task_data, opts=None):
"""Enqueue a task for execution."""
def delete(self, task_id):
"""Delete a task from the queue."""
class TaskOptions:
"""Configuration options for task enqueueing."""
def __init__(self, schedule_time=None, dispatch_deadline=None, task_id=None):
"""
Initialize task options.
Args:
schedule_time: When to execute the task (datetime)
dispatch_deadline: Maximum execution time (timedelta)
task_id: Custom task ID (string)
"""Install with Tessl CLI
npx tessl i tessl/pypi-firebase-admin