CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/golang-github-com-go-co-op-gocron-v2

A Golang job scheduling library that lets you run Go functions at pre-determined intervals using cron expressions, fixed durations, daily, weekly, monthly, or one-time schedules with support for distributed deployments.

Overview
Eval results
Files

docs/guides/advanced/production-patterns.md

Production Patterns

Best practices and patterns for running gocron in production environments.

Overview

Production deployments require:

  • Reliability and fault tolerance
  • Observability and monitoring
  • Resource management
  • Graceful degradation
  • Security considerations

Reliability Patterns

Retry Logic

// Register a job that retries doWork up to maxRetries times with a linearly
// growing pause between attempts, and alerts once the job finally fails.
j, _ := s.NewJob(
    gocron.DurationJob(time.Minute),
    gocron.NewTask(func() error {
        const maxRetries = 3
        var err error

        for attempt := 1; attempt <= maxRetries; attempt++ {
            if err = doWork(); err == nil {
                return nil
            }

            log.Printf("Attempt %d failed: %v", attempt, err)

            // Back off only when another attempt follows; sleeping after the
            // last failure would just delay the error report and the alert.
            if attempt < maxRetries {
                time.Sleep(time.Duration(attempt) * time.Second)
            }
        }

        return fmt.Errorf("failed after %d attempts: %w", maxRetries, err)
    }),
    gocron.WithEventListeners(
        // Called with the error returned by the task above.
        gocron.AfterJobRunsWithError(func(jobID uuid.UUID, jobName string, err error) {
            sendAlert(fmt.Sprintf("Job %s failed: %v", jobName, err))
        }),
    ),
)

Circuit Breaker

// circuitBreaker rejects calls after threshold consecutive failures and
// accepts them again once a fixed cooldown has elapsed.
type circuitBreaker struct {
    failures  int        // consecutive failures since the last success or reset
    threshold int        // failure count at which the breaker opens
    open      bool       // true while calls are being rejected
    mu        sync.Mutex // guards failures and open
}

// Call runs fn unless the breaker is open, in which case it returns a
// "circuit breaker open" error without invoking fn. A nil result from fn
// clears the failure count; a non-nil result is counted and returned as-is.
func (cb *circuitBreaker) Call(fn func() error) error {
    cb.mu.Lock()
    if cb.open {
        cb.mu.Unlock()
        return errors.New("circuit breaker open")
    }
    cb.mu.Unlock()

    // fn runs without the lock held so a slow call does not block other callers.
    err := fn()

    cb.mu.Lock()
    defer cb.mu.Unlock()

    if err == nil {
        cb.failures = 0
        return nil
    }

    cb.failures++
    // Calls that were already in flight when the breaker tripped may fail
    // afterwards. Only the call that actually trips the breaker schedules the
    // reset, so at most one halfOpen goroutine exists per open period.
    if !cb.open && cb.failures >= cb.threshold {
        cb.open = true
        go cb.halfOpen()
    }
    return err
}

// halfOpen waits out the 30-second cooldown, then closes the breaker and
// clears the failure count so calls are attempted again.
func (cb *circuitBreaker) halfOpen() {
    time.Sleep(30 * time.Second)
    cb.mu.Lock()
    cb.open = false
    cb.failures = 0
    cb.mu.Unlock()
}

// Usage: one breaker shared by every run of the job; it opens after five
// consecutive failures.
cb := &circuitBreaker{threshold: 5}

// guardedCall routes the external API call through the breaker.
guardedCall := func() error {
    return cb.Call(func() error { return callExternalAPI() })
}

j, _ := s.NewJob(
    gocron.DurationJob(time.Minute),
    gocron.NewTask(guardedCall),
)

Idempotency

// Register a job that skips work already recorded as processed.
j, _ := s.NewJob(
    gocron.DurationJob(time.Minute),
    gocron.NewTask(func() error {
        // Resolve the ID once so the check and the mark below refer to the
        // same unit of work even if getProcessID changes between calls.
        id := getProcessID()

        if alreadyProcessed(id) {
            log.Println("Already processed, skipping")
            return nil
        }

        if err := doWork(); err != nil {
            return err
        }

        // Record completion only after the work succeeded.
        // NOTE(review): check-then-mark is not atomic; if several instances can
        // run this job concurrently, confirm the store enforces uniqueness.
        return markProcessed(id)
    }),
)

Resource Management

Connection Pooling

// jobDependencies bundles the long-lived clients that scheduled jobs share.
type jobDependencies struct {
    db    *sql.DB
    cache *redis.Client
}

