A Golang job scheduling library that lets you run Go functions at pre-determined intervals using cron expressions, fixed durations, daily, weekly, monthly, or one-time schedules with support for distributed deployments.
Mock electors and lockers for testing distributed gocron deployments.
Testing distributed systems is challenging. gocron's elector and locker interfaces make it easy to test distributed behavior without external dependencies (etcd, Redis, etc.) by using mock implementations.
import (
"context"
"errors"
"sync"
"github.com/go-co-op/gocron/v2"
)
// mockElector is an in-memory stand-in for a leader elector. It holds a single
// leadership flag that tests can flip at will.
type mockElector struct {
	mu       sync.RWMutex // guards isLeader
	isLeader bool         // leadership state reported to callers
}
// NewMockElector returns a mock elector whose initial leadership state is
// isLeader.
func NewMockElector(isLeader bool) *mockElector {
	elector := new(mockElector)
	elector.isLeader = isLeader
	return elector
}
// IsLeader reports the current leadership state: nil when the elector is
// flagged as leader, a "not leader" error otherwise. ctx is not consulted.
func (e *mockElector) IsLeader(ctx context.Context) error {
	// Snapshot the flag under the read lock, then decide outside of it.
	e.mu.RLock()
	leading := e.isLeader
	e.mu.RUnlock()

	if leading {
		return nil
	}
	return errors.New("not leader")
}
func (e *mockElector) SetLeader(isLeader bool) {
e.mu.Lock()
defer e.mu.Unlock()
e.isLeader = isLeader
}func TestLeaderElection(t *testing.T) {
elector := NewMockElector(true)
s, err := gocron.NewScheduler(
gocron.WithDistributedElector(elector),
)
require.NoError(t, err)
defer s.Shutdown()
executed := false
s.NewJob(
gocron.DurationJob(time.Second),
gocron.NewTask(func() {
executed = true
}),
)
s.Start()
time.Sleep(1500 * time.Millisecond)
// Job should execute (is leader)
assert.True(t, executed)
// Simulate leadership loss
executed = false
elector.SetLeader(false)
time.Sleep(1500 * time.Millisecond)
// Job should not execute (not leader)
assert.False(t, executed)
}type trackingMockElector struct {
isLeader bool
callCount int
mu sync.RWMutex
}
// IsLeader records the call and then reports the leadership state: nil when
// flagged as leader, a "not leader" error otherwise. The write lock is taken
// because the call counter is mutated on every invocation.
func (e *trackingMockElector) IsLeader(ctx context.Context) error {
	e.mu.Lock()
	defer e.mu.Unlock()

	e.callCount += 1
	if e.isLeader {
		return nil
	}
	return errors.New("not leader")
}
// CallCount returns the number of IsLeader calls recorded so far.
func (e *trackingMockElector) CallCount() int {
	e.mu.RLock()
	calls := e.callCount
	e.mu.RUnlock()
	return calls
}
func TestElectorCallCount(t *testing.T) {
elector := &trackingMockElector{isLeader: true}
s, _ := gocron.NewScheduler(gocron.WithDistributedElector(elector))
defer s.Shutdown()
s.NewJob(gocron.DurationJob(100*time.Millisecond), gocron.NewTask(func() {}))
s.Start()
time.Sleep(350 * time.Millisecond)
// Should have checked leadership 3+ times
assert.GreaterOrEqual(t, elector.CallCount(), 3)
}type mockLocker struct {
locks map[string]bool
mu sync.Mutex
}
// NewMockLocker returns a mock locker with an empty, ready-to-use lock table.
func NewMockLocker() *mockLocker {
	held := map[string]bool{}
	return &mockLocker{locks: held}
}
// Lock tries to take the lock named key. It fails with "already locked" when
// the key is currently held; otherwise it marks the key as held and returns a
// handle whose Unlock releases it. ctx is not consulted.
func (l *mockLocker) Lock(ctx context.Context, key string) (gocron.Lock, error) {
	l.mu.Lock()
	defer l.mu.Unlock()

	if held := l.locks[key]; held {
		return nil, errors.New("already locked")
	}

	l.locks[key] = true
	handle := &mockLock{locker: l, key: key}
	return handle, nil
}
// mockLock is the handle for one held key of a mockLocker.
type mockLock struct {
	key    string      // name of the held lock
	locker *mockLocker // locker whose table the key is removed from on Unlock
}
func (l *mockLock) Unlock(ctx context.Context) error {
l.locker.mu.Lock()
defer l.locker.mu.Unlock()
delete(l.locker.locks, l.key)
return nil
}func TestDistributedLocking(t *testing.T) {
locker := NewMockLocker()
// Create two schedulers sharing the same locker
s1, _ := gocron.NewScheduler(gocron.WithDistributedLocker(locker))
s2, _ := gocron.NewScheduler(gocron.WithDistributedLocker(locker))
defer s1.Shutdown()
defer s2.Shutdown()
execution1 := 0
execution2 := 0
// Both schedulers add the same job
s1.NewJob(
gocron.DurationJob(100*time.Millisecond),
gocron.NewTask(func() {
execution1++
time.Sleep(50 * time.Millisecond)
}),
gocron.WithName("shared-job"),
)
s2.NewJob(
gocron.DurationJob(100*time.Millisecond),
gocron.NewTask(func() {
execution2++
time.Sleep(50 * time.Millisecond)
}),
gocron.WithName("shared-job"),
)
s1.Start()
s2.Start()
time.Sleep(500 * time.Millisecond)
// Only one scheduler should have executed the job
total := execution1 + execution2
assert.Greater(t, total, 0, "At least one execution")
assert.Less(t, total, 10, "Not both executing every time")
// One scheduler should dominate (or fair split)
t.Logf("Scheduler 1 executions: %d", execution1)
t.Logf("Scheduler 2 executions: %d", execution2)
}type trackingMockLocker struct {
locks map[string]bool
lockAttempts map[string]int
lockSuccesses map[string]int
mu sync.Mutex
}
// NewTrackingMockLocker returns a tracking locker with empty lock and
// statistics tables.
func NewTrackingMockLocker() *trackingMockLocker {
	l := new(trackingMockLocker)
	l.locks = map[string]bool{}
	l.lockAttempts = map[string]int{}
	l.lockSuccesses = map[string]int{}
	return l
}
// Lock counts the attempt and tries to take the lock named key. It fails with
// "already locked" when the key is held; otherwise it marks the key as held,
// counts the success, and returns a handle whose Unlock releases the key.
// ctx is not consulted.
func (l *trackingMockLocker) Lock(ctx context.Context, key string) (gocron.Lock, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.lockAttempts[key]++
	if l.locks[key] {
		return nil, errors.New("already locked")
	}
	l.locks[key] = true
	l.lockSuccesses[key]++
	// A dedicated handle type is required here: mockLock stores a *mockLocker
	// and therefore cannot hold a *trackingMockLocker.
	return &trackingMockLock{locker: l, key: key}, nil
}

// trackingMockLock is the handle for one held key of a trackingMockLocker.
type trackingMockLock struct {
	locker *trackingMockLocker
	key    string
}

// Unlock releases the key by removing it from the owning locker's table. It
// always returns nil; ctx is not consulted.
func (l *trackingMockLock) Unlock(ctx context.Context) error {
	l.locker.mu.Lock()
	defer l.locker.mu.Unlock()
	delete(l.locker.locks, l.key)
	return nil
}
// Stats returns how many lock attempts and how many successful acquisitions
// have been recorded for key. Both are zero for a key that was never requested.
func (l *trackingMockLocker) Stats(key string) (attempts, successes int) {
	l.mu.Lock()
	attempts = l.lockAttempts[key]
	successes = l.lockSuccesses[key]
	l.mu.Unlock()
	return attempts, successes
}
func TestLockContention(t *testing.T) {
locker := NewTrackingMockLocker()
s1, _ := gocron.NewScheduler(gocron.WithDistributedLocker(locker))
s2, _ := gocron.NewScheduler(gocron.WithDistributedLocker(locker))
defer s1.Shutdown()
defer s2.Shutdown()
// Both add same job
for _, s := range []gocron.Scheduler{s1, s2} {
s.NewJob(
gocron.DurationJob(100*time.Millisecond),
gocron.NewTask(func() {
time.Sleep(50 * time.Millisecond)
}),
gocron.WithName("contended-job"),
)
}
s1.Start()
s2.Start()
time.Sleep(550 * time.Millisecond)
attempts, successes := locker.Stats("contended-job")
t.Logf("Lock attempts: %d, successes: %d", attempts, successes)
// Should have contention (attempts > successes)
assert.Greater(t, attempts, successes)
}func TestLeadershipTransition(t *testing.T) {
elector := NewMockElector(true)
s, _ := gocron.NewScheduler(gocron.WithDistributedElector(elector))
defer s.Shutdown()
var executions []time.Time
var mu sync.Mutex
s.NewJob(
gocron.DurationJob(100*time.Millisecond),
gocron.NewTask(func() {
mu.Lock()
defer mu.Unlock()
executions = append(executions, time.Now())
}),
)
s.Start()
// Leader for 300ms
time.Sleep(300 * time.Millisecond)
elector.SetLeader(false)
// Not leader for 300ms
time.Sleep(300 * time.Millisecond)
elector.SetLeader(true)
// Leader again for 300ms
time.Sleep(300 * time.Millisecond)
mu.Lock()
defer mu.Unlock()
// Should have executions in first and third periods, none in middle
t.Logf("Executions: %d", len(executions))
assert.GreaterOrEqual(t, len(executions), 4) // ~3 in first, ~3 in third
assert.LessOrEqual(t, len(executions), 8)
}type expiringMockLocker struct {
locks map[string]time.Time
ttl time.Duration
mu sync.Mutex
}
// NewExpiringMockLocker returns a locker whose locks are removed once they are
// older than ttl. Expiry is driven by a background goroutine that calls
// cleanup every ttl/10.
//
// NOTE(review): the cleanup goroutine has no stop signal and lives for the
// rest of the process; acceptable for short-lived tests, but adding a Close
// method would need a new field on expiringMockLocker.
func NewExpiringMockLocker(ttl time.Duration) *expiringMockLocker {
	l := &expiringMockLocker{
		locks: make(map[string]time.Time),
		ttl:   ttl,
	}

	// time.NewTicker panics on a non-positive period, which ttl/10 becomes for
	// any ttl below 10ns. Fall back to a small fixed period in that case.
	interval := ttl / 10
	if interval <= 0 {
		interval = time.Millisecond
	}

	// Background cleanup
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for range ticker.C {
			l.cleanup()
		}
	}()
	return l
}
// cleanup removes every lock whose age, measured against a single timestamp
// taken at the start of the sweep, is strictly greater than the locker's ttl.
func (l *expiringMockLocker) cleanup() {
	l.mu.Lock()
	defer l.mu.Unlock()

	now := time.Now()
	for key, acquired := range l.locks {
		age := now.Sub(acquired)
		if age <= l.ttl {
			continue // still within its lifetime
		}
		delete(l.locks, key)
	}
}
// Lock tries to take the lock named key. It fails with "already locked" while
// an entry for the key is present in the table; otherwise it records the
// current time as the acquisition time and returns a handle whose Unlock
// removes the entry. ctx is not consulted.
func (l *expiringMockLocker) Lock(ctx context.Context, key string) (gocron.Lock, error) {
	l.mu.Lock()
	defer l.mu.Unlock()

	_, held := l.locks[key]
	if held {
		return nil, errors.New("already locked")
	}

	l.locks[key] = time.Now()
	handle := &expiringMockLock{locker: l, key: key}
	return handle, nil
}
// expiringMockLock is the handle for one held key of an expiringMockLocker.
type expiringMockLock struct {
	key    string              // name of the held lock
	locker *expiringMockLocker // locker whose table the key is removed from on Unlock
}
// Unlock removes the key from the owning locker's table. It always returns
// nil; ctx is not consulted.
func (l *expiringMockLock) Unlock(ctx context.Context) error {
	owner := l.locker
	owner.mu.Lock()
	defer owner.mu.Unlock()

	delete(owner.locks, l.key)
	return nil
}
func TestLockExpiry(t *testing.T) {
locker := NewExpiringMockLocker(200 * time.Millisecond)
// Acquire lock
lock, err := locker.Lock(context.Background(), "test-key")
require.NoError(t, err)
// Should still be locked
_, err = locker.Lock(context.Background(), "test-key")
assert.Error(t, err)
// Wait for expiry
time.Sleep(250 * time.Millisecond)
// Should be able to acquire again
lock2, err := locker.Lock(context.Background(), "test-key")
assert.NoError(t, err)
lock2.Unlock(context.Background())
}func TestMultipleInstances(t *testing.T) {
locker := NewMockLocker()
numInstances := 5
schedulers := make([]gocron.Scheduler, numInstances)
executions := make([]int, numInstances)
var mu sync.Mutex
for i := 0; i < numInstances; i++ {
s, _ := gocron.NewScheduler(gocron.WithDistributedLocker(locker))
defer s.Shutdown()
idx := i
s.NewJob(
gocron.DurationJob(50*time.Millisecond),
gocron.NewTask(func() {
mu.Lock()
defer mu.Unlock()
executions[idx]++
time.Sleep(20 * time.Millisecond)
}),
gocron.WithName("shared-job"),
)
schedulers[i] = s
}
// Start all schedulers
for _, s := range schedulers {
s.Start()
}
time.Sleep(500 * time.Millisecond)
mu.Lock()
defer mu.Unlock()
total := 0
for i, count := range executions {
t.Logf("Instance %d: %d executions", i, count)
total += count
}
// Should have reasonable total executions
assert.Greater(t, total, 5)
assert.Less(t, total, 15)
// No single instance should have all executions (reasonable fairness)
for _, count := range executions {
assert.Less(t, count, total)
}
}import (
"context"
"testing"
"github.com/redis/go-redis/v9"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
)
func TestWithRealRedis(t *testing.T) {
ctx := context.Background()
// Start Redis container
req := testcontainers.ContainerRequest{
Image: "redis:7",
ExposedPorts: []string{"6379/tcp"},
WaitingFor: wait.ForLog("Ready to accept connections"),
}
redisC, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: req,
Started: true,
})
require.NoError(t, err)
defer redisC.Terminate(ctx)
// Get Redis address
host, _ := redisC.Host(ctx)
port, _ := redisC.MappedPort(ctx, "6379")
// Create Redis client
client := redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%s", host, port.Port()),
})
defer client.Close()
// Create real locker
locker := NewRedisLocker(client)
// Test with real locker
s1, _ := gocron.NewScheduler(gocron.WithDistributedLocker(locker))
s2, _ := gocron.NewScheduler(gocron.WithDistributedLocker(locker))
defer s1.Shutdown()
defer s2.Shutdown()
// ... test distributed behavior with real Redis
}func TestLeaderAndFollower(t *testing.T) {
t.Run("as leader", func(t *testing.T) {
elector := NewMockElector(true)
// ... test leader behavior
})
t.Run("as follower", func(t *testing.T) {
elector := NewMockElector(false)
// ... test follower behavior
})
}func TestTransitions(t *testing.T) {
elector := NewMockElector(true)
// Test becoming follower
elector.SetLeader(false)
// Test becoming leader again
elector.SetLeader(true)
}func TestContention(t *testing.T) {
locker := NewMockLocker()
// Create multiple schedulers
// Verify only one executes at a time
}func TestDistributedScenarios(t *testing.T) {
tests := []struct {
name string
numInstances int
isLeader []bool
expectedTotal int
}{
{"single leader", 3, []bool{true, false, false}, 5},
{"no leader", 3, []bool{false, false, false}, 0},
{"all leaders", 3, []bool{true, true, true}, 15},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// ... test scenario
})
}
}func TestWithTimeout(t *testing.T) {
timeout := time.After(5 * time.Second)
done := make(chan bool)
go func() {
// ... test logic
done <- true
}()
select {
case <-done:
// Test completed
case <-timeout:
t.Fatal("Test timed out")
}
}Elector, Locker interfacesInstall with Tessl CLI
npx tessl i tessl/golang-github-com-go-co-op-gocron-v2
docs
api
examples
guides