CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/golang-github-com-go-co-op-gocron-v2

A Golang job scheduling library that lets you run Go functions at pre-determined intervals using cron expressions, fixed durations, daily, weekly, monthly, or one-time schedules with support for distributed deployments.

Overview
Eval results
Files

docs/examples/guides/distributed.md

Examples: Distributed

Practical distributed deployment examples.

Leader Election with Redis

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
    "github.com/go-co-op/gocron/v2"
    "github.com/redis/go-redis/v9"
)

// redisElector implements gocron's Elector interface using a single
// Redis key as a leader lock: the instance whose id is stored under the
// key is the leader (see IsLeader).
type redisElector struct {
    client *redis.Client // Redis connection used for all lock operations
    key    string        // Redis key acting as the leader lock
    id     string        // this instance's identity, stored as the lock value
    ttl    time.Duration // lock lifetime; refreshed on each successful IsLeader check
}

// newRedisElector returns an elector backed by the Redis server at addr,
// identifying itself with id. The leader key and a 10-second TTL are
// fixed defaults shared by all instances.
func newRedisElector(addr, id string) *redisElector {
    elector := &redisElector{
        id:  id,
        key: "gocron:leader",
        ttl: 10 * time.Second,
    }
    elector.client = redis.NewClient(&redis.Options{Addr: addr})
    return elector
}

// IsLeader reports whether this instance holds (or can acquire) the
// Redis-backed leader lock. It returns nil when this instance is the
// leader and a non-nil error otherwise; gocron calls it before each job
// run and skips execution on error.
func (e *redisElector) IsLeader(ctx context.Context) error {
    // Atomically try to acquire the leader lock (SET key id NX with TTL).
    ok, err := e.client.SetNX(ctx, e.key, e.id, e.ttl).Result()
    if err != nil {
        return err
    }
    if ok {
        // We just acquired leadership.
        return nil
    }

    // The lock is held — check whether it is held by us.
    current, err := e.client.Get(ctx, e.key).Result()
    if err != nil {
        // Includes redis.Nil when the key expired between SetNX and Get;
        // the next IsLeader call will retry acquisition.
        return err
    }
    if current != e.id {
        return errors.New("not leader")
    }

    // We are the leader: refresh the TTL so the lock does not lapse.
    // BUG FIX: the original discarded Expire's error, so a failed refresh
    // could let leadership silently expire while still reporting leader.
    if err := e.client.Expire(ctx, e.key, e.ttl).Err(); err != nil {
        return err
    }
    return nil
}

// main demonstrates leader election: every instance registers the same
// job, but gocron consults the elector before each run, so only the
// current leader actually executes it.
func main() {
    // Unique-per-process ID so the leader is identifiable in the output.
    instanceID := fmt.Sprintf("instance-%d", time.Now().Unix())
    elector := newRedisElector("localhost:6379", instanceID)

    // BUG FIX: the original discarded the errors from NewScheduler and
    // NewJob, hiding misconfiguration (e.g. a bad option or job definition).
    s, err := gocron.NewScheduler(
        gocron.WithDistributedElector(elector),
    )
    if err != nil {
        panic(err)
    }
    defer s.Shutdown()

    // Only the leader executes this job.
    if _, err := s.NewJob(
        gocron.DurationJob(time.Minute),
        gocron.NewTask(func() {
            fmt.Printf("[%s] Leader doing work\n", instanceID)
        }),
        gocron.WithName("leader-job"),
    ); err != nil {
        panic(err)
    }

    s.Start()
    select {} // block forever
}

Distributed Locking with Redis

// redisLocker implements gocron's Locker interface; each Lock call
// creates an independent Redis-backed mutex for a job key.
type redisLocker struct {
    client *redis.Client // shared Redis connection for all lock operations
}

// redisLock is the handle returned by redisLocker.Lock; it remembers the
// key/value pair so Unlock can release only a lock this holder owns.
type redisLock struct {
    client *redis.Client // connection used to release the lock
    key    string        // Redis key holding this lock
    value  string        // holder token written at acquisition time
}

