CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/golang-github-com-go-co-op-gocron-v2

A Golang job scheduling library that lets you run Go functions at pre-determined intervals using cron expressions, fixed durations, daily, weekly, monthly, or one-time schedules with support for distributed deployments.

Overview
Eval results
Files

docs/examples/guides/observability.md

Examples: Observability

Practical observability and monitoring examples.

Prometheus Metrics

package main

import (
    "fmt"
    "time"
    "net/http"
    "github.com/go-co-op/gocron/v2"
    "github.com/google/uuid"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

// jobNameLabel is the one label dimension shared by every job metric below.
const jobNameLabel = "job_name"

// Prometheus collectors for job activity, each partitioned by job name.
var (
    // jobExecutions is the counter behind gocron_job_executions_total.
    jobExecutions = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "gocron_job_executions_total",
            Help: "Total number of job executions",
        },
        []string{jobNameLabel},
    )

    // jobDuration is the histogram behind gocron_job_duration_seconds; it
    // uses the client library's default bucket layout.
    jobDuration = prometheus.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "gocron_job_duration_seconds",
            Help:    "Job execution duration in seconds",
            Buckets: prometheus.DefBuckets,
        },
        []string{jobNameLabel},
    )

    // jobErrors is the counter behind gocron_job_errors_total.
    jobErrors = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "gocron_job_errors_total",
            Help: "Total number of job errors",
        },
        []string{jobNameLabel},
    )
)

// init registers the three job collectors through prometheus.MustRegister,
// which panics if any registration fails.
func init() {
    prometheus.MustRegister(jobExecutions, jobDuration, jobErrors)
}

// prometheusMonitor forwards job events to the package-level Prometheus
// collectors. It holds no state of its own.
//
// NOTE(review): confirm that this method set matches the gocron.Monitor and
// gocron.SchedulerMonitor interfaces of the gocron version in use.
type prometheusMonitor struct{}

// RecordJobTiming counts one execution of jobName and records its duration,
// converted to seconds, in the duration histogram. start, jobID and tags are
// accepted but not used.
func (m *prometheusMonitor) RecordJobTiming(
    start time.Time,
    duration time.Duration,
    jobID uuid.UUID,
    jobName string,
    tags []string,
) {
    executions := jobExecutions.WithLabelValues(jobName)
    executions.Inc()

    seconds := duration.Seconds()
    jobDuration.WithLabelValues(jobName).Observe(seconds)
}

// JobCompleted increments the error counter for the job when err is non-nil;
// successful completions are not counted here.
func (m *prometheusMonitor) JobCompleted(jobID uuid.UUID, job gocron.Job, err error) {
    if err == nil {
        return
    }
    name := job.Name()
    jobErrors.WithLabelValues(name).Inc()
}

// The remaining hooks are intentionally empty: no metric is derived from them.

// JobScheduled is a no-op.
func (m *prometheusMonitor) JobScheduled(jobID uuid.UUID, job gocron.Job) {}

// JobUnscheduled is a no-op.
func (m *prometheusMonitor) JobUnscheduled(jobID uuid.UUID, job gocron.Job) {}

// JobStarted is a no-op.
func (m *prometheusMonitor) JobStarted(jobID uuid.UUID, job gocron.Job) {}

// ConcurrencyLimitReached is a no-op.
func (m *prometheusMonitor) ConcurrencyLimitReached(limitType string, job gocron.Job) {}

// main wires prometheusMonitor into a scheduler, registers a sample job that
// fails intermittently, and serves the collected metrics on :2112/metrics.
// Setup errors abort the program instead of being discarded.
func main() {
    s, err := gocron.NewScheduler(
        gocron.WithMonitor(&prometheusMonitor{}),
        gocron.WithSchedulerMonitor(&prometheusMonitor{}),
    )
    if err != nil {
        panic(fmt.Errorf("creating scheduler: %w", err))
    }
    // select {} below never returns, so this deferred Shutdown only runs if
    // main unwinds through a panic.
    defer s.Shutdown()

    _, err = s.NewJob(
        gocron.DurationJob(5*time.Second),
        gocron.NewTask(func() error {
            time.Sleep(time.Second)
            // Fail whenever the current Unix second is divisible by three.
            if time.Now().Unix()%3 == 0 {
                return fmt.Errorf("simulated error")
            }
            return nil
        }),
        gocron.WithName("sample-job"),
    )
    if err != nil {
        panic(fmt.Errorf("registering sample-job: %w", err))
    }

    // Expose metrics
    http.Handle("/metrics", promhttp.Handler())
    go func() {
        // ListenAndServe returns only on failure (for example, the port is
        // already in use); report it rather than silently running without a
        // metrics endpoint.
        if err := http.ListenAndServe(":2112", nil); err != nil {
            fmt.Println("metrics server stopped:", err)
        }
    }()

    s.Start()
    select {} // block forever while the scheduler runs
}

Structured Logging with Zap

import (
    "go.uber.org/zap"
)

// zapLogger adapts a zap.SugaredLogger to the Debug/Error/Info/Warn method set
// handed to gocron.WithLogger.
//
// gocron's Logger contract is modelled on log/slog: msg is a plain message and
// args are key/value pairs. The methods therefore use zap's *w variants, which
// attach args as structured fields, rather than the *f variants, which would
// interpret msg as a printf format string.
type zapLogger struct {
    logger *zap.SugaredLogger
}

// newZapLogger returns a zapLogger backed by zap's production configuration.
// It panics with a descriptive message if that logger cannot be built; the
// alternative would be a nil-pointer panic on the Sugar call.
func newZapLogger() *zapLogger {
    logger, err := zap.NewProduction()
    if err != nil {
        panic("building zap production logger: " + err.Error())
    }
    return &zapLogger{logger: logger.Sugar()}
}

// Debug logs msg at debug level with args as key/value fields.
func (l *zapLogger) Debug(msg string, args ...any) {
    l.logger.Debugw(msg, args...)
}

// Error logs msg at error level with args as key/value fields.
func (l *zapLogger) Error(msg string, args ...any) {
    l.logger.Errorw(msg, args...)
}

// Info logs msg at info level with args as key/value fields.
func (l *zapLogger) Info(msg string, args ...any) {
    l.logger.Infow(msg, args...)
}

// Warn logs msg at warn level with args as key/value fields.
func (l *zapLogger) Warn(msg string, args ...any) {
    l.logger.Warnw(msg, args...)
}

// main creates a scheduler that logs through zapLogger and registers a job
// that prints a line every minute. Setup errors abort the program instead of
// being discarded.
func main() {
    s, err := gocron.NewScheduler(
        gocron.WithLogger(newZapLogger()),
    )
    if err != nil {
        panic(fmt.Errorf("creating scheduler: %w", err))
    }
    // select {} below never returns, so this deferred Shutdown only runs if
    // main unwinds through a panic.
    defer s.Shutdown()

    _, err = s.NewJob(
        gocron.DurationJob(time.Minute),
        gocron.NewTask(func() {
            fmt.Println("Working...")
        }),
    )
    if err != nil {
        panic(fmt.Errorf("registering job: %w", err))
    }

    s.Start()
    select {} // block forever while the scheduler runs
}

Complete Observability Setup

// observabilityStack bundles a structured logger and the Prometheus monitor so
// that a single value can be passed to the scheduler as its logger and as its
// monitors.
type observabilityStack struct {
    logger            *zap.SugaredLogger
    prometheusMonitor *prometheusMonitor
}

// newObservabilityStack returns a stack backed by zap's production logger and
// a fresh prometheusMonitor. It panics with a descriptive message if the
// logger cannot be built; the alternative would be a nil-pointer panic on the
// Sugar call.
func newObservabilityStack() *observabilityStack {
    logger, err := zap.NewProduction()
    if err != nil {
        panic("building zap production logger: " + err.Error())
    }

    return &observabilityStack{
        logger:            logger.Sugar(),
        prometheusMonitor: &prometheusMonitor{},
    }
}

// Implement Logger interface
//
// gocron's Logger contract is modelled on log/slog: msg is a plain message and
// args are key/value pairs. The *w variants attach args as structured fields;
// the *f variants would interpret msg as a printf format string.

// Debug logs msg at debug level with args as key/value fields.
func (o *observabilityStack) Debug(msg string, args ...any) {
    o.logger.Debugw(msg, args...)
}

// Error logs msg at error level with args as key/value fields.
func (o *observabilityStack) Error(msg string, args ...any) {
    o.logger.Errorw(msg, args...)
}

// Info logs msg at info level with args as key/value fields.
func (o *observabilityStack) Info(msg string, args ...any) {
    o.logger.Infow(msg, args...)
}

// Warn logs msg at warn level with args as key/value fields.
func (o *observabilityStack) Warn(msg string, args ...any) {
    o.logger.Warnw(msg, args...)
}

// Implement SchedulerMonitor interface

// JobScheduled logs the job's ID and name at info level.
func (o *observabilityStack) JobScheduled(jobID uuid.UUID, job gocron.Job) {
    id, name := jobID.String(), job.Name()
    o.logger.Infow("job scheduled", "job_id", id, "job_name", name)
}

// JobStarted logs the job's ID and name at info level.
func (o *observabilityStack) JobStarted(jobID uuid.UUID, job gocron.Job) {
    id, name := jobID.String(), job.Name()
    o.logger.Infow("job started", "job_id", id, "job_name", name)
}

// JobCompleted logs the outcome of a run. A nil err is logged at info level.
// A non-nil err is logged at error level and then forwarded to the Prometheus
// monitor's JobCompleted.
func (o *observabilityStack) JobCompleted(jobID uuid.UUID, job gocron.Job, err error) {
    if err == nil {
        o.logger.Infow("job completed",
            "job_id", jobID.String(),
            "job_name", job.Name(),
        )
        return
    }

    o.logger.Errorw("job failed",
        "job_id", jobID.String(),
        "job_name", job.Name(),
        "error", err,
    )
    o.prometheusMonitor.JobCompleted(jobID, job, err)
}

// JobUnscheduled logs the job's ID and name at info level.
func (o *observabilityStack) JobUnscheduled(jobID uuid.UUID, job gocron.Job) {
    id, name := jobID.String(), job.Name()
    o.logger.Infow("job unscheduled", "job_id", id, "job_name", name)
}

// ConcurrencyLimitReached logs the limit type and job name at warn level.
func (o *observabilityStack) ConcurrencyLimitReached(limitType string, job gocron.Job) {
    name := job.Name()
    o.logger.Warnw("concurrency limit reached", "limit_type", limitType, "job_name", name)
}

// Implement Monitor interface

// RecordJobTiming first delegates to the Prometheus monitor with the same
// arguments, then logs the job ID, name, duration in milliseconds and tags at
// info level.
func (o *observabilityStack) RecordJobTiming(
    start time.Time,
    duration time.Duration,
    jobID uuid.UUID,
    jobName string,
    tags []string,
) {
    o.prometheusMonitor.RecordJobTiming(start, duration, jobID, jobName, tags)

    id := jobID.String()
    elapsedMs := duration.Milliseconds()
    o.logger.Infow("job timing", "job_id", id, "job_name", jobName, "duration_ms", elapsedMs, "tags", tags)
}

// main builds one observabilityStack, passes it to the scheduler as logger,
// scheduler monitor and monitor, registers a tagged job, and serves metrics
// on :2112/metrics. Setup errors abort the program instead of being discarded.
func main() {
    obs := newObservabilityStack()

    s, err := gocron.NewScheduler(
        gocron.WithLogger(obs),
        gocron.WithSchedulerMonitor(obs),
        gocron.WithMonitor(obs),
    )
    if err != nil {
        panic(fmt.Errorf("creating scheduler: %w", err))
    }
    // select {} below never returns, so this deferred Shutdown only runs if
    // main unwinds through a panic.
    defer s.Shutdown()

    _, err = s.NewJob(
        gocron.DurationJob(10*time.Second),
        gocron.NewTask(func() error {
            time.Sleep(time.Second)
            return nil
        }),
        gocron.WithName("monitored-job"),
        gocron.WithTags("production", "critical"),
    )
    if err != nil {
        panic(fmt.Errorf("registering monitored-job: %w", err))
    }

    // Expose metrics
    http.Handle("/metrics", promhttp.Handler())
    go func() {
        // ListenAndServe returns only on failure (for example, the port is
        // already in use); report it rather than silently running without a
        // metrics endpoint.
        if err := http.ListenAndServe(":2112", nil); err != nil {
            fmt.Println("metrics server stopped:", err)
        }
    }()

    s.Start()
    select {} // block forever while the scheduler runs
}

Alerting on Job Failures

// alertingMonitor keeps a per-job count of consecutive failures and a
// threshold against which that count is compared.
type alertingMonitor struct {
    mu                  sync.Mutex
    consecutiveFailures map[string]int // keyed by job name
    threshold           int
}

// newAlertingMonitor returns a monitor with an empty failure table and the
// given threshold.
func newAlertingMonitor(threshold int) *alertingMonitor {
    m := new(alertingMonitor)
    m.threshold = threshold
    m.consecutiveFailures = map[string]int{}
    return m
}

// JobCompleted updates the job's consecutive-failure count and sends an alert
// once the count has reached the threshold. A nil err resets the count.
//
// The alert is sent after the mutex is released: sendAlert is the hook for
// slow external calls (PagerDuty, Slack, ...), and holding the lock across it
// would stall every other job completion for the duration of the call.
func (m *alertingMonitor) JobCompleted(jobID uuid.UUID, job gocron.Job, err error) {
    jobName := job.Name()

    failures, alert := m.recordOutcome(jobName, err != nil)
    if !alert {
        return
    }

    m.sendAlert(fmt.Sprintf(
        "Job %s has failed %d times consecutively",
        jobName,
        failures,
    ))
}

// recordOutcome applies one run result to jobName's counter under the mutex.
// It returns the updated consecutive-failure count and whether that count has
// reached the alert threshold.
func (m *alertingMonitor) recordOutcome(jobName string, failed bool) (failures int, alert bool) {
    m.mu.Lock()
    defer m.mu.Unlock()

    if !failed {
        // Reset on success
        m.consecutiveFailures[jobName] = 0
        return 0, false
    }

    m.consecutiveFailures[jobName]++
    failures = m.consecutiveFailures[jobName]
    return failures, failures >= m.threshold
}

// sendAlert writes the alert to standard output, prefixed with "ALERT: ".
func (m *alertingMonitor) sendAlert(message string) {
    // Send to PagerDuty, Slack, etc.
    fmt.Println("ALERT: " + message)
}

// The remaining hooks are intentionally empty: only completions drive alerts.

// JobScheduled is a no-op.
func (m *alertingMonitor) JobScheduled(jobID uuid.UUID, job gocron.Job) {}

// JobUnscheduled is a no-op.
func (m *alertingMonitor) JobUnscheduled(jobID uuid.UUID, job gocron.Job) {}

// JobStarted is a no-op.
func (m *alertingMonitor) JobStarted(jobID uuid.UUID, job gocron.Job) {}

// ConcurrencyLimitReached is a no-op.
func (m *alertingMonitor) ConcurrencyLimitReached(limitType string, job gocron.Job) {}

// main attaches an alertingMonitor with a threshold of three to a scheduler
// and registers a job that fails on even Unix seconds. Setup errors abort the
// program instead of being discarded.
func main() {
    s, err := gocron.NewScheduler(
        gocron.WithSchedulerMonitor(newAlertingMonitor(3)),
    )
    if err != nil {
        panic(fmt.Errorf("creating scheduler: %w", err))
    }
    // select {} below never returns, so this deferred Shutdown only runs if
    // main unwinds through a panic.
    defer s.Shutdown()

    _, err = s.NewJob(
        gocron.DurationJob(10*time.Second),
        gocron.NewTask(func() error {
            // Simulate failures
            if time.Now().Unix()%2 == 0 {
                return fmt.Errorf("simulated failure")
            }
            return nil
        }),
        gocron.WithName("flaky-job"),
    )
    if err != nil {
        panic(fmt.Errorf("registering flaky-job: %w", err))
    }

    s.Start()
    select {} // block forever while the scheduler runs
}

See Also

  • Observability Guide
  • Logging Guide
  • Metrics Guide
  • Lifecycle Monitoring Guide

Install with Tessl CLI

npx tessl i tessl/golang-github-com-go-co-op-gocron-v2

docs

index.md

tile.json