Flask extension that integrates APScheduler for job scheduling with REST API management and authentication support
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Comprehensive job management capabilities for creating, modifying, and controlling scheduled jobs. Supports both programmatic job creation and decorator-based job definitions with multiple trigger types.
Methods for adding and removing jobs from the scheduler.
def add_job(self, id, func, **kwargs):
"""
Add the given job to the job list and wake up the scheduler if it's already running.
Args:
id (str): Explicit identifier for the job (for modifying it later)
func: Callable (or textual reference to one) to run at the given time
**kwargs: Additional job configuration options including trigger parameters
Returns:
Job: The created job object
Trigger Options:
trigger (str): Type of trigger ("date", "interval", "cron")
Date trigger:
run_date: When to run the job
timezone: Timezone for the run_date
Interval trigger:
weeks, days, hours, minutes, seconds: Interval components
start_date: When to start the interval
end_date: When to stop the interval
timezone: Timezone for dates
Cron trigger:
year, month, day, week, day_of_week: Cron schedule fields
hour, minute, second: Time components
start_date, end_date: Date range bounds
timezone: Timezone for schedule
Job Options:
name: Human-readable name for the job
misfire_grace_time: Seconds after scheduled time job can still run
max_instances: Maximum number of concurrent job instances
coalesce: Whether to combine multiple pending executions
"""
def remove_job(self, id, jobstore=None):
"""
Remove a job, preventing it from being run any more.
Args:
id (str): The identifier of the job
jobstore (str): Alias of the job store that contains the job
"""
def remove_all_jobs(self, jobstore=None):
"""
Remove all jobs from the specified job store, or all job stores if none is given.
Args:
jobstore (str): Alias of the job store
"""

Methods for retrieving job information.
def get_job(self, id, jobstore=None):
"""
Return the Job that matches the given ID.
Args:
id (str): The identifier of the job
jobstore (str): Alias of the job store that most likely contains the job
Returns:
Job: The job by the given ID, or None if it wasn't found
"""
def get_jobs(self, jobstore=None):
"""
Return a list of pending jobs (if the scheduler hasn't been started yet) and scheduled jobs.
Args:
jobstore (str): Alias of the job store
Returns:
list[Job]: List of job objects
"""

Methods for modifying existing job properties and scheduling.
def modify_job(self, id, jobstore=None, **changes):
"""
Modify the properties of a single job.
Args:
id (str): The identifier of the job
jobstore (str): Alias of the job store that contains the job
**changes: Job properties to modify (same as add_job kwargs)
Returns:
Job: The modified job object
Note:
If "trigger" is in changes, the job will be rescheduled with new trigger.
All trigger parameters can be modified (run_date, interval components, cron fields).
"""

Methods for controlling individual job execution state.
def pause_job(self, id, jobstore=None):
"""
Pause the given job until it is explicitly resumed.
Args:
id (str): The identifier of the job
jobstore (str): Alias of the job store that contains the job
"""
def resume_job(self, id, jobstore=None):
"""
Resume the schedule of the given job, or remove the job if its schedule is finished.
Args:
id (str): The identifier of the job
jobstore (str): Alias of the job store that contains the job
"""
def run_job(self, id, jobstore=None):
"""
Run the given job without scheduling it.
Args:
id (str): The identifier of the job
jobstore (str): Alias of the job store that contains the job
Raises:
JobLookupError: If the job is not found
"""

Task decorator for defining scheduled jobs directly on functions.
@scheduler.task(trigger, id=None, **trigger_args)
def job_function():
"""
Decorator for scheduling functions as jobs.
Args:
trigger (str): Trigger type ("interval", "cron", "date")
id (str): Job identifier (uses function name if None)
**trigger_args: Trigger-specific arguments
Returns:
The decorated function
"""

# Interval job - runs every 30 seconds
scheduler.add_job(
id="heartbeat",
func=send_heartbeat,
trigger="interval",
seconds=30,
name="System Heartbeat"
)
# Cron job - runs daily at midnight
scheduler.add_job(
id="daily_cleanup",
func=cleanup_temp_files,
trigger="cron",
hour=0,
minute=0,
name="Daily Cleanup"
)
# Date job - runs once at specific time
from datetime import datetime, timedelta
run_time = datetime.now() + timedelta(hours=1)
scheduler.add_job(
id="delayed_task",
func=process_data,
trigger="date",
run_date=run_time,
args=[data_file],
kwargs={"format": "json"}
)

# Interval job using decorator
@scheduler.task("interval", id="status_check", seconds=60)
def check_system_status():
print("Checking system status...")
# Cron job using decorator
@scheduler.task("cron", id="weekly_report", day_of_week="sun", hour=9)
def generate_weekly_report():
print("Generating weekly report...")
# Job with misfire grace time
@scheduler.task("interval", id="important_task", minutes=10, misfire_grace_time=300)
def important_periodic_task():
print("Running important task...")

# Modify job schedule
scheduler.modify_job("heartbeat", seconds=60) # Change from 30s to 60s
# Change cron job time
scheduler.modify_job("daily_cleanup", hour=2, minute=30) # Change to 2:30 AM
# Modify job function and arguments
scheduler.modify_job("data_processor", func=new_processor, args=[new_data])

# Pause and resume jobs
scheduler.pause_job("heartbeat")
scheduler.resume_job("heartbeat")
# Run job immediately (outside normal schedule)
scheduler.run_job("daily_cleanup")
# Remove jobs
scheduler.remove_job("old_task")
scheduler.remove_all_jobs() # Remove all jobs

class Config:
JOBS = [
{
"id": "job1",
"func": "mymodule:my_function",
"args": (1, 2),
"trigger": "interval",
"seconds": 10
},
{
"id": "job2",
"func": "mymodule:another_function",
"trigger": "cron",
"day_of_week": "mon-fri",
"hour": 9,
"minute": 0
}
]
app.config.from_object(Config())
scheduler.init_app(app) # Jobs automatically loaded

scheduler.add_job(
id="batch_processor",
func=process_batch,
trigger="interval",
minutes=5,
max_instances=3, # Allow up to 3 concurrent executions
coalesce=True, # Combine multiple pending executions
misfire_grace_time=60, # Allow 60 seconds past schedule time
replace_existing=True # Replace job if ID already exists
)

Install with Tessl CLI
npx tessl i tessl/pypi-flask-apscheduler