CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/golang-github-com-go-co-op-gocron-v2

A Golang job scheduling library that lets you run Go functions at pre-determined intervals using cron expressions, fixed durations, daily, weekly, monthly, or one-time schedules with support for distributed deployments.

Overview
Eval results
Files

docs/examples/by-feature/distributed-deployment.md

Distributed Deployment Examples

Run gocron v2 across multiple instances using Elector and Locker implementations.

Redis Locker Implementation

Complete Redis-based distributed locking:

package main

import (
    "context"
    "errors"
    "fmt"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/go-co-op/gocron/v2"
    "github.com/google/uuid"
    "github.com/redis/go-redis/v9"
)

// redisLocker implements gocron.Locker using Redis SET NX so that only one
// instance at a time may run a given named job.
type redisLocker struct {
    client *redis.Client
}

// NewRedisLocker connects to Redis at addr and verifies the connection with
// a ping (5s timeout) before returning a ready-to-use locker.
func NewRedisLocker(addr string) (*redisLocker, error) {
    client := redis.NewClient(&redis.Options{Addr: addr})

    // Fail fast at construction time rather than at the first lock attempt.
    pingCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    if pingErr := client.Ping(pingCtx).Err(); pingErr != nil {
        return nil, fmt.Errorf("redis connection failed: %w", pingErr)
    }

    return &redisLocker{client: client}, nil
}

// Lock implements gocron.Locker: it acquires a per-job Redis lock via SET NX
// with a TTL so a crashed holder cannot block the job forever. Returns an
// error when another instance already holds the lock for this key.
func (l *redisLocker) Lock(ctx context.Context, key string) (gocron.Lock, error) {
    lockKey := "gocron:lock:" + key
    // TTL should be 2-3x max expected job duration
    ttl := 5 * time.Minute

    // NOTE(review): the value "1" does not identify the owner, so Unlock
    // cannot verify ownership — a unique token per acquisition would be safer.
    ok, err := l.client.SetNX(ctx, lockKey, "1", ttl).Result()
    if err != nil {
        return nil, fmt.Errorf("lock acquisition failed: %w", err)
    }

    if !ok {
        // Key already exists: another instance won this run.
        return nil, errors.New("lock already held")
    }

    return &redisLock{client: l.client, key: lockKey}, nil
}

// Close releases the underlying Redis client and its connections.
func (l *redisLocker) Close() error {
    return l.client.Close()
}

// redisLock is the handle returned by redisLocker.Lock; it remembers the
// key to delete when the job finishes.
type redisLock struct {
    client *redis.Client
    key    string
}

// Unlock deletes the lock key, releasing the lock.
// NOTE(review): DEL is unconditional — if the TTL expired and another
// instance re-acquired the key, this deletes *their* lock. A safe release
// would check an owner token in a Lua script before deleting.
func (l *redisLock) Unlock(ctx context.Context) error {
    return l.client.Del(ctx, l.key).Err()
}

// main wires a Redis-backed distributed locker into a scheduler. Run several
// copies (each with a distinct INSTANCE_ID) and each job run fires on only
// one of them.
func main() {
    // Create Redis locker
    locker, err := NewRedisLocker("localhost:6379")
    if err != nil {
        panic(err)
    }
    defer locker.Close()

    // Create scheduler with distributed locker
    s, err := gocron.NewScheduler(
        gocron.WithDistributedLocker(locker),
        gocron.WithLogger(gocron.NewLogger(gocron.LogLevelInfo)),
    )
    if err != nil {
        panic(err)
    }
    defer s.Shutdown()

    // Add jobs with stable names. Check NewJob's error instead of discarding
    // it so a misconfigured job fails loudly at startup.
    _, err = s.NewJob(
        gocron.DurationJob(time.Minute),
        gocron.NewTask(func() {
            fmt.Println("Job 1 running on instance", os.Getenv("INSTANCE_ID"))
        }),
        gocron.WithName("job-1"), // Stable name for lock key
        gocron.WithEventListeners(
            // Fires on instances that lose the race for the lock.
            gocron.AfterLockError(func(jobID uuid.UUID, jobName string, err error) {
                fmt.Printf("Lock failed for %s: %v\n", jobName, err)
            }),
        ),
    )
    if err != nil {
        panic(err)
    }

    _, err = s.NewJob(
        gocron.DurationJob(2*time.Minute),
        gocron.NewTask(func() {
            fmt.Println("Job 2 running on instance", os.Getenv("INSTANCE_ID"))
        }),
        gocron.WithName("job-2"),
    )
    if err != nil {
        panic(err)
    }

    s.Start()
    fmt.Println("Scheduler started on instance", os.Getenv("INSTANCE_ID"))

    // Wait for interrupt so deferred cleanup (Shutdown, Close) runs.
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
    <-sigCh

    fmt.Println("Shutting down...")
}

