tessl/golang-github-com-go-co-op-gocron-v2

A Golang job scheduling library that lets you run Go functions at pre-determined intervals using cron expressions, fixed durations, daily, weekly, monthly, or one-time schedules with support for distributed deployments.

docs/guides/concurrency/scheduler-limits.md

Scheduler-Level Concurrency Limits

WithLimitConcurrentJobs and monitoring queue for global job concurrency control.

Overview

Scheduler-level concurrency limits control the total number of jobs running simultaneously across your entire scheduler, regardless of job type.

func WithLimitConcurrentJobs(limit uint, mode LimitMode) SchedulerOption

This is different from job-level singleton mode, which prevents a single job from running concurrently with itself.

Basic Usage

s, _ := gocron.NewScheduler(
    gocron.WithLimitConcurrentJobs(3, gocron.LimitModeReschedule),
)

// At most 3 jobs run concurrently across all jobs
for i := 0; i < 10; i++ {
    s.NewJob(
        gocron.DurationJob(time.Minute),
        gocron.NewTask(func(id int) {
            time.Sleep(30 * time.Second)
            fmt.Printf("Job %d complete\n", id)
        }, i),
    )
}

s.Start()

Behavior: At any given time, at most 3 jobs are running. Others wait or are rescheduled based on mode.

Limit Modes

LimitModeReschedule

Skip a job's run and reschedule it for the next interval when the limit is reached:

s, _ := gocron.NewScheduler(
    gocron.WithLimitConcurrentJobs(3, gocron.LimitModeReschedule),
)

When limit reached:

  • Job scheduled time arrives
  • 3 jobs already running
  • This job is skipped
  • Rescheduled for next interval

Timeline example:

00:00 - Jobs A, B, C start (limit: 3)
00:01 - Job D scheduled → skip (limit reached)
00:30 - Job A completes
00:31 - Job D scheduled → runs (slot available)

When to use:

  • Protecting downstream services from overload
  • Resource-constrained environments
  • Non-critical jobs where skipping is acceptable
  • Rate limiting

LimitModeWait

Queue jobs when limit is reached:

s, _ := gocron.NewScheduler(
    gocron.WithLimitConcurrentJobs(3, gocron.LimitModeWait),
)

When limit reached:

  • Job scheduled time arrives
  • 3 jobs already running
  • This job is queued
  • Executes when a slot becomes available

Timeline example:

00:00 - Jobs A, B, C start (limit: 3)
00:01 - Job D scheduled → queued
00:02 - Job E scheduled → queued
00:30 - Job A completes → Job D starts immediately
00:31 - Job B completes → Job E starts immediately

When to use:

  • Every job execution is critical
  • Sequential processing acceptable
  • Can handle queue buildup

Warning: Queue can grow unbounded if jobs are scheduled faster than they complete.

Error Handling

Returns error if limit is 0:

s, err := gocron.NewScheduler(
    gocron.WithLimitConcurrentJobs(0, gocron.LimitModeReschedule),
)
if errors.Is(err, gocron.ErrWithLimitConcurrentJobsZero) {
    log.Fatal("Limit cannot be zero")
}

Monitoring Queue Size

JobsWaitingInQueue

func (s Scheduler) JobsWaitingInQueue() int

Returns the number of jobs currently queued:

s, _ := gocron.NewScheduler(
    gocron.WithLimitConcurrentJobs(3, gocron.LimitModeWait),
)

// Periodically check queue size
go func() {
    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()

    for range ticker.C {
        waiting := s.JobsWaitingInQueue()
        if waiting > 10 {
            log.Printf("Warning: %d jobs waiting", waiting)
        }
        if waiting > 50 {
            // alerting is an application-specific client, not part of gocron
            alerting.SendAlert("Job queue backlog", fmt.Sprintf("%d jobs waiting", waiting))
        }
    }
}()

Queue Monitoring Pattern

type QueueMonitor struct {
    scheduler gocron.Scheduler
    threshold int
    ticker    *time.Ticker
    done      chan struct{}
}

func NewQueueMonitor(s gocron.Scheduler, threshold int, interval time.Duration) *QueueMonitor {
    m := &QueueMonitor{
        scheduler: s,
        threshold: threshold,
        ticker:    time.NewTicker(interval),
        done:      make(chan struct{}),
    }
    go m.monitor()
    return m
}

func (m *QueueMonitor) monitor() {
    for {
        select {
        case <-m.done:
            return
        case <-m.ticker.C:
            waiting := m.scheduler.JobsWaitingInQueue()
            if waiting > m.threshold {
                log.Printf("Queue threshold exceeded: %d jobs waiting", waiting)
                metrics.RecordQueueSize(waiting) // application-specific metrics hook
            }
        }
    }
}

func (m *QueueMonitor) Stop() {
    m.ticker.Stop()
    close(m.done)
}

// Usage
monitor := NewQueueMonitor(s, 10, 10*time.Second)
defer monitor.Stop()

Combining with Job-Level Limits

Both scheduler-level and job-level limits can be set:

s, _ := gocron.NewScheduler(
    gocron.WithLimitConcurrentJobs(5, gocron.LimitModeReschedule),
)

j, _ := s.NewJob(
    gocron.DurationJob(time.Minute),
    gocron.NewTask(myFunc),
    gocron.WithSingletonMode(gocron.LimitModeWait),
)

Precedence:

  1. The scheduler-level limit is checked first
  2. If the scheduler has capacity, the job-level limit is checked
  3. Whichever limit is hit, its configured mode decides whether the run waits or is rescheduled

Example scenario:

Scheduler limit: 5 concurrent jobs
Job A singleton mode: LimitModeWait

Current state:
- 4 jobs running (not Job A)
- Job A scheduled

Result:
- Scheduler has capacity (4 < 5)
- Job A singleton check passes
- Job A runs

Another scenario:

Scheduler limit: 5 concurrent jobs
Job A singleton mode: LimitModeWait

Current state:
- 5 jobs running (including one instance of Job A)
- Job A scheduled again

Result:
- Scheduler limit reached (5 = 5)
- Job queued due to scheduler limit
- Job-level singleton check not performed yet

Resource-Based Limits

CPU-Bound Jobs

import "runtime"

// Limit to number of CPU cores
cpuLimit := uint(runtime.NumCPU())
s, _ := gocron.NewScheduler(
    gocron.WithLimitConcurrentJobs(cpuLimit, gocron.LimitModeReschedule),
)

For CPU-intensive work, match concurrency to available cores.

I/O-Bound Jobs

// Higher limit for I/O-bound work
ioLimit := uint(50)
s, _ := gocron.NewScheduler(
    gocron.WithLimitConcurrentJobs(ioLimit, gocron.LimitModeWait),
)

I/O-bound jobs can have higher concurrency since they spend time waiting.

Memory-Based Limits

// Each job uses ~100MB, system has 2GB available
memLimit := uint(20) // 2GB / 100MB = 20 jobs
s, _ := gocron.NewScheduler(
    gocron.WithLimitConcurrentJobs(memLimit, gocron.LimitModeReschedule),
)

Calculate limit based on available memory and per-job memory usage.

Connection Pool Limits

// Database connection pool has 10 connections
// Reserve 5 for web requests, 5 for scheduled jobs
dbJobLimit := uint(5)
s, _ := gocron.NewScheduler(
    gocron.WithLimitConcurrentJobs(dbJobLimit, gocron.LimitModeWait),
)

Match concurrency to available connections.

Event Monitoring

ConcurrencyLimitReached

type SchedulerMonitor interface {
    ConcurrencyLimitReached(limitType string, job Job)
    // ... other methods
}

Called when a job cannot start due to scheduler limit:

type myMonitor struct{}

func (m *myMonitor) ConcurrencyLimitReached(limitType string, job gocron.Job) {
    log.Printf("Scheduler limit reached for job %s", job.Name())
    // limitType: "limit" (for scheduler-level limit)

    metrics.IncrementLimitReached(job.Name()) // application-specific metrics hook
}

// ... implement other SchedulerMonitor methods

s, _ := gocron.NewScheduler(
    gocron.WithLimitConcurrentJobs(5, gocron.LimitModeReschedule),
    gocron.WithSchedulerMonitor(&myMonitor{}),
)

See Observability: Lifecycle Monitoring for details.

Advanced Patterns

Dynamic Concurrency Adjustment

Concurrency limit cannot be changed after creation. To adjust, recreate the scheduler:

func adjustConcurrency(currentScheduler gocron.Scheduler, newLimit uint) (gocron.Scheduler, error) {
    // Save job definitions (you need to store these separately)
    jobs := currentScheduler.Jobs()

    // Shutdown old scheduler gracefully
    if err := currentScheduler.Shutdown(); err != nil {
        return nil, err
    }

    // Create new scheduler with new limit
    newScheduler, err := gocron.NewScheduler(
        gocron.WithLimitConcurrentJobs(newLimit, gocron.LimitModeReschedule),
    )
    if err != nil {
        return nil, err
    }

    // Re-register jobs (requires storing job definitions)
    for _, job := range jobs {
        // You need to reconstruct job definition and task
        // This is why dynamic adjustment is complex
    }

    newScheduler.Start()
    return newScheduler, nil
}

Better approach: Design initial limit conservatively or use multiple schedulers.

Multiple Schedulers for Priority

Separate high and low priority jobs:

// High-priority scheduler (no limit)
highPriority, _ := gocron.NewScheduler()
defer highPriority.Shutdown()

highPriority.NewJob(
    gocron.DurationJob(time.Minute),
    gocron.NewTask(criticalTask),
    gocron.WithTags("critical"),
)

// Low-priority scheduler (limited to 3)
lowPriority, _ := gocron.NewScheduler(
    gocron.WithLimitConcurrentJobs(3, gocron.LimitModeReschedule),
)
defer lowPriority.Shutdown()

lowPriority.NewJob(
    gocron.DurationJob(5*time.Minute),
    gocron.NewTask(backgroundTask),
    gocron.WithTags("background"),
)

highPriority.Start()
lowPriority.Start()

Adaptive Limits Based on Load

type AdaptiveScheduler struct {
    currentLoad   float64
    maxLoad       float64
    baseLimit     uint
    currentLimit  uint
    mu            sync.RWMutex
}

func (a *AdaptiveScheduler) UpdateLoad(load float64) {
    a.mu.Lock()
    defer a.mu.Unlock()

    a.currentLoad = load

    // Adjust limit based on load
    if load > a.maxLoad*0.8 {
        // High load: reduce concurrency (but never to zero)
        a.currentLimit = a.baseLimit / 2
        if a.currentLimit == 0 {
            a.currentLimit = 1
        }
    } else if load < a.maxLoad*0.5 {
        // Low load: increase concurrency
        a.currentLimit = a.baseLimit * 2
    } else {
        // Normal load
        a.currentLimit = a.baseLimit
    }

    log.Printf("Load: %.2f, Limit adjusted to: %d", load, a.currentLimit)
}

Note: gocron doesn't support runtime limit changes. This pattern requires scheduler recreation.

Queue-Based Backpressure

func monitorAndAlert(s gocron.Scheduler) {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    consecutiveHighQueue := 0

    for range ticker.C {
        waiting := s.JobsWaitingInQueue()

        if waiting > 20 {
            consecutiveHighQueue++
            log.Printf("High queue: %d jobs waiting (count: %d)", waiting, consecutiveHighQueue)

            if consecutiveHighQueue > 3 {
                // Queue consistently high for 90+ seconds;
                // alerting is an application-specific client, not part of gocron
                alerting.SendCriticalAlert(
                    "Job queue consistently high",
                    fmt.Sprintf("%d jobs waiting for %d checks", waiting, consecutiveHighQueue),
                )
            }
        } else {
            consecutiveHighQueue = 0
        }
    }
}

Best Practices

1. Choose Appropriate Limit

// CPU-bound: 1-2x CPU cores
cpuLimit := uint(runtime.NumCPU() * 2)

// I/O-bound: 10-50 depending on latency
ioLimit := uint(20)

// Memory-bound: Based on available memory
memLimit := uint(availableGB * 10) // Assuming 100MB per job

// Connection-bound: Match connection pool size
connLimit := uint(dbPool.MaxConns())

2. Always Monitor Queue

go func() {
    ticker := time.NewTicker(1 * time.Minute)
    defer ticker.Stop()

    for range ticker.C {
        waiting := s.JobsWaitingInQueue()
        metrics.RecordQueueSize(waiting)

        if waiting > 50 {
            log.Printf("Queue backlog: %d jobs", waiting)
        }
    }
}()

3. Use Reschedule for Non-Critical Jobs

// Background tasks: OK to skip
s, _ := gocron.NewScheduler(
    gocron.WithLimitConcurrentJobs(5, gocron.LimitModeReschedule),
)

4. Use Wait for Critical Jobs

// Critical processing: must execute
s, _ := gocron.NewScheduler(
    gocron.WithLimitConcurrentJobs(5, gocron.LimitModeWait),
)

But monitor queue size closely.

5. Set Graceful Shutdown Timeout

s, _ := gocron.NewScheduler(
    gocron.WithLimitConcurrentJobs(5, gocron.LimitModeWait),
    gocron.WithStopTimeout(30*time.Second),
)

defer func() {
    if err := s.Shutdown(); err != nil {
        if errors.Is(err, gocron.ErrStopSchedulerTimedOut) {
            log.Println("Some jobs did not finish in time")
        }
    }
}()

6. Consider Job Duration

// Short jobs (< 10s): higher concurrency
shortJobScheduler, _ := gocron.NewScheduler(
    gocron.WithLimitConcurrentJobs(20, gocron.LimitModeWait),
)

// Long jobs (> 1m): lower concurrency
longJobScheduler, _ := gocron.NewScheduler(
    gocron.WithLimitConcurrentJobs(3, gocron.LimitModeReschedule),
)

Troubleshooting

Queue Growing Unbounded

Symptom: JobsWaitingInQueue() keeps increasing.

Causes:

  1. Jobs scheduled faster than they complete
  2. Limit too low for job volume
  3. Jobs taking longer than expected

Solutions:

  • Increase concurrency limit
  • Switch to LimitModeReschedule
  • Optimize job performance
  • Reduce job frequency
  • Increase job intervals

All Jobs Being Skipped

Symptom: Jobs scheduled but not executing with LimitModeReschedule.

Cause: Limit consistently reached.

Solutions:

  • Increase limit
  • Reduce number of concurrent jobs
  • Stagger job schedules
  • Use LimitModeWait (with monitoring)

High Memory Usage

Symptom: Scheduler using excessive memory.

Cause: Large queue with LimitModeWait.

Solutions:

  • Switch to LimitModeReschedule (skipped runs are not queued, so the queue cannot grow)
  • Increase the limit so the queue drains faster
  • Reduce job frequency so fewer runs pile up
  • Optimize job duration to free slots sooner

Inconsistent Performance

Symptom: Some jobs fast, others slow.

Cause: Mix of fast and slow jobs competing for slots.

Solution: Separate schedulers by job type:

// Fast jobs
fastScheduler, _ := gocron.NewScheduler(
    gocron.WithLimitConcurrentJobs(10, gocron.LimitModeWait),
)

// Slow jobs
slowScheduler, _ := gocron.NewScheduler(
    gocron.WithLimitConcurrentJobs(2, gocron.LimitModeReschedule),
)


Install with Tessl CLI

npx tessl i tessl/golang-github-com-go-co-op-gocron-v2@2.19.1