// newJobDependencies opens the database pool and the Redis client with fixed
// pool sizes. It panics if the database handle cannot be created.
func newJobDependencies() *jobDependencies {
    db, err := sql.Open("postgres", dsn)
    if err != nil {
        // db is nil on error, so the pool configuration below would crash with
        // a nil dereference anyway; fail here with the real cause instead.
        panic(fmt.Errorf("opening postgres connection pool: %w", err))
    }
    db.SetMaxOpenConns(25)
    db.SetMaxIdleConns(25)
    db.SetConnMaxLifetime(5 * time.Minute)

    cache := redis.NewClient(&redis.Options{
        Addr:     "localhost:6379",
        PoolSize: 10,
    })

    return &jobDependencies{
        db:    db,
        cache: cache,
    }
}

// Close releases both clients. Both are always closed; the cache error is
// returned when there is one, otherwise the database error (or nil).
func (d *jobDependencies) Close() error {
    dbErr := d.db.Close()
    if cacheErr := d.cache.Close(); cacheErr != nil {
        return cacheErr
    }
    return dbErr
}

// Usage
// Build the shared clients once and release them when the enclosing function
// returns. NOTE(review): the error returned by Close is discarded here.
deps := newJobDependencies()
defer deps.Close()

// The task closure captures deps, so every run passes the same *sql.DB to
// processWithDB instead of opening a new connection per run.
j, _ := s.NewJob(
    gocron.DurationJob(time.Minute),
    gocron.NewTask(func() error {
        return processWithDB(deps.db)
    }),
)

Memory Limits

j, _ := s.NewJob(
    gocron.DurationJob(time.Minute),
    gocron.NewTask(func() error {
        // Process in batches to limit memory
        batchSize := 1000

        for offset := 0; ; offset += batchSize {
            items, err := fetchBatch(offset, batchSize)
            if err != nil {
                return err
            }

            if len(items) == 0 {
                break
            }

            for _, item := range items {
                if err := processItem(item); err != nil {
                    return err
                }
            }

            // Allow GC between batches
            runtime.GC()
        }

        return nil
    }),
)

Concurrency Control

// Scheduler-wide limit: the first argument (10) is the concurrency limit and
// the second selects how runs over the limit are handled.
// NOTE(review): the exact semantics of LimitModeReschedule are defined by
// gocron and not shown here — confirm against the library documentation.
s, _ := gocron.NewScheduler(
    gocron.WithLimitConcurrentJobs(10, gocron.LimitModeReschedule),
)

// Per-job singleton mode
// Applies only to this job; presumably prevents overlapping runs of
// expensiveOperation — TODO confirm against the gocron documentation.
j, _ := s.NewJob(
    gocron.DurationJob(time.Minute),
    gocron.NewTask(expensiveOperation),
    gocron.WithSingletonMode(gocron.LimitModeReschedule),
)

Observability

Comprehensive Monitoring

// productionMonitor logs every job lifecycle event and forwards it to the
// wrapped Prometheus monitor. Both fields must be non-nil before use.
type productionMonitor struct {
    prometheus *prometheusMonitor
    logger     *zap.Logger
}

// JobStarted logs the start of a run and forwards the event.
func (m *productionMonitor) JobStarted(jobID uuid.UUID, job gocron.Job) {
    fields := []zap.Field{
        zap.String("job_id", jobID.String()),
        zap.String("job_name", job.Name()),
    }
    m.logger.Info("job started", fields...)
    m.prometheus.JobStarted(jobID, job)
}

// JobCompleted logs the outcome of a run, increments the per-job error counter
// on failure, and forwards the event in either case.
func (m *productionMonitor) JobCompleted(jobID uuid.UUID, job gocron.Job, err error) {
    fields := []zap.Field{
        zap.String("job_id", jobID.String()),
        zap.String("job_name", job.Name()),
    }

    if err == nil {
        m.logger.Info("job completed", fields...)
    } else {
        m.logger.Error("job failed", append(fields, zap.Error(err))...)
        m.prometheus.JobErrors.WithLabelValues(job.Name()).Inc()
    }

    m.prometheus.JobCompleted(jobID, job, err)
}

// productionMonitor dereferences both of its fields on every callback, so they
// must be populated; a zero-value monitor would panic on the first job event.
// promMonitor (*prometheusMonitor) and logger (*zap.Logger) are assumed to be
// constructed earlier during application start-up.
s, _ := gocron.NewScheduler(
    gocron.WithSchedulerMonitor(&productionMonitor{
        prometheus: promMonitor,
        logger:     logger,
    }),
)

Health Checks

// healthChecker reports whether the scheduler's jobs have run recently.
type healthChecker struct {
    scheduler gocron.Scheduler
    mu        sync.RWMutex // guards healthy
    healthy   bool         // manual override; false forces an unhealthy result
}