Redis Locker with Renewal

For long-running jobs, renew the lock periodically:

// renewableRedisLock keeps a background goroutine refreshing the lock's TTL
// so jobs that outlive the initial TTL do not lose the lock mid-run.
type renewableRedisLock struct {
    client *redis.Client
    key    string
    ttl    time.Duration
    cancel context.CancelFunc // stops the renewal goroutine on Unlock
}

// Lock acquires the per-job Redis lock and starts a goroutine that keeps
// renewing its TTL until Unlock is called.
func (l *redisLocker) Lock(ctx context.Context, key string) (gocron.Lock, error) {
    lockKey := "gocron:lock:" + key
    ttl := 5 * time.Minute

    ok, err := l.client.SetNX(ctx, lockKey, "1", ttl).Result()
    if err != nil {
        return nil, err
    }

    if !ok {
        return nil, errors.New("lock already held")
    }

    // Renewal must outlive the acquisition ctx (which may be scoped to the
    // lock attempt), so it gets its own context cancelled only by Unlock.
    renewCtx, cancel := context.WithCancel(context.Background())
    lock := &renewableRedisLock{
        client: l.client,
        key:    lockKey,
        ttl:    ttl,
        cancel: cancel,
    }

    // Start renewal goroutine
    go lock.renewLoop(renewCtx)

    return lock, nil
}

// renewLoop extends the lock's TTL at half-TTL intervals until ctx is
// cancelled (by Unlock) or the lock no longer exists in Redis.
func (l *renewableRedisLock) renewLoop(ctx context.Context) {
    ticker := time.NewTicker(l.ttl / 2) // Renew at half TTL
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            // Check the renewal result instead of ignoring it: Expire
            // reports false when the key has already expired, and extending
            // after that point could interfere with a lock another instance
            // has since acquired. Stop renewing on error too.
            ok, err := l.client.Expire(ctx, l.key, l.ttl).Result()
            if err != nil || !ok {
                return
            }
        }
    }
}

// Unlock stops the renewal goroutine, then deletes the lock key.
// NOTE(review): as with redisLock, DEL is unconditional and cannot verify
// this instance still owns the key.
func (l *renewableRedisLock) Unlock(ctx context.Context) error {
    l.cancel() // Stop renewal
    return l.client.Del(ctx, l.key).Err()
}

etcd Elector Implementation

Leader election using etcd:

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/go-co-op/gocron/v2"
    "go.etcd.io/etcd/client/v3"
    "go.etcd.io/etcd/client/v3/concurrency"
)

// etcdElector implements gocron.Elector via an etcd election: every instance
// campaigns, and IsLeader reports whether this one currently leads.
type etcdElector struct {
    client   *clientv3.Client
    session  *concurrency.Session
    election *concurrency.Election
    cancel   context.CancelFunc // aborts the background campaign
}

// NewEtcdElector connects to etcd, opens a keep-alive session (10s TTL) and
// starts campaigning for leadership in the background. Campaign blocks until
// this instance becomes leader or the elector is closed.
func NewEtcdElector(endpoints []string, electionPrefix string) (*etcdElector, error) {
    client, err := clientv3.New(clientv3.Config{
        Endpoints:   endpoints,
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        return nil, err
    }

    // The session's lease ties leadership to this process: if the process
    // dies, the lease expires and leadership moves on automatically.
    session, err := concurrency.NewSession(client, concurrency.WithTTL(10))
    if err != nil {
        client.Close()
        return nil, err
    }

    election := concurrency.NewElection(session, electionPrefix)

    ctx, cancel := context.WithCancel(context.Background())
    e := &etcdElector{
        client:   client,
        session:  session,
        election: election,
        cancel:   cancel,
    }

    // Campaign for leadership
    go func() {
        // NOTE(review): when INSTANCE_ID is unset this fallback is derived
        // from time.Now() and differs on every call — nothing else should
        // recompute it expecting the same value.
        instanceID := os.Getenv("INSTANCE_ID")
        if instanceID == "" {
            instanceID = fmt.Sprintf("instance-%d", time.Now().Unix())
        }

        if err := e.election.Campaign(ctx, instanceID); err != nil && ctx.Err() == nil {
            fmt.Printf("Campaign failed: %v\n", err)
        }
    }()

    return e, nil
}

// IsLeader implements gocron.Elector: it returns nil only when this instance
// currently holds leadership, in which case the scheduler runs jobs here.
func (e *etcdElector) IsLeader(ctx context.Context) error {
    resp, err := e.election.Leader(ctx)
    if err != nil {
        return err
    }

    // Guard against an empty response instead of panicking on Kvs[0].
    if len(resp.Kvs) == 0 {
        return fmt.Errorf("no leader elected")
    }

    // Compare lease IDs rather than the campaign value: the original code
    // recomputed the fallback instance ID from time.Now(), which would never
    // match the value the campaign goroutine registered, misreporting
    // leadership whenever INSTANCE_ID is unset. The leader's key is attached
    // to its session lease, so matching our session's lease is authoritative.
    if resp.Kvs[0].Lease == int64(e.session.Lease()) {
        return nil // This instance is leader
    }

    return fmt.Errorf("not the leader")
}

// Close resigns leadership and releases etcd resources. Unlike a naive
// early-return chain, the session and client are closed even when Resign
// fails (otherwise they would leak); the first error encountered is returned.
func (e *etcdElector) Close() error {
    e.cancel() // stop the campaign goroutine

    var firstErr error
    if err := e.election.Resign(context.Background()); err != nil {
        firstErr = err
    }
    if err := e.session.Close(); err != nil && firstErr == nil {
        firstErr = err
    }
    if err := e.client.Close(); err != nil && firstErr == nil {
        firstErr = err
    }
    return firstErr
}

// main runs a scheduler gated by etcd leader election: every instance starts
// the scheduler, but only the elected leader's jobs actually execute.
func main() {
    // Create elector
    elector, err := NewEtcdElector([]string{"localhost:2379"}, "/gocron/election")
    if err != nil {
        panic(err)
    }
    defer elector.Close()

    // Create scheduler with elector
    s, err := gocron.NewScheduler(
        gocron.WithDistributedElector(elector),
        gocron.WithLogger(gocron.NewLogger(gocron.LogLevelInfo)),
    )
    if err != nil {
        panic(err)
    }
    defer s.Shutdown()

    // Add jobs; check NewJob's error instead of discarding it so a
    // misconfigured job fails loudly at startup.
    _, err = s.NewJob(
        gocron.DurationJob(time.Minute),
        gocron.NewTask(func() {
            fmt.Println("Job running on leader:", os.Getenv("INSTANCE_ID"))
        }),
        gocron.WithName("leader-job"),
    )
    if err != nil {
        panic(err)
    }

    s.Start()
    fmt.Println("Scheduler started on instance", os.Getenv("INSTANCE_ID"))

    // Wait for interrupt so deferred cleanup (Shutdown, Close) runs.
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
    <-sigCh

    fmt.Println("Shutting down...")
}

PostgreSQL Advisory Locks

Using PostgreSQL for distributed locking:

package main

import (
    "context"
    "database/sql"
    "errors"
    "fmt"
    "hash/fnv"
    "time"

    "github.com/go-co-op/gocron/v2"
    _ "github.com/lib/pq"
)

// postgresLocker implements gocron.Locker using PostgreSQL advisory locks.
type postgresLocker struct {
    db *sql.DB
}

// NewPostgresLocker opens a connection pool to PostgreSQL and verifies
// connectivity (5s timeout) before returning the locker.
func NewPostgresLocker(connStr string) (*postgresLocker, error) {
    db, err := sql.Open("postgres", connStr)
    if err != nil {
        return nil, err
    }

    // Fail fast if the database is unreachable.
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    if err := db.PingContext(ctx); err != nil {
        db.Close()
        return nil, fmt.Errorf("postgres connection failed: %w", err)
    }

    return &postgresLocker{db: db}, nil
}

// Lock acquires a PostgreSQL advisory lock for key. Advisory locks are
// SESSION-scoped, so the lock is taken on a dedicated connection that stays
// pinned until Unlock: issuing lock and unlock through the pooled *sql.DB
// could run them on different sessions, leaving the lock stuck and the
// unlock failing.
func (l *postgresLocker) Lock(ctx context.Context, key string) (gocron.Lock, error) {
    // Hash key to 64-bit integer for PostgreSQL advisory lock
    lockID := hashKeyToInt64(key)

    conn, err := l.db.Conn(ctx)
    if err != nil {
        return nil, err
    }

    var acquired bool
    if err := conn.QueryRowContext(ctx, "SELECT pg_try_advisory_lock($1)", lockID).Scan(&acquired); err != nil {
        conn.Close()
        return nil, err
    }

    if !acquired {
        conn.Close()
        return nil, errors.New("lock already held")
    }

    return &postgresLock{conn: conn, lockID: lockID}, nil
}

// Close releases the underlying connection pool.
func (l *postgresLocker) Close() error {
    return l.db.Close()
}

// postgresLock pins the session (connection) that holds the advisory lock.
type postgresLock struct {
    conn   *sql.Conn
    lockID int64
}

// Unlock releases the advisory lock on the same session that acquired it,
// then returns the pinned connection to the pool.
func (l *postgresLock) Unlock(ctx context.Context) error {
    defer l.conn.Close() // always return the connection to the pool

    var released bool
    if err := l.conn.QueryRowContext(ctx, "SELECT pg_advisory_unlock($1)", l.lockID).Scan(&released); err != nil {
        return err
    }
    if !released {
        return errors.New("lock was not held")
    }
    return nil
}

// hashKeyToInt64 maps an arbitrary job key onto the signed 64-bit integer
// keyspace that PostgreSQL advisory locks require, using FNV-1a.
func hashKeyToInt64(key string) int64 {
    hasher := fnv.New64a()
    hasher.Write([]byte(key)) // hash.Hash's Write never returns an error
    return int64(hasher.Sum64())
}

// main runs a scheduler whose distributed locking is backed by PostgreSQL
// advisory locks.
func main() {
    // Create locker
    locker, err := NewPostgresLocker("postgres://user:password@localhost/dbname?sslmode=disable")
    if err != nil {
        panic(err)
    }
    defer locker.Close()

    // Create scheduler
    s, err := gocron.NewScheduler(
        gocron.WithDistributedLocker(locker),
    )
    if err != nil {
        panic(err)
    }
    defer s.Shutdown()

    // Add jobs; check NewJob's error instead of discarding it.
    _, err = s.NewJob(
        gocron.DurationJob(time.Minute),
        gocron.NewTask(func() {
            fmt.Println("Job running")
        }),
        gocron.WithName("postgres-job"),
    )
    if err != nil {
        panic(err)
    }

    s.Start()
    select {} // block forever; production code should wait for a signal
}

Time Alignment for Duration Jobs

Ensure all instances schedule jobs at the same time:

// Example: align duration jobs to wall-clock boundaries so every instance
// computes the same next run time and contends for the same lock window.
// Error handling is deliberately elided (_) for brevity.
func main() {
    locker, _ := NewRedisLocker("localhost:6379")
    defer locker.Close()

    s, _ := gocron.NewScheduler(
        gocron.WithDistributedLocker(locker),
    )
    defer s.Shutdown()

    // Align to 5-minute boundaries
    now := time.Now()
    // Truncate rounds down to the previous boundary; adding one interval
    // yields the next boundary — identical on all instances with synced clocks.
    next5Min := now.Truncate(5 * time.Minute).Add(5 * time.Minute)

    s.NewJob(
        gocron.DurationJob(5*time.Minute),
        gocron.NewTask(doWork),
        gocron.WithName("aligned-job"),
        gocron.WithStartAt(gocron.WithStartDateTime(next5Min)),
    )

    s.Start()
    select {} // block forever; real services should wait for a signal
}

Per-Job Locker Override

Use different lockers for different jobs:

// Example: mix lockers per job — a scheduler-wide default, a per-job
// override for critical work, and locking disabled for local-only jobs.
// Error handling is deliberately elided (_) for brevity.
func main() {
    // Redis for most jobs
    redisLocker, _ := NewRedisLocker("localhost:6379")
    defer redisLocker.Close()

    // PostgreSQL for critical jobs
    pgLocker, _ := NewPostgresLocker("postgres://...")
    defer pgLocker.Close()

    s, _ := gocron.NewScheduler(
        gocron.WithDistributedLocker(redisLocker), // Default
    )
    defer s.Shutdown()

    // Regular job uses Redis
    s.NewJob(
        gocron.DurationJob(time.Minute),
        gocron.NewTask(doRegularWork),
        gocron.WithName("regular-job"),
    )

    // Critical job uses PostgreSQL
    s.NewJob(
        gocron.DurationJob(time.Minute),
        gocron.NewTask(doCriticalWork),
        gocron.WithName("critical-job"),
        gocron.WithDistributedJobLocker(pgLocker), // Override
    )

    // Local-only job (no locking)
    s.NewJob(
        gocron.DurationJob(time.Minute),
        gocron.NewTask(doLocalWork),
        gocron.WithName("local-job"),
        gocron.WithDisabledDistributedJobLocker(true),
    )

    s.Start()
    select {} // block forever; real services should wait for a signal
}

Combining Elector and Locker

Use both strategies (uncommon):

// Example: combine an elector (only the leader runs jobs) with a locker
// (the leader additionally takes a per-job lock). Uncommon in practice.
// Error handling is deliberately elided (_) for brevity.
func main() {
    elector, _ := NewEtcdElector([]string{"localhost:2379"}, "/gocron/election")
    defer elector.Close()

    locker, _ := NewRedisLocker("localhost:6379")
    defer locker.Close()

    s, _ := gocron.NewScheduler(
        gocron.WithDistributedElector(elector), // Only leader runs jobs
        gocron.WithDistributedLocker(locker),   // Leader uses locks for per-job mutex
    )
    defer s.Shutdown()

    s.NewJob(
        gocron.DurationJob(time.Minute),
        gocron.NewTask(doWork),
        gocron.WithName("combined-job"),
    )

    s.Start()
    select {} // block forever; real services should wait for a signal
}

Monitoring Distributed Jobs

Track lock failures and distribution:

package main

import (
    "fmt"
    "sync/atomic"
    "time"

    "github.com/go-co-op/gocron/v2"
    "github.com/google/uuid"
)

// distributedMetrics counts how often this instance won or lost the
// distributed lock. Atomic counters let event-listener and job goroutines
// update them without extra locking.
type distributedMetrics struct {
    lockFailures  atomic.Int64
    lockSuccesses atomic.Int64
}

// recordLockFailure counts one failed lock acquisition.
func (m *distributedMetrics) recordLockFailure() { m.lockFailures.Add(1) }

// recordLockSuccess counts one successful lock acquisition.
func (m *distributedMetrics) recordLockSuccess() { m.lockSuccesses.Add(1) }

// printStats writes the current lock success rate to stdout.
func (m *distributedMetrics) printStats() {
    wins := m.lockSuccesses.Load()
    losses := m.lockFailures.Load()
    attempts := wins + losses
    if attempts == 0 {
        fmt.Println("No lock attempts yet")
        return
    }

    rate := 100 * float64(wins) / float64(attempts)
    fmt.Printf("Lock stats: %d/%d successful (%.1f%%)\n", wins, attempts, rate)
}

// Example: count lock wins/losses via event listeners and report the success
// rate once a minute. Error handling is deliberately elided (_) for brevity.
func main() {
    metrics := &distributedMetrics{}

    locker, _ := NewRedisLocker("localhost:6379")
    defer locker.Close()

    s, _ := gocron.NewScheduler(
        gocron.WithDistributedLocker(locker),
    )
    defer s.Shutdown()

    // Add jobs with lock monitoring
    s.NewJob(
        gocron.DurationJob(time.Minute),
        gocron.NewTask(func() {
            // The task body only runs when this instance won the lock.
            metrics.recordLockSuccess()
            fmt.Println("Job running")
        }),
        gocron.WithName("monitored-job"),
        gocron.WithEventListeners(
            // Fires when this instance fails to acquire the distributed lock
            // (requires the github.com/google/uuid import for uuid.UUID).
            gocron.AfterLockError(func(jobID uuid.UUID, jobName string, err error) {
                metrics.recordLockFailure()
                fmt.Printf("Lock failed for %s: %v\n", jobName, err)
            }),
        ),
    )

    // Print stats periodically
    go func() {
        ticker := time.NewTicker(1 * time.Minute)
        defer ticker.Stop()

        for range ticker.C {
            metrics.printStats()
        }
    }()

    s.Start()
    select {} // block forever; real services should wait for a signal
}

Testing Distributed Setup

Mock implementations for testing:

package main

import (
    "context"
    "errors"
    "sync"
    "testing"
    "time"

    "github.com/go-co-op/gocron/v2"
)

// mockLocker is an in-memory gocron.Locker for tests: a mutex-guarded set of
// held keys shared by every scheduler under test.
type mockLocker struct {
    locks map[string]bool
    mu    sync.Mutex
}

// newMockLocker returns a locker with no keys held.
func newMockLocker() *mockLocker {
    return &mockLocker{locks: make(map[string]bool)}
}

// Lock acquires key if it is free; otherwise it fails immediately, matching
// the try-lock semantics of the real implementations.
func (l *mockLocker) Lock(ctx context.Context, key string) (gocron.Lock, error) {
    l.mu.Lock()
    defer l.mu.Unlock()

    if held := l.locks[key]; held {
        return nil, errors.New("already locked")
    }
    l.locks[key] = true
    return &mockLock{locker: l, key: key}, nil
}

// mockLock releases its key back to the parent mockLocker on Unlock.
type mockLock struct {
    locker *mockLocker
    key    string
}

// Unlock frees the key so another scheduler may acquire it.
func (l *mockLock) Unlock(ctx context.Context) error {
    l.locker.mu.Lock()
    defer l.locker.mu.Unlock()
    delete(l.locker.locks, l.key)
    return nil
}

// mockElector is a controllable Elector for tests: flip leadership with
// SetLeader and IsLeader reports accordingly.
type mockElector struct {
    isLeader bool
    mu       sync.RWMutex
}

// IsLeader returns nil when this instance is the leader, an error otherwise.
func (e *mockElector) IsLeader(ctx context.Context) error {
    e.mu.RLock()
    leading := e.isLeader
    e.mu.RUnlock()

    if leading {
        return nil
    }
    return errors.New("not leader")
}

// SetLeader changes this instance's leadership status.
func (e *mockElector) SetLeader(isLeader bool) {
    e.mu.Lock()
    e.isLeader = isLeader
    e.mu.Unlock()
}

// TestDistributedLocking verifies that two schedulers sharing one locker
// both make progress on the same named job.
func TestDistributedLocking(t *testing.T) {
    locker := newMockLocker()

    // Create two schedulers
    s1, _ := gocron.NewScheduler(gocron.WithDistributedLocker(locker))
    s2, _ := gocron.NewScheduler(gocron.WithDistributedLocker(locker))
    defer s1.Shutdown()
    defer s2.Shutdown()

    var runs1, runs2 int
    var mu sync.Mutex

    // Same job on both schedulers
    s1.NewJob(
        gocron.DurationJob(time.Second),
        gocron.NewTask(func() {
            mu.Lock()
            runs1++
            mu.Unlock()
        }),
        gocron.WithName("shared-job"),
    )

    s2.NewJob(
        gocron.DurationJob(time.Second),
        gocron.NewTask(func() {
            mu.Lock()
            runs2++
            mu.Unlock()
        }),
        gocron.WithName("shared-job"),
    )

    s1.Start()
    s2.Start()

    time.Sleep(5 * time.Second)

    // Snapshot both counters under the mutex: the schedulers are still
    // running at this point, so reading runs1/runs2 unlocked (as the original
    // t.Logf did) races with the job goroutines.
    mu.Lock()
    r1, r2 := runs1, runs2
    mu.Unlock()
    total := r1 + r2

    t.Logf("Scheduler 1: %d runs, Scheduler 2: %d runs, Total: %d", r1, r2, total)

    // Both schedulers should have attempted runs, but locks prevent simultaneous execution
    if total == 0 {
        t.Fatal("No runs executed")
    }
}

// TestLeaderElection checks that jobs execute while the mock elector reports
// leadership, and that leadership loss can be simulated mid-test.
func TestLeaderElection(t *testing.T) {
    elector := &mockElector{isLeader: true}

    s1, _ := gocron.NewScheduler(gocron.WithDistributedElector(elector))
    defer s1.Shutdown()

    var (
        mu    sync.Mutex
        count int
    )

    s1.NewJob(
        gocron.DurationJob(time.Second),
        gocron.NewTask(func() {
            mu.Lock()
            defer mu.Unlock()
            count++
        }),
    )

    s1.Start()
    time.Sleep(3 * time.Second)

    // Simulate leadership loss
    elector.SetLeader(false)
    time.Sleep(3 * time.Second)

    mu.Lock()
    finalRuns := count
    mu.Unlock()

    t.Logf("Total runs: %d", finalRuns)

    // Should run while leader, stop after losing leadership
    if finalRuns == 0 {
        t.Fatal("No runs while leader")
    }
}

Docker Compose Example

Multi-instance deployment:

# Three identical scheduler instances sharing one Redis for distributed
# locking; each gets a unique INSTANCE_ID so log output attributes runs.
version: '3.8'

services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  scheduler-1:
    build: .
    environment:
      # Must be unique per instance; the examples read it via os.Getenv.
      - INSTANCE_ID=scheduler-1
      - REDIS_ADDR=redis:6379
    depends_on:
      - redis

  scheduler-2:
    build: .
    environment:
      - INSTANCE_ID=scheduler-2
      - REDIS_ADDR=redis:6379
    depends_on:
      - redis

  scheduler-3:
    build: .
    environment:
      - INSTANCE_ID=scheduler-3
      - REDIS_ADDR=redis:6379
    depends_on:
      - redis

Best Practices

  1. Always use stable job names with WithName()
  2. Align start times for duration jobs
  3. Set appropriate lock TTL (2-3x max job duration)
  4. Implement lock renewal for long-running jobs
  5. Monitor lock failures with event listeners
  6. Synchronize clocks across instances (NTP)
  7. Handle lock failures gracefully
  8. Test with mock implementations

When to Use What

Use Locker When:

  • Multiple instances run the same jobs
  • Jobs should run on any available instance
  • Want load distribution across instances
  • Each job has independent execution

Use Elector When:

  • Only one instance should be active
  • Want simple active-standby failover
  • All jobs run together on leader
  • Simpler than per-job locking

Use Both When:

  • Leader runs jobs (elector)
  • But jobs need individual locking (locker)
  • Uncommon pattern

Related Documentation

  • Guide: Distributed Deployment
  • API Types: Elector, Locker, Lock
  • API Scheduler Options: WithDistributedElector, WithDistributedLocker
  • API Job Options: WithDistributedJobLocker, WithDisabledDistributedJobLocker

Install with Tessl CLI

npx tessl i tessl/golang-github-com-go-co-op-gocron-v2@2.19.1

docs

index.md

tile.json