CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/golang-github-com-go-co-op-gocron-v2

A Golang job scheduling library that lets you run Go functions at pre-determined intervals using cron expressions, fixed durations, daily, weekly, monthly, or one-time schedules with support for distributed deployments.

Overview
Eval results
Files

docs/guides/distributed/testing.md

Testing Distributed Deployments

Mock electors and lockers for testing distributed gocron deployments.

Overview

Testing distributed systems is challenging. gocron's elector and locker interfaces make it easy to test distributed behavior without external dependencies (etcd, Redis, etc.) by using mock implementations.

Mock Elector

Basic Mock

import (
    "context"
    "errors"
    "sync"

    "github.com/go-co-op/gocron/v2"
)

// mockElector is a controllable elector for tests: it reports leadership
// based on a flag that tests can flip at runtime via SetLeader.
type mockElector struct {
    isLeader bool
    mu       sync.RWMutex
}

// NewMockElector returns a mockElector with the given initial leadership state.
func NewMockElector(isLeader bool) *mockElector {
    return &mockElector{isLeader: isLeader}
}

// IsLeader reports leadership: nil when leader, an error otherwise.
// This is the method gocron's scheduler calls before running a job.
func (e *mockElector) IsLeader(ctx context.Context) error {
    e.mu.RLock()
    defer e.mu.RUnlock()

    if !e.isLeader {
        return errors.New("not leader")
    }
    return nil
}

// SetLeader flips the leadership flag; safe for concurrent use with IsLeader.
func (e *mockElector) SetLeader(isLeader bool) {
    e.mu.Lock()
    defer e.mu.Unlock()
    e.isLeader = isLeader
}

Usage in Tests

// TestLeaderElection verifies that jobs run while the mock elector reports
// leadership and stop running once leadership is lost.
//
// `executed` is written by the scheduler's goroutine and read by the test
// goroutine, so it must be mutex-guarded — a bare bool here is a data race
// under `go test -race`.
func TestLeaderElection(t *testing.T) {
    elector := NewMockElector(true)

    s, err := gocron.NewScheduler(
        gocron.WithDistributedElector(elector),
    )
    require.NoError(t, err)
    defer s.Shutdown()

    var mu sync.Mutex
    executed := false
    setExecuted := func(v bool) {
        mu.Lock()
        defer mu.Unlock()
        executed = v
    }
    getExecuted := func() bool {
        mu.Lock()
        defer mu.Unlock()
        return executed
    }

    s.NewJob(
        gocron.DurationJob(time.Second),
        gocron.NewTask(func() {
            setExecuted(true)
        }),
    )

    s.Start()
    time.Sleep(1500 * time.Millisecond)

    // Job should execute (is leader)
    assert.True(t, getExecuted())

    // Simulate leadership loss
    setExecuted(false)
    elector.SetLeader(false)
    time.Sleep(1500 * time.Millisecond)

    // Job should not execute (not leader)
    assert.False(t, getExecuted())
}

Call Tracking Mock

// trackingMockElector is a mockElector variant that also counts how many
// times the scheduler consulted it for leadership.
type trackingMockElector struct {
    isLeader  bool
    callCount int
    mu        sync.RWMutex
}

// IsLeader records the call, then reports leadership (nil when leader).
// It takes the write lock because it mutates callCount.
func (e *trackingMockElector) IsLeader(ctx context.Context) error {
    e.mu.Lock()
    defer e.mu.Unlock()

    e.callCount++

    if !e.isLeader {
        return errors.New("not leader")
    }
    return nil
}

// CallCount returns how many times IsLeader has been invoked so far.
func (e *trackingMockElector) CallCount() int {
    e.mu.RLock()
    defer e.mu.RUnlock()
    return e.callCount
}

// TestElectorCallCount checks that the scheduler polls the elector before
// each run: a 100ms job observed for ~350ms should yield at least 3 checks.
func TestElectorCallCount(t *testing.T) {
    elector := &trackingMockElector{isLeader: true}

    s, _ := gocron.NewScheduler(gocron.WithDistributedElector(elector))
    defer s.Shutdown()

    s.NewJob(gocron.DurationJob(100*time.Millisecond), gocron.NewTask(func() {}))
    s.Start()

    time.Sleep(350 * time.Millisecond)

    // Should have checked leadership 3+ times
    assert.GreaterOrEqual(t, elector.CallCount(), 3)
}

Mock Locker

Basic Mock

// mockLocker is an in-memory locker: each key can be held by at most one
// caller at a time, with no TTL — a key stays locked until Unlock.
type mockLocker struct {
    locks map[string]bool
    mu    sync.Mutex
}

// NewMockLocker returns an empty in-memory locker.
func NewMockLocker() *mockLocker {
    return &mockLocker{
        locks: make(map[string]bool),
    }
}

// Lock acquires key, failing if it is already held. The returned
// gocron.Lock releases the key when unlocked.
func (l *mockLocker) Lock(ctx context.Context, key string) (gocron.Lock, error) {
    l.mu.Lock()
    defer l.mu.Unlock()

    if l.locks[key] {
        return nil, errors.New("already locked")
    }

    l.locks[key] = true
    return &mockLock{locker: l, key: key}, nil
}

// mockLock is the handle returned by mockLocker.Lock; it remembers which
// key it holds so Unlock releases exactly that key.
type mockLock struct {
    locker *mockLocker
    key    string
}

// Unlock releases the key back to the parent locker. Always returns nil.
func (l *mockLock) Unlock(ctx context.Context) error {
    l.locker.mu.Lock()
    defer l.locker.mu.Unlock()

    delete(l.locker.locks, l.key)
    return nil
}

Usage in Tests

// TestDistributedLocking runs the same named job on two schedulers that
// share one locker and asserts that the lock prevents both instances from
// executing every tick.
func TestDistributedLocking(t *testing.T) {
    locker := NewMockLocker()

    // Create two schedulers sharing the same locker
    s1, _ := gocron.NewScheduler(gocron.WithDistributedLocker(locker))
    s2, _ := gocron.NewScheduler(gocron.WithDistributedLocker(locker))
    defer s1.Shutdown()
    defer s2.Shutdown()

    // The counters are incremented on the schedulers' goroutines and read
    // by the test goroutine, so they must be mutex-guarded — bare ints here
    // are a data race under `go test -race`.
    var mu sync.Mutex
    execution1 := 0
    execution2 := 0

    // Both schedulers add the same job
    s1.NewJob(
        gocron.DurationJob(100*time.Millisecond),
        gocron.NewTask(func() {
            mu.Lock()
            execution1++
            mu.Unlock()
            time.Sleep(50 * time.Millisecond)
        }),
        gocron.WithName("shared-job"),
    )

    s2.NewJob(
        gocron.DurationJob(100*time.Millisecond),
        gocron.NewTask(func() {
            mu.Lock()
            execution2++
            mu.Unlock()
            time.Sleep(50 * time.Millisecond)
        }),
        gocron.WithName("shared-job"),
    )

    s1.Start()
    s2.Start()

    time.Sleep(500 * time.Millisecond)

    mu.Lock()
    defer mu.Unlock()

    // Only one scheduler should have executed the job
    total := execution1 + execution2
    assert.Greater(t, total, 0, "At least one execution")
    assert.Less(t, total, 10, "Not both executing every time")

    // One scheduler should dominate (or fair split)
    t.Logf("Scheduler 1 executions: %d", execution1)
    t.Logf("Scheduler 2 executions: %d", execution2)
}

Lock Tracking Mock

// trackingMockLocker is an in-memory locker that additionally records, per
// key, how many lock attempts were made and how many of them succeeded —
// useful for asserting that schedulers actually contended for a job's lock.
type trackingMockLocker struct {
    locks         map[string]bool
    lockAttempts  map[string]int
    lockSuccesses map[string]int
    mu            sync.Mutex
}

// NewTrackingMockLocker returns an empty tracking locker.
func NewTrackingMockLocker() *trackingMockLocker {
    return &trackingMockLocker{
        locks:         make(map[string]bool),
        lockAttempts:  make(map[string]int),
        lockSuccesses: make(map[string]int),
    }
}

// Lock counts the attempt, then acquires key unless it is already held.
func (l *trackingMockLocker) Lock(ctx context.Context, key string) (gocron.Lock, error) {
    l.mu.Lock()
    defer l.mu.Unlock()

    l.lockAttempts[key]++

    if l.locks[key] {
        return nil, errors.New("already locked")
    }

    l.locks[key] = true
    l.lockSuccesses[key]++
    // Return a handle bound to THIS locker. The original returned
    // &mockLock{locker: l, ...}, which does not compile: mockLock.locker
    // is a *mockLocker, not a *trackingMockLocker.
    return &trackingMockLock{locker: l, key: key}, nil
}

// trackingMockLock releases a key held by a trackingMockLocker.
type trackingMockLock struct {
    locker *trackingMockLocker
    key    string
}

// Unlock releases the key back to the parent locker. Always returns nil.
func (l *trackingMockLock) Unlock(ctx context.Context) error {
    l.locker.mu.Lock()
    defer l.locker.mu.Unlock()
    delete(l.locker.locks, l.key)
    return nil
}

// Stats reports the attempt and success counters recorded for key.
func (l *trackingMockLocker) Stats(key string) (attempts, successes int) {
    l.mu.Lock()
    defer l.mu.Unlock()
    return l.lockAttempts[key], l.lockSuccesses[key]
}

// TestLockContention starts two schedulers on the same 100ms job and uses
// the tracking locker's counters to prove that the instances actually raced
// for the lock (some attempts must have been rejected).
func TestLockContention(t *testing.T) {
    locker := NewTrackingMockLocker()

    s1, _ := gocron.NewScheduler(gocron.WithDistributedLocker(locker))
    s2, _ := gocron.NewScheduler(gocron.WithDistributedLocker(locker))
    defer s1.Shutdown()
    defer s2.Shutdown()

    // Both add same job
    for _, s := range []gocron.Scheduler{s1, s2} {
        s.NewJob(
            gocron.DurationJob(100*time.Millisecond),
            gocron.NewTask(func() {
                time.Sleep(50 * time.Millisecond)
            }),
            gocron.WithName("contended-job"),
        )
    }

    s1.Start()
    s2.Start()

    time.Sleep(550 * time.Millisecond)

    attempts, successes := locker.Stats("contended-job")
    t.Logf("Lock attempts: %d, successes: %d", attempts, successes)

    // Should have contention (attempts > successes)
    assert.Greater(t, attempts, successes)
}

Testing Patterns

Test Leadership Transition

// TestLeadershipTransition exercises a leader -> follower -> leader cycle
// and asserts the execution count is consistent with jobs running only
// during the two leadership windows. The executions slice is mutex-guarded
// because the task appends from the scheduler's goroutine.
func TestLeadershipTransition(t *testing.T) {
    elector := NewMockElector(true)

    s, _ := gocron.NewScheduler(gocron.WithDistributedElector(elector))
    defer s.Shutdown()

    var executions []time.Time
    var mu sync.Mutex

    s.NewJob(
        gocron.DurationJob(100*time.Millisecond),
        gocron.NewTask(func() {
            mu.Lock()
            defer mu.Unlock()
            executions = append(executions, time.Now())
        }),
    )

    s.Start()

    // Leader for 300ms
    time.Sleep(300 * time.Millisecond)
    elector.SetLeader(false)

    // Not leader for 300ms
    time.Sleep(300 * time.Millisecond)
    elector.SetLeader(true)

    // Leader again for 300ms
    time.Sleep(300 * time.Millisecond)

    mu.Lock()
    defer mu.Unlock()

    // Should have executions in first and third periods, none in middle
    t.Logf("Executions: %d", len(executions))
    assert.GreaterOrEqual(t, len(executions), 4) // ~3 in first, ~3 in third
    assert.LessOrEqual(t, len(executions), 8)
}

Test Lock Expiry

// expiringMockLocker simulates TTL-based locks (as Redis- or etcd-backed
// lockers provide): a key that is never unlocked becomes acquirable again
// once its age exceeds ttl.
type expiringMockLocker struct {
    locks map[string]time.Time
    ttl   time.Duration
    done  chan struct{}
    mu    sync.Mutex
}

// NewExpiringMockLocker starts a background goroutine that expires stale
// locks every ttl/10. Call Close when finished with the locker — the
// original version had no way to stop this goroutine, so it (and its
// ticker) leaked for the remainder of the test binary's lifetime.
func NewExpiringMockLocker(ttl time.Duration) *expiringMockLocker {
    l := &expiringMockLocker{
        locks: make(map[string]time.Time),
        ttl:   ttl,
        done:  make(chan struct{}),
    }

    // Background cleanup, stoppable via Close.
    go func() {
        ticker := time.NewTicker(ttl / 10)
        defer ticker.Stop()

        for {
            select {
            case <-ticker.C:
                l.cleanup()
            case <-l.done:
                return
            }
        }
    }()

    return l
}

// Close stops the background cleanup goroutine. Call it at most once.
func (l *expiringMockLocker) Close() {
    close(l.done)
}

// cleanup removes every lock whose age exceeds the TTL, simulating
// server-side lock expiry.
func (l *expiringMockLocker) cleanup() {
    l.mu.Lock()
    defer l.mu.Unlock()

    now := time.Now()
    for key, acquired := range l.locks {
        if now.Sub(acquired) > l.ttl {
            delete(l.locks, key)
        }
    }
}

// Lock acquires key unless it is currently held (and not yet expired by the
// background cleanup). The acquisition time is recorded so cleanup can
// expire the lock after the TTL.
func (l *expiringMockLocker) Lock(ctx context.Context, key string) (gocron.Lock, error) {
    l.mu.Lock()
    defer l.mu.Unlock()

    if _, exists := l.locks[key]; exists {
        return nil, errors.New("already locked")
    }

    l.locks[key] = time.Now()
    return &expiringMockLock{locker: l, key: key}, nil
}

// expiringMockLock is the handle returned by expiringMockLocker.Lock;
// Unlock releases the key immediately, ahead of its TTL.
type expiringMockLock struct {
    locker *expiringMockLocker
    key    string
}

// Unlock removes the key from the parent locker. Always returns nil.
func (l *expiringMockLock) Unlock(ctx context.Context) error {
    l.locker.mu.Lock()
    defer l.locker.mu.Unlock()
    delete(l.locker.locks, l.key)
    return nil
}

// TestLockExpiry verifies that an un-released lock becomes acquirable again
// once the locker's TTL elapses.
func TestLockExpiry(t *testing.T) {
    locker := NewExpiringMockLocker(200 * time.Millisecond)

    // Acquire the lock and deliberately never unlock it — expiry must free
    // it. The handle is discarded: assigning it to a named variable that is
    // never used would not compile ("declared and not used").
    _, err := locker.Lock(context.Background(), "test-key")
    require.NoError(t, err)

    // Should still be locked
    _, err = locker.Lock(context.Background(), "test-key")
    assert.Error(t, err)

    // Wait for expiry
    time.Sleep(250 * time.Millisecond)

    // Should be able to acquire again
    lock2, err := locker.Lock(context.Background(), "test-key")
    assert.NoError(t, err)

    lock2.Unlock(context.Background())
}

Test Multiple Instances

// TestMultipleInstances runs five schedulers against one shared locker and
// the same named job, then checks that total executions are bounded (the
// lock serializes runs) and that no single instance monopolized the job.
// Note: the deferred Shutdowns inside the loop run at function end, which
// is the desired lifetime here.
func TestMultipleInstances(t *testing.T) {
    locker := NewMockLocker()

    numInstances := 5
    schedulers := make([]gocron.Scheduler, numInstances)
    executions := make([]int, numInstances)
    var mu sync.Mutex

    for i := 0; i < numInstances; i++ {
        s, _ := gocron.NewScheduler(gocron.WithDistributedLocker(locker))
        defer s.Shutdown()

        // Capture the loop index for the closure (required pre-Go 1.22).
        idx := i
        s.NewJob(
            gocron.DurationJob(50*time.Millisecond),
            gocron.NewTask(func() {
                mu.Lock()
                defer mu.Unlock()
                executions[idx]++
                time.Sleep(20 * time.Millisecond)
            }),
            gocron.WithName("shared-job"),
        )

        schedulers[i] = s
    }

    // Start all schedulers
    for _, s := range schedulers {
        s.Start()
    }

    time.Sleep(500 * time.Millisecond)

    mu.Lock()
    defer mu.Unlock()

    total := 0
    for i, count := range executions {
        t.Logf("Instance %d: %d executions", i, count)
        total += count
    }

    // Should have reasonable total executions
    assert.Greater(t, total, 5)
    assert.Less(t, total, 15)

    // No single instance should have all executions (reasonable fairness)
    for _, count := range executions {
        assert.Less(t, count, total)
    }
}

Integration Testing

Using Testcontainers

import (
    "context"
    "fmt"
    "testing"

    "github.com/redis/go-redis/v9"
    "github.com/testcontainers/testcontainers-go"
    "github.com/testcontainers/testcontainers-go/wait"
)

// TestWithRealRedis spins up a throwaway Redis container and runs two
// schedulers against a Redis-backed locker, for integration coverage that
// the mocks cannot provide.
//
// NOTE(review): this snippet uses fmt.Sprintf but the import block above
// omits "fmt"; NewRedisLocker is assumed to be defined elsewhere in this
// guide — confirm both before copy-pasting.
func TestWithRealRedis(t *testing.T) {
    ctx := context.Background()

    // Start Redis container
    req := testcontainers.ContainerRequest{
        Image:        "redis:7",
        ExposedPorts: []string{"6379/tcp"},
        WaitingFor:   wait.ForLog("Ready to accept connections"),
    }

    redisC, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
        ContainerRequest: req,
        Started:          true,
    })
    require.NoError(t, err)
    defer redisC.Terminate(ctx)

    // Get Redis address
    host, _ := redisC.Host(ctx)
    port, _ := redisC.MappedPort(ctx, "6379")

    // Create Redis client
    client := redis.NewClient(&redis.Options{
        Addr: fmt.Sprintf("%s:%s", host, port.Port()),
    })
    defer client.Close()

    // Create real locker
    locker := NewRedisLocker(client)

    // Test with real locker
    s1, _ := gocron.NewScheduler(gocron.WithDistributedLocker(locker))
    s2, _ := gocron.NewScheduler(gocron.WithDistributedLocker(locker))
    defer s1.Shutdown()
    defer s2.Shutdown()

    // ... test distributed behavior with real Redis
}

Best Practices

1. Test Both Leader and Follower

// TestLeaderAndFollower sketches the recommended subtest split: verify job
// execution as leader and verify suppression as follower, separately.
func TestLeaderAndFollower(t *testing.T) {
    t.Run("as leader", func(t *testing.T) {
        elector := NewMockElector(true)
        // ... test leader behavior
    })

    t.Run("as follower", func(t *testing.T) {
        elector := NewMockElector(false)
        // ... test follower behavior
    })
}

2. Test Leadership Transitions

// TestTransitions sketches toggling the mock elector's leadership in both
// directions within a single test.
func TestTransitions(t *testing.T) {
    elector := NewMockElector(true)

    // Test becoming follower
    elector.SetLeader(false)

    // Test becoming leader again
    elector.SetLeader(true)
}

3. Test Lock Contention

// TestContention sketches the contention pattern: one shared locker, many
// schedulers, and an assertion that only one runs the job at a time.
func TestContention(t *testing.T) {
    locker := NewMockLocker()

    // Create multiple schedulers
    // Verify only one executes at a time
}

4. Use Table-Driven Tests

// TestDistributedScenarios sketches a table-driven layout for distributed
// cases: each row fixes the instance count, per-instance leadership, and
// the expected total executions.
func TestDistributedScenarios(t *testing.T) {
    tests := []struct {
        name          string
        numInstances  int
        isLeader      []bool
        expectedTotal int
    }{
        {"single leader", 3, []bool{true, false, false}, 5},
        {"no leader", 3, []bool{false, false, false}, 0},
        {"all leaders", 3, []bool{true, true, true}, 15},
    }

    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            // ... test scenario
        })
    }
}

5. Add Timeouts

// TestWithTimeout bounds a potentially-hanging distributed test: the test
// body runs in a goroutine and the select fails the test if it has not
// signaled completion within 5 seconds.
func TestWithTimeout(t *testing.T) {
    timeout := time.After(5 * time.Second)
    done := make(chan bool)

    go func() {
        // ... test logic
        done <- true
    }()

    select {
    case <-done:
        // Test completed
    case <-timeout:
        t.Fatal("Test timed out")
    }
}

Related Documentation

  • Leader Election — Elector implementations
  • Distributed Locking — Locker implementations
  • API: Types — Elector, Locker interfaces

Install with Tessl CLI

npx tessl i tessl/golang-github-com-go-co-op-gocron-v2@2.19.1

docs

index.md

tile.json