// Check returns false when any job's last run is older than ten minutes, when
// a job's last run cannot be read, or when the healthy flag is false.
func (h *healthChecker) Check() bool {
    h.mu.RLock()
    defer h.mu.RUnlock()

    for _, j := range h.scheduler.Jobs() {
        lastRun, err := j.LastRun()
        if err != nil {
            return false
        }

        // A zero time is taken to mean the job has not run yet (assumption —
        // confirm against gocron). Treating that as stale would report the
        // service unhealthy from start-up until every job's first run.
        if lastRun.IsZero() {
            continue
        }

        // NOTE(review): jobs scheduled less often than every ten minutes will
        // trip this fixed threshold; derive it from the job interval if needed.
        if time.Since(lastRun) > 10*time.Minute {
            return false
        }
    }

    return h.healthy
}

// Handler writes 200 "healthy" or 503 "unhealthy" according to Check.
func (h *healthChecker) Handler(w http.ResponseWriter, r *http.Request) {
    status, body := http.StatusServiceUnavailable, "unhealthy"
    if h.Check() {
        status, body = http.StatusOK, "healthy"
    }

    w.WriteHeader(status)
    w.Write([]byte(body))
}

// Setup
hc := &healthChecker{scheduler: s, healthy: true}
http.HandleFunc("/health", hc.Handler)

// A nil Handler serves http.DefaultServeMux, where /health was registered
// above. The timeouts keep slow or stuck clients from holding connections.
healthServer := &http.Server{
    Addr:         ":8080",
    ReadTimeout:  5 * time.Second,
    WriteTimeout: 5 * time.Second,
}

go func() {
    // ListenAndServe always returns a non-nil error; only report the ones that
    // are not a deliberate shutdown (e.g. the port is already in use).
    if err := healthServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
        log.Printf("health server stopped: %v", err)
    }
}()

Alerting

// alertManager fans alerts out to PagerDuty and Slack by severity.
type alertManager struct {
    pagerduty *pagerdutyClient
    slack     *slackClient
}

// Alert pages and posts to Slack for "critical", posts to Slack only for
// "warning", and does nothing for any other severity.
func (am *alertManager) Alert(severity string, message string) {
    if severity != "critical" && severity != "warning" {
        return
    }

    if severity == "critical" {
        am.pagerduty.Trigger(message)
    }
    am.slack.Send(message)
}

// Monitor
// alertingMonitor turns scheduler callbacks into alerts via alertManager.
type alertingMonitor struct {
    alerts *alertManager
}

// JobCompleted raises a critical alert when a run finished with an error.
func (m *alertingMonitor) JobCompleted(jobID uuid.UUID, job gocron.Job, err error) {
    if err == nil {
        return
    }

    msg := fmt.Sprintf("Job %s failed: %v", job.Name(), err)
    m.alerts.Alert("critical", msg)
}

// ConcurrencyLimitReached raises a warning naming the affected job; limitType
// is not used.
func (m *alertingMonitor) ConcurrencyLimitReached(limitType string, job gocron.Job) {
    msg := fmt.Sprintf("Concurrency limit reached for %s", job.Name())
    m.alerts.Alert("warning", msg)
}

Security

Input Validation

// Register a job that validates and sanitizes its input before processing it.
j, _ := s.NewJob(
    gocron.DurationJob(time.Minute),
    gocron.NewTask(func() error {
        raw := getUserInput()

        // Only input that passes validation is sanitized and processed.
        if isValid(raw) {
            return process(sanitize(raw))
        }

        return errors.New("invalid input")
    }),
)

Secrets Management

import (
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/secretsmanager"
)

// secretProvider reads secrets from AWS Secrets Manager.
type secretProvider struct {
    client *secretsmanager.SecretsManager
}

// GetSecret returns the string value of the named secret. It returns an error
// when the lookup fails or when the secret has no string value.
func (sp *secretProvider) GetSecret(name string) (string, error) {
    result, err := sp.client.GetSecretValue(&secretsmanager.GetSecretValueInput{
        SecretId: aws.String(name),
    })
    if err != nil {
        return "", fmt.Errorf("fetching secret %q: %w", name, err)
    }

    // SecretString is a pointer and is nil for secrets stored as binary;
    // dereferencing it unchecked would panic.
    if result.SecretString == nil {
        return "", fmt.Errorf("secret %q has no string value", name)
    }

    return *result.SecretString, nil
}

// Usage
sp := &secretProvider{client: secretsmanager.New(session.New())}
apiKey, _ := sp.GetSecret("api-key")

