tessl/golang-github-com-go-co-op-gocron-v2

A Golang job scheduling library that lets you run Go functions at pre-determined intervals using cron expressions, fixed durations, daily, weekly, monthly, or one-time schedules with support for distributed deployments.

docs/guides/distributed/leader-election.md

Leader Election

Elector interface, implementations, and use cases for distributed deployments.

Overview

Leader election ensures that only one instance of your application runs scheduled jobs, even when multiple instances are deployed. This is an all-or-nothing approach: the leader runs all jobs, followers run none.

Interface

Elector

type Elector interface {
    IsLeader(context.Context) error
}

IsLeader is called before each job execution:

  • Returns nil → this instance is the leader → job runs
  • Returns error → this instance is NOT the leader → job is skipped

Scheduler Option

func WithDistributedElector(elector Elector) SchedulerOption

Set the elector for the scheduler:

s, _ := gocron.NewScheduler(
    gocron.WithDistributedElector(elector),
)

Returns ErrWithDistributedElectorNil if elector is nil.

How It Works

Before Each Job Run

  1. Scheduler calls elector.IsLeader(ctx)
  2. If error returned → skip job execution
  3. If nil returned → proceed with job execution

Instance A (Leader):
  09:00 - IsLeader() → nil → Job runs
  09:01 - IsLeader() → nil → Job runs

Instance B (Follower):
  09:00 - IsLeader() → error → Job skipped
  09:01 - IsLeader() → error → Job skipped

Leadership Changes

When leadership changes, the new leader starts running jobs:

09:00 - Instance A is leader
09:05 - Instance A crashes
09:05 - Instance B detects leader loss, becomes leader
09:06 - Instance B starts running jobs

When to Use Elector

Use Elector when:

  • All jobs should run on exactly one instance
  • Simple all-or-nothing leadership
  • Clock skew between instances is unacceptable
  • Coordinator service is available (etcd, Consul, Redis)
  • Job distribution across instances is not needed

Don't use Elector when:

  • Different jobs can run on different instances
  • Need fine-grained per-job control
  • Want job-level distribution
  • Coordinator service unavailable

Use Distributed Locking instead for per-job control.

Implementation: etcd

Using etcd Election

import (
    "context"
    "errors"
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/go-co-op/gocron/v2"
    clientv3 "go.etcd.io/etcd/client/v3"
    "go.etcd.io/etcd/client/v3/concurrency"
)

type etcdElector struct {
    client     *clientv3.Client
    session    *concurrency.Session
    election   *concurrency.Election
    instanceID string
}

func NewEtcdElector(client *clientv3.Client, electionPrefix string) (*etcdElector, error) {
    // Create session with TTL
    session, err := concurrency.NewSession(client, concurrency.WithTTL(10))
    if err != nil {
        return nil, err
    }

    // Create election
    election := concurrency.NewElection(session, electionPrefix)

    hostname, _ := os.Hostname()
    instanceID := fmt.Sprintf("%s-%d", hostname, os.Getpid())

    e := &etcdElector{
        client:     client,
        session:    session,
        election:   election,
        instanceID: instanceID,
    }

    // Campaign for leadership in background
    go func() {
        ctx := context.Background()
        if err := e.election.Campaign(ctx, instanceID); err != nil {
            log.Printf("Campaign failed: %v", err)
        }
    }()

    return e, nil
}

func (e *etcdElector) IsLeader(ctx context.Context) error {
    resp, err := e.election.Leader(ctx)
    if err != nil {
        return err
    }

    if len(resp.Kvs) == 0 {
        return errors.New("no leader elected")
    }

    if string(resp.Kvs[0].Value) == e.instanceID {
        return nil // This instance is leader
    }

    return errors.New("not the leader")
}

func (e *etcdElector) Close() error {
    // Resign from election
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    if err := e.election.Resign(ctx); err != nil {
        log.Printf("Resign failed: %v", err)
    }

    return e.session.Close()
}

Usage

func main() {
    // Connect to etcd
    client, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"localhost:2379"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Create elector
    elector, err := NewEtcdElector(client, "/gocron/election")
    if err != nil {
        log.Fatal(err)
    }
    defer elector.Close()

    // Create scheduler with elector
    s, err := gocron.NewScheduler(
        gocron.WithDistributedElector(elector),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer s.Shutdown()

    // Add jobs
    s.NewJob(
        gocron.DurationJob(time.Minute),
        gocron.NewTask(func() {
            log.Println("Job running on leader")
        }),
        gocron.WithName("my-job"),
    )

    // Start scheduler
    s.Start()

    // Wait for interrupt
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
    <-sigCh

    log.Println("Shutting down...")
}

Implementation: Redis (Redlock)

Using Redis for Leader Election

import (
    "context"
    "errors"
    "fmt"
    "os"
    "time"

    "github.com/go-co-op/gocron/v2"
    "github.com/go-redsync/redsync/v4"
    "github.com/go-redsync/redsync/v4/redis/goredis/v9"
    "github.com/redis/go-redis/v9"
)

type redisElector struct {
    rs         *redsync.Redsync
    mutex      *redsync.Mutex
    instanceID string
    cancel     context.CancelFunc
}

func NewRedisElector(client *redis.Client) *redisElector {
    pool := goredis.NewPool(client)
    rs := redsync.New(pool)

    hostname, _ := os.Hostname()
    instanceID := fmt.Sprintf("%s-%d", hostname, os.Getpid())

    e := &redisElector{
        rs:         rs,
        instanceID: instanceID,
        mutex: rs.NewMutex(
            "gocron-leader",
            redsync.WithExpiry(10*time.Second),
            redsync.WithTries(1),
        ),
    }

    // Continuously try to acquire leadership
    ctx, cancel := context.WithCancel(context.Background())
    e.cancel = cancel

    go e.campaignLoop(ctx)

    return e
}

func (e *redisElector) campaignLoop(ctx context.Context) {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            // Extend the lock if we hold it; ExtendContext returns (bool, error)
            if ok, err := e.mutex.ExtendContext(ctx); err != nil || !ok {
                // Not holding the lock; try to acquire it. If another
                // instance holds it, this fails and we retry on the next tick.
                _ = e.mutex.LockContext(ctx)
            }
        }
    }
}

func (e *redisElector) IsLeader(ctx context.Context) error {
    // Extending the lock succeeds only while this instance holds it
    ok, err := e.mutex.ExtendContext(ctx)
    if err != nil || !ok {
        return errors.New("not the leader")
    }
    return nil
}

func (e *redisElector) Close() error {
    e.cancel()
    _, err := e.mutex.Unlock()
    return err
}

Usage

func main() {
    client := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })
    defer client.Close()

    elector := NewRedisElector(client)
    defer elector.Close()

    s, _ := gocron.NewScheduler(
        gocron.WithDistributedElector(elector),
    )
    defer s.Shutdown()

    s.NewJob(
        gocron.DurationJob(time.Minute),
        gocron.NewTask(doWork),
    )

    s.Start()
    select {}
}

Implementation: Consul

Using Consul Sessions

import (
    "context"
    "errors"
    "fmt"
    "log"
    "os"
    "time"

    consulapi "github.com/hashicorp/consul/api"
)

type consulElector struct {
    client     *consulapi.Client
    sessionID  string
    key        string
    instanceID string
    cancel     context.CancelFunc
}

func NewConsulElector(client *consulapi.Client, keyPrefix string) (*consulElector, error) {
    hostname, _ := os.Hostname()
    instanceID := fmt.Sprintf("%s-%d", hostname, os.Getpid())

    // Create session
    session := client.Session()
    sessionID, _, err := session.Create(&consulapi.SessionEntry{
        Name:      "gocron-leader",
        TTL:       "10s",
        Behavior:  "release",
        LockDelay: time.Second,
    }, nil)
    if err != nil {
        return nil, err
    }

    e := &consulElector{
        client:     client,
        sessionID:  sessionID,
        key:        keyPrefix + "/leader",
        instanceID: instanceID,
    }

    // Try to acquire lock in background
    ctx, cancel := context.WithCancel(context.Background())
    e.cancel = cancel

    go e.campaignLoop(ctx)

    return e, nil
}

func (e *consulElector) campaignLoop(ctx context.Context) {
    kv := e.client.KV()
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            // Try to acquire lock
            acquired, _, err := kv.Acquire(&consulapi.KVPair{
                Key:     e.key,
                Value:   []byte(e.instanceID),
                Session: e.sessionID,
            }, nil)

            if err != nil {
                log.Printf("Lock acquire failed: %v", err)
            } else if acquired {
                log.Printf("Acquired leadership")
            }
        }
    }
}

func (e *consulElector) IsLeader(ctx context.Context) error {
    kv := e.client.KV()

    pair, _, err := kv.Get(e.key, nil)
    if err != nil {
        return err
    }

    if pair == nil || pair.Session != e.sessionID {
        return errors.New("not the leader")
    }

    return nil
}

func (e *consulElector) Close() error {
    e.cancel()

    // Release lock
    kv := e.client.KV()
    _, _, err := kv.Release(&consulapi.KVPair{
        Key:     e.key,
        Session: e.sessionID,
    }, nil)

    // Destroy session
    session := e.client.Session()
    _, _ = session.Destroy(e.sessionID, nil)

    return err
}

Best Practices

1. Handle Leadership Loss Gracefully

type resilientElector struct {
    elector    Elector
    isLeader   bool
    mu         sync.RWMutex
    lastCheck  time.Time
    cacheTTL   time.Duration
}

func (e *resilientElector) IsLeader(ctx context.Context) error {
    e.mu.Lock()
    defer e.mu.Unlock()

    // Cache result for short time
    if time.Since(e.lastCheck) < e.cacheTTL {
        if e.isLeader {
            return nil
        }
        return errors.New("not leader (cached)")
    }

    // Check actual leadership
    err := e.elector.IsLeader(ctx)
    e.isLeader = (err == nil)
    e.lastCheck = time.Now()

    if err != nil {
        log.Printf("Leadership check failed: %v", err)
    }

    return err
}

2. Monitor Leadership Status

func monitorLeadership(elector Elector) {
    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()

    for range ticker.C {
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        err := elector.IsLeader(ctx)
        cancel()

        if err == nil {
            log.Println("Status: Leader")
            metrics.RecordLeadershipStatus(1) // metrics: your application's metrics client (placeholder)
        } else {
            log.Println("Status: Follower")
            metrics.RecordLeadershipStatus(0)
        }
    }
}

3. Use Appropriate TTL

// TTL should balance:
// - Fast failover (shorter TTL)
// - Network stability (longer TTL)

// Fast failover (5-10s)
session, _ := concurrency.NewSession(client, concurrency.WithTTL(5))

// Stable networks (15-30s)
session, _ := concurrency.NewSession(client, concurrency.WithTTL(20))

Recommendation: Start with 10s, adjust based on network stability and failover requirements.

4. Implement Health Checks

type healthyElector struct {
    elector       Elector
    healthChecker func() bool
}

func (e *healthyElector) IsLeader(ctx context.Context) error {
    // Check our own health first
    if !e.healthChecker() {
        log.Println("Instance unhealthy, not claiming leadership")
        return errors.New("instance unhealthy")
    }

    return e.elector.IsLeader(ctx)
}

5. Log Leadership Changes

type loggingElector struct {
    elector   Elector
    wasLeader bool
    mu        sync.Mutex
}

func (e *loggingElector) IsLeader(ctx context.Context) error {
    err := e.elector.IsLeader(ctx)
    isLeader := (err == nil)

    e.mu.Lock()
    defer e.mu.Unlock()

    if isLeader != e.wasLeader {
        if isLeader {
            log.Println("*** Became leader ***")
        } else {
            log.Println("*** Lost leadership ***")
        }
        e.wasLeader = isLeader
    }

    return err
}

6. Handle Cleanup on Leadership Loss

Leadership is checked only before each run, so a job that is already executing keeps running even if leadership is lost mid-run. The job's context is cancelled when the scheduler shuts down; long-running tasks should watch it and clean up:

j, _ := s.NewJob(
    gocron.DurationJob(time.Minute),
    gocron.NewTask(func(ctx context.Context) {
        select {
        case <-ctx.Done():
            log.Println("Context cancelled, cleaning up...")
            cleanup()
            return
        default:
            doWork()
        }
    }),
)

Advanced Patterns

Fenced Leadership

Prevent split-brain with fencing tokens:

type fencedElector struct {
    elector     Elector
    leaderEpoch int64
    mu          sync.RWMutex
}

func (e *fencedElector) IsLeader(ctx context.Context) error {
    err := e.elector.IsLeader(ctx)
    if err != nil {
        return err
    }

    // Get current epoch from the coordinator (fetchEpoch is
    // implementation-specific and not shown here)
    epoch := e.fetchEpoch(ctx)

    e.mu.Lock()
    defer e.mu.Unlock()

    if epoch > e.leaderEpoch {
        e.leaderEpoch = epoch
        log.Printf("New leadership epoch: %d", epoch)
    }

    return nil
}

Stepdown on Error

Voluntarily give up leadership on errors:

j, _ := s.NewJob(
    gocron.DurationJob(time.Minute),
    gocron.NewTask(doWork),
    gocron.WithEventListeners(
        gocron.AfterJobRunsWithError(func(jobID uuid.UUID, jobName string, err error) {
            // Critical error → resign leadership. Resign is not part of the
            // Elector interface; it must be a method on your concrete elector.
            if isCriticalError(err) {
                log.Printf("Critical error, resigning leadership: %v", err)
                elector.Resign()
            }
        }),
    ),
)

Troubleshooting

No Jobs Running on Any Instance

Symptom: All instances skip jobs.

Causes:

  1. No leader elected
  2. Coordinator service down
  3. All instances failing IsLeader() check

Solutions:

  • Check coordinator service (etcd, Redis, Consul)
  • Verify network connectivity
  • Check elector implementation logs
  • Verify session/lock TTL not expired

Multiple Instances Think They're Leader

Symptom: Jobs run on multiple instances simultaneously.

Causes:

  1. Network partition (split-brain)
  2. Elector implementation bug
  3. Lock/session expiry too short

Solutions:

  • Increase session TTL
  • Implement fencing tokens
  • Use consensus-based coordinator (etcd, Consul)
  • Add health checks before claiming leadership

Frequent Leadership Changes

Symptom: Leadership flips between instances often.

Causes:

  1. TTL too short
  2. Network instability
  3. Instance health issues

Solutions:

  • Increase session TTL
  • Improve network stability
  • Add health checks to prevent unhealthy leaders

Leadership Not Failing Over

Symptom: Leader crashes, but follower doesn't take over.

Causes:

  1. Session renewal still succeeding
  2. Follower not checking for leadership
  3. Lock release not happening

Solutions:

  • Reduce session TTL
  • Implement heartbeat monitoring
  • Add lock expiry
  • Verify follower campaign logic

Related Documentation

  • Distributed Locking — per-job lock control instead of all-or-nothing leadership

Install with Tessl CLI

npx tessl i tessl/golang-github-com-go-co-op-gocron-v2