// newRedisLocker builds a gocron Locker backed by the Redis server at addr.
func newRedisLocker(addr string) *redisLocker {
    opts := &redis.Options{Addr: addr}
    return &redisLocker{client: redis.NewClient(opts)}
}

// Lock implements gocron.Locker. It attempts to take a Redis-backed
// mutex for the given job key, returning a gocron.Lock handle on
// success or an error when the lock is already held elsewhere.
func (l *redisLocker) Lock(ctx context.Context, key string) (gocron.Lock, error) {
    var (
        lockKey   = fmt.Sprintf("gocron:lock:%s", key)
        lockValue = fmt.Sprintf("%d", time.Now().UnixNano())
    )

    // SET NX with a one-minute TTL: succeeds only if no other holder
    // currently owns the key, so the TTL bounds how long a crashed
    // holder can block others.
    // NOTE(review): the nanosecond timestamp is assumed unique enough to
    // identify this holder; a random token would be stronger — confirm.
    acquired, err := l.client.SetNX(ctx, lockKey, lockValue, time.Minute).Result()
    switch {
    case err != nil:
        return nil, err
    case !acquired:
        return nil, errors.New("lock already held")
    }

    return &redisLock{client: l.client, key: lockKey, value: lockValue}, nil
}

// Unlock implements gocron.Lock. It deletes the Redis key, but only
// when this handle's value still matches — a compare-and-delete done in
// a Lua script so another holder's lock is never removed.
func (l *redisLock) Unlock(ctx context.Context) error {
    // Lua executes atomically inside Redis: GET + compare + DEL cannot
    // interleave with another client's acquisition.
    const unlockScript = `
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
    `

    _, err := l.client.Eval(ctx, unlockScript, []string{l.key}, l.value).Result()
    return err
}

// main demonstrates distributed locking: every instance schedules the
// same job, but the Redis locker ensures each run executes on exactly
// one instance.
func main() {
    instanceID := fmt.Sprintf("instance-%d", time.Now().Unix())
    locker := newRedisLocker("localhost:6379")

    // BUG FIX: the original discarded the errors from NewScheduler and
    // NewJob, hiding misconfiguration.
    s, err := gocron.NewScheduler(
        gocron.WithDistributedLocker(locker),
    )
    if err != nil {
        panic(err)
    }
    defer s.Shutdown()

    // All instances schedule the job; the locker serializes execution.
    if _, err := s.NewJob(
        gocron.DurationJob(time.Minute),
        gocron.NewTask(func() {
            fmt.Printf("[%s] Acquired lock, doing work\n", instanceID)
        }),
        gocron.WithName("distributed-job"),
    ); err != nil {
        panic(err)
    }

    s.Start()
    select {} // block forever
}

Per-Job Locking

// main demonstrates per-job locking: jobs opt into distributed locking
// individually, so exclusive and fleet-wide jobs can share a scheduler.
func main() {
    locker := newRedisLocker("localhost:6379")

    // BUG FIX: the original discarded every error from NewScheduler and
    // NewJob, so a broken job definition would fail silently.
    s, err := gocron.NewScheduler()
    if err != nil {
        panic(err)
    }
    defer s.Shutdown()

    // Job 1 with distributed locking: one instance per run.
    if _, err := s.NewJob(
        gocron.DurationJob(time.Minute),
        gocron.NewTask(func() {
            fmt.Println("Exclusive job 1")
        }),
        gocron.WithName("exclusive-job-1"),
        gocron.WithDistributedJobLocker(locker),
    ); err != nil {
        panic(err)
    }

    // Job 2 with distributed locking.
    if _, err := s.NewJob(
        gocron.DurationJob(time.Minute),
        gocron.NewTask(func() {
            fmt.Println("Exclusive job 2")
        }),
        gocron.WithName("exclusive-job-2"),
        gocron.WithDistributedJobLocker(locker),
    ); err != nil {
        panic(err)
    }

    // Job 3 without locking: runs on every instance.
    if _, err := s.NewJob(
        gocron.DurationJob(time.Minute),
        gocron.NewTask(func() {
            fmt.Println("Non-exclusive job")
        }),
        gocron.WithName("local-job"),
    ); err != nil {
        panic(err)
    }

    s.Start()
    select {} // block forever
}

Testing Distributed Setup

// main simulates a three-instance deployment inside a single process:
// each goroutine runs its own scheduler competing for the same locks.
func main() {
    const instanceCount = 3
    for n := 0; n < instanceCount; n++ {
        go runInstance(n)
    }

    // Block forever; the instances run until the process is killed.
    select {}
}

// runInstance starts one scheduler instance that competes for the shared
// distributed lock; instanceNum only labels the log output.
func runInstance(instanceNum int) {
    instanceID := fmt.Sprintf("instance-%d", instanceNum)
    locker := newRedisLocker("localhost:6379")

    // BUG FIX: the original discarded the errors from NewScheduler and
    // NewJob, so an instance could silently run with no job registered.
    s, err := gocron.NewScheduler(
        gocron.WithDistributedLocker(locker),
    )
    if err != nil {
        panic(err)
    }
    defer s.Shutdown()

    if _, err := s.NewJob(
        gocron.DurationJob(5*time.Second),
        gocron.NewTask(func() {
            fmt.Printf("[%s] Executing task\n", instanceID)
            // Simulated work; the lock is held for the task's duration.
            time.Sleep(time.Second)
        }),
        gocron.WithName("test-job"),
    ); err != nil {
        panic(err)
    }

    s.Start()
    select {} // keep this instance alive
}

Health Check for Distributed System

import (
    "encoding/json"
    "net/http"
)

// distributedHealthCheck is an http.Handler that reports this instance's
// scheduler state and leadership status as JSON (see ServeHTTP).
type distributedHealthCheck struct {
    scheduler gocron.Scheduler // scheduler whose jobs are counted
    elector   gocron.Elector   // elector probed for leadership status
}

// ServeHTTP reports this instance's health as JSON: whether it currently
// holds leadership and how many jobs its scheduler has registered.
func (h *distributedHealthCheck) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    ctx := r.Context()

    // IsLeader returns nil only when this instance holds the leader lock.
    isLeader := h.elector.IsLeader(ctx) == nil

    // Check scheduler health via its registered job count.
    jobs := h.scheduler.Jobs()

    response := map[string]interface{}{
        "healthy":   true,
        "is_leader": isLeader,
        "job_count": len(jobs),
    }

    w.Header().Set("Content-Type", "application/json")
    // FIX: acknowledge the Encode error instead of silently dropping it.
    // A failure here means the client disconnected mid-response; the
    // status line is already written, so there is nothing useful to do.
    _ = json.NewEncoder(w).Encode(response)
}

// main wires the elector-backed scheduler to an HTTP /health endpoint so
// an orchestrator can probe each instance's leadership and job count.
func main() {
    elector := newRedisElector("localhost:6379", "instance-1")

    // BUG FIX: the original discarded NewScheduler's error.
    s, err := gocron.NewScheduler(
        gocron.WithDistributedElector(elector),
    )
    if err != nil {
        panic(err)
    }
    defer s.Shutdown()

    // Setup health endpoint.
    health := &distributedHealthCheck{
        scheduler: s,
        elector:   elector,
    }
    http.Handle("/health", health)
    go func() {
        // BUG FIX: the original discarded ListenAndServe's error, so a
        // failed bind (e.g. port already in use) went unnoticed.
        if err := http.ListenAndServe(":8080", nil); err != nil {
            panic(err)
        }
    }()

    s.Start()
    select {} // block forever
}

See Also

  • Distributed Guide
  • Leader Election Guide
  • Distributed Locking Guide

Install with Tessl CLI

npx tessl i tessl/golang-github-com-go-co-op-gocron-v2

docs

index.md

tile.json