j, _ := s.NewJob(
    gocron.DurationJob(time.Minute),
    gocron.NewTask(func() error {
        return callAPIWithKey(apiKey)
    }),
)

Rate Limiting

import "golang.org/x/time/rate"

// Sustained rate of 10 requests per second with a burst of up to 10.
// rate.Every(time.Second) would allow only one request per second after the
// initial burst, so the 100 calls below would not fit in the one-minute interval.
limiter := rate.NewLimiter(rate.Limit(10), 10) // 10 req/sec

j, _ := s.NewJob(
    gocron.DurationJob(time.Minute),
    gocron.NewTask(func(ctx context.Context) error {
        for i := 0; i < 100; i++ {
            // Wait blocks until a token is available and returns an error if
            // ctx is cancelled first, which ends the run early.
            if err := limiter.Wait(ctx); err != nil {
                return err
            }

            if err := callExternalAPI(); err != nil {
                return err
            }
        }
        return nil
    }),
)

Deployment Patterns

Blue-Green Deployment

// main registers the task that matches the deployed colour and runs the
// scheduler until shutdown.
func main() {
    // Check deployment version
    version := os.Getenv("VERSION")

    s, err := gocron.NewScheduler()
    if err != nil {
        log.Fatalf("creating scheduler: %v", err)
    }

    // Version-specific jobs: any value other than "blue" selects the green
    // task, exactly as before.
    var task any = greenVersionTask
    if version == "blue" {
        task = blueVersionTask
    }

    if _, err := s.NewJob(
        gocron.DurationJob(time.Minute),
        gocron.NewTask(task),
    ); err != nil {
        log.Fatalf("registering %q job: %v", version, err)
    }

    s.Start()
    waitForShutdown()
}

Canary Deployment

// main runs a job that sends roughly canaryPercent of its runs to the new
// version and the rest to the stable version.
func main() {
    canaryPercent := getCanaryPercent() // e.g., 10%

    s, err := gocron.NewScheduler()
    if err != nil {
        log.Fatalf("creating scheduler: %v", err)
    }

    // The returned job handle is not needed; binding it to a variable that is
    // never read would not compile.
    if _, err := s.NewJob(
        gocron.DurationJob(time.Minute),
        gocron.NewTask(func() error {
            // rand.Intn(100) is uniform over 0..99, so this branch is taken on
            // about canaryPercent out of every 100 runs.
            if rand.Intn(100) < canaryPercent {
                return newVersionTask()
            }
            return stableVersionTask()
        }),
    ); err != nil {
        log.Fatalf("registering canary job: %v", err)
    }

    s.Start()
    waitForShutdown()
}

Rolling Deployment

Use distributed locking to prevent duplicate execution during rolling updates:

// redisLocker is not defined in this snippet; presumably a gocron.Locker
// implementation backed by Redis — TODO confirm which package provides it.
s, _ := gocron.NewScheduler(
    gocron.WithDistributedLocker(redisLocker),
)

// Jobs are automatically coordinated across instances

Configuration Management

Environment-Based Config

// Config holds the scheduler settings that differ between environments.
type Config struct {
    JobInterval    time.Duration
    MaxConcurrency int
    StopTimeout    time.Duration
}

// loadConfig returns the settings for the environment named by the ENV
// variable: "production", "staging", or the development defaults for any
// other value, including unset.
func loadConfig() Config {
    presets := map[string]Config{
        "production": {
            JobInterval:    5 * time.Minute,
            MaxConcurrency: 10,
            StopTimeout:    60 * time.Second,
        },
        "staging": {
            JobInterval:    time.Minute,
            MaxConcurrency: 5,
            StopTimeout:    30 * time.Second,
        },
    }

    if cfg, known := presets[os.Getenv("ENV")]; known {
        return cfg
    }

    return Config{
        JobInterval:    10 * time.Second,
        MaxConcurrency: 2,
        StopTimeout:    10 * time.Second,
    }
}

// Usage
cfg := loadConfig()

// WithLimitConcurrentJobs takes an unsigned limit (per its gocron signature as
// I recall it), so the int from Config is converted explicitly. The presets
// are all positive; a negative value would wrap to a very large limit.
s, _ := gocron.NewScheduler(
    gocron.WithLimitConcurrentJobs(uint(cfg.MaxConcurrency), gocron.LimitModeReschedule),
    gocron.WithStopTimeout(cfg.StopTimeout),
)

// The job interval comes from the same environment-specific configuration.
j, _ := s.NewJob(
    gocron.DurationJob(cfg.JobInterval),
    gocron.NewTask(myTask),
)

Feature Flags

// featureFlags is the set of runtime switches consulted by jobs.
type featureFlags struct {
    NewAlgorithmEnabled bool
    BatchSizeOverride   *int // nil when no override is configured
}

// getFlags reads the current flag values. NewAlgorithmEnabled is true only
// when NEW_ALGO is exactly "true"; BatchSizeOverride is left nil.
func getFlags() featureFlags {
    // Load from config service, env vars, etc.
    var flags featureFlags
    flags.NewAlgorithmEnabled = os.Getenv("NEW_ALGO") == "true"
    return flags
}

// Register a job that picks its algorithm from the flags on every run, so a
// flag change takes effect without re-registering the job.
j, _ := s.NewJob(
    gocron.DurationJob(time.Minute),
    gocron.NewTask(func() error {
        if !getFlags().NewAlgorithmEnabled {
            return legacyAlgorithm()
        }
        return newAlgorithm()
    }),
)

Testing in Production

Dry-Run Mode

// dryRun is read once, when the job is set up; it is true only when DRY_RUN
// is exactly "true".
dryRun := os.Getenv("DRY_RUN") == "true"

j, _ := s.NewJob(
    gocron.DurationJob(time.Minute),
    gocron.NewTask(func() error {
        if !dryRun {
            return performAction()
        }

        // Dry run: report what would happen and succeed without acting.
        log.Println("DRY RUN: Would perform action")
        return nil
    }),
)

Shadow Testing

// Register a job that runs the production version and, in the background, a
// shadow version whose result is only compared and logged.
j, _ := s.NewJob(
    gocron.DurationJob(time.Minute),
    gocron.NewTask(func() error {
        // Run production version
        prodResult, prodErr := productionVersion()

        // Run shadow version (don't fail job on error)
        go func() {
            // A panic in this goroutine would otherwise take down the whole
            // process, not just the shadow run.
            defer func() {
                if r := recover(); r != nil {
                    log.Printf("Shadow version panicked: %v", r)
                }
            }()

            shadowResult, shadowErr := shadowVersion()
            if shadowErr != nil {
                log.Printf("Shadow version failed: %v", shadowErr)
                return
            }

            // Without a successful production result there is no baseline to
            // compare against.
            if prodErr != nil {
                return
            }

            // Compare results
            if !equal(prodResult, shadowResult) {
                log.Printf("Shadow divergence detected")
            }
        }()

        // The job's outcome depends on the production version only.
        return prodErr
    }),
)

Performance Optimization

Batch Processing

j, _ := s.NewJob(
    gocron.DurationJob(time.Minute),
    gocron.NewTask(func() error {
        items, err := fetchItemsToProcess()
        if err != nil {
            return err
        }

        // Process in batches
        batchSize := 100
        for i := 0; i < len(items); i += batchSize {
            end := i + batchSize
            if end > len(items) {
                end = len(items)
            }

            batch := items[i:end]
            if err := processBatch(batch); err != nil {
                return err
            }
        }

        return nil
    }),
)

Parallel Processing

// Register a job that processes the fetched items on a fixed pool of workers
// and returns the first error reported, if any.
j, _ := s.NewJob(
    gocron.DurationJob(time.Minute),
    gocron.NewTask(func(ctx context.Context) error {
        items, err := fetchItems()
        if err != nil {
            return err
        }

        // Process in parallel with worker pool
        const workers = 10

        // Both channels are sized to len(items): the producer never blocks,
        // and each consumed item leads to at most one send on errChan, so
        // workers never block on reporting either.
        itemsChan := make(chan item, len(items))
        errChan := make(chan error, len(items))

        var wg sync.WaitGroup
        for i := 0; i < workers; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                for it := range itemsChan {
                    // Stop taking new work once the job's context is done, and
                    // report why the remaining items were skipped.
                    if err := ctx.Err(); err != nil {
                        errChan <- err
                        return
                    }
                    if err := processItem(it); err != nil {
                        errChan <- err
                    }
                }
            }()
        }

        for _, it := range items {
            itemsChan <- it
        }
        close(itemsChan)

        wg.Wait()
        close(errChan)

        // Check for errors: only non-nil errors are ever sent, so the first
        // value received, if any, is the error to return.
        if err, ok := <-errChan; ok {
            return err
        }

        return nil
    }),
)

See Also

  • Graceful Shutdown Guide - Shutdown patterns
  • Time-Based Scheduling Guide - Scheduling patterns
  • Observability Guide - Monitoring and logging
  • Distributed Guide - Multi-instance deployment

Install with Tessl CLI

npx tessl i tessl/golang-github-com-go-co-op-gocron-v2@2.19.1

docs

index.md

tile.json