CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/golang-github-com-go-co-op-gocron-v2

A Golang job scheduling library that lets you run Go functions at pre-determined intervals using cron expressions, fixed durations, daily, weekly, monthly, or one-time schedules with support for distributed deployments.

Overview
Eval results
Files

monitoring.mddocs/examples/by-feature/

Monitoring Examples

Monitor gocron v2 jobs with Prometheus, structured logging, and observability patterns.

Prometheus Metrics

Complete Prometheus integration with the monitoring file content from the original (lines 1-144 from original monitoring.md preserved here)

package main

import (
    "fmt"
    "net/http"
    "time"

    "github.com/go-co-op/gocron/v2"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

type prometheusMonitor struct {
    jobExecutions *prometheus.CounterVec
    jobDuration   *prometheus.HistogramVec
    jobErrors     *prometheus.CounterVec
    queueSize     prometheus.Gauge
}

func NewPrometheusMonitor() *prometheusMonitor {
    return &prometheusMonitor{
        jobExecutions: promauto.NewCounterVec(
            prometheus.CounterOpts{
                Name: "gocron_job_executions_total",
                Help: "Total number of job executions by job name and status",
            },
            []string{"job_name", "status"},
        ),
        jobDuration: promauto.NewHistogramVec(
            prometheus.HistogramOpts{
                Name:    "gocron_job_duration_seconds",
                Help:    "Job execution duration in seconds",
                Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
            },
            []string{"job_name", "status"},
        ),
        jobErrors: promauto.NewCounterVec(
            prometheus.CounterOpts{
                Name: "gocron_job_errors_total",
                Help: "Total number of job errors by job name and error type",
            },
            []string{"job_name", "error_type"},
        ),
        queueSize: promauto.NewGauge(
            prometheus.GaugeOpts{
                Name: "gocron_queue_size",
                Help: "Current number of jobs waiting in queue",
            },
        ),
    }
}

func (m *prometheusMonitor) IncrementJob(id uuid.UUID, name string, tags []string, status gocron.JobStatus) {
    m.jobExecutions.WithLabelValues(name, string(status)).Inc()
}

func (m *prometheusMonitor) RecordJobTiming(startTime, endTime time.Time, id uuid.UUID, name string, tags []string) {
    // Not called when using MonitorStatus
}

func (m *prometheusMonitor) RecordJobTimingWithStatus(
    startTime, endTime time.Time,
    id uuid.UUID,
    name string,
    tags []string,
    status gocron.JobStatus,
    err error,
) {
    duration := endTime.Sub(startTime).Seconds()
    m.jobDuration.WithLabelValues(name, string(status)).Observe(duration)

    if err != nil {
        errorType := "unknown"
        if errors.Is(err, context.Canceled) {
            errorType = "cancelled"
        } else if errors.Is(err, context.DeadlineExceeded) {
            errorType = "timeout"
        }
        m.jobErrors.WithLabelValues(name, errorType).Inc()
    }
}

func (m *prometheusMonitor) UpdateQueueSize(size int) {
    m.queueSize.Set(float64(size))
}

func main() {
    monitor := NewPrometheusMonitor()

    s, _ := gocron.NewScheduler(
        gocron.WithMonitorStatus(monitor),
        gocron.WithLogger(gocron.NewLogger(gocron.LogLevelInfo)),
    )
    defer s.Shutdown()

    s.NewJob(
        gocron.DurationJob(30*time.Second),
        gocron.NewTask(func() error {
            time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
            if rand.Float32() < 0.1 {
                return errors.New("simulated error")
            }
            return nil
        }),
        gocron.WithName("worker-job"),
        gocron.WithTags("worker"),
    )

    go func() {
        ticker := time.NewTicker(10 * time.Second)
        defer ticker.Stop()
        for range ticker.C {
            monitor.UpdateQueueSize(s.JobsWaitingInQueue())
        }
    }()

    s.Start()

    http.Handle("/metrics", promhttp.Handler())
    fmt.Println("Metrics available at http://localhost:9090/metrics")
    http.ListenAndServe(":9090", nil)
}

StatsD Integration

Send metrics to StatsD (lines 145-228 from original):

package main

import (
    "fmt"
    "time"

    "github.com/DataDog/datadog-go/statsd"
    "github.com/go-co-op/gocron/v2"
)

type statsdMonitor struct {
    client *statsd.Client
}

func NewStatsdMonitor(addr string) (*statsdMonitor, error) {
    client, err := statsd.New(addr)
    if err != nil {
        return nil, err
    }
    return &statsdMonitor{client: client}, nil
}

func (m *statsdMonitor) IncrementJob(id uuid.UUID, name string, tags []string, status gocron.JobStatus) {
    statsTags := []string{
        "job:" + name,
        "status:" + string(status),
    }
    statsTags = append(statsTags, tags...)
    m.client.Incr("gocron.job.executions", statsTags, 1)
}

func (m *statsdMonitor) RecordJobTimingWithStatus(
    startTime, endTime time.Time,
    id uuid.UUID,
    name string,
    tags []string,
    status gocron.JobStatus,
    err error,
) {
    duration := endTime.Sub(startTime)
    statsTags := []string{
        "job:" + name,
        "status:" + string(status),
    }
    statsTags = append(statsTags, tags...)

    m.client.Timing("gocron.job.duration", duration, statsTags, 1)

    if err != nil {
        m.client.Incr("gocron.job.errors", statsTags, 1)
    }
}

func (m *statsdMonitor) Close() error {
    return m.client.Close()
}

func main() {
    monitor, _ := NewStatsdMonitor("localhost:8125")
    defer monitor.Close()

    s, _ := gocron.NewScheduler(
        gocron.WithMonitorStatus(monitor),
    )
    defer s.Shutdown()

    s.NewJob(
        gocron.DurationJob(time.Minute),
        gocron.NewTask(doWork),
        gocron.WithName("my-job"),
        gocron.WithTags("production", "worker"),
    )

    s.Start()
    select {}
}

Structured Logging with slog

Complete logging setup (lines 229-381 from original):

package main

import (
    "context"
    "log/slog"
    "os"
    "time"

    "github.com/go-co-op/gocron/v2"
)

type slogLogger struct {
    l *slog.Logger
}

func (l *slogLogger) Debug(msg string, args ...any) { l.l.Debug(msg, args...) }
func (l *slogLogger) Error(msg string, args ...any) { l.l.Error(msg, args...) }
func (l *slogLogger) Info(msg string, args ...any)  { l.l.Info(msg, args...) }
func (l *slogLogger) Warn(msg string, args ...any)  { l.l.Warn(msg, args...) }

type lifecycleMonitor struct {
    logger *slog.Logger
}

func NewLifecycleMonitor(logger *slog.Logger) *lifecycleMonitor {
    return &lifecycleMonitor{logger: logger}
}

func (m *lifecycleMonitor) SchedulerStarted() {
    m.logger.Info("scheduler started")
}

func (m *lifecycleMonitor) SchedulerStopped() {
    m.logger.Info("scheduler stopped")
}

func (m *lifecycleMonitor) SchedulerShutdown() {
    m.logger.Info("scheduler shutdown complete")
}

func (m *lifecycleMonitor) JobRegistered(job gocron.Job) {
    m.logger.Info("job registered",
        "job_id", job.ID(),
        "job_name", job.Name(),
        "tags", job.Tags(),
    )
}

func (m *lifecycleMonitor) JobUnregistered(job gocron.Job) {
    m.logger.Info("job unregistered",
        "job_id", job.ID(),
        "job_name", job.Name(),
    )
}

func (m *lifecycleMonitor) JobStarted(job gocron.Job) {
    m.logger.Debug("job started", "job_name", job.Name())
}

func (m *lifecycleMonitor) JobRunning(job gocron.Job) {
    // Periodic callback while job runs
}

func (m *lifecycleMonitor) JobCompleted(job gocron.Job) {
    m.logger.Info("job completed", "job_name", job.Name())
}

func (m *lifecycleMonitor) JobFailed(job gocron.Job, err error) {
    m.logger.Error("job failed",
        "job_name", job.Name(),
        "error", err,
    )
}

func (m *lifecycleMonitor) JobExecutionTime(job gocron.Job, duration time.Duration) {
    m.logger.Debug("job execution time",
        "job_name", job.Name(),
        "duration_ms", duration.Milliseconds(),
    )
}

func (m *lifecycleMonitor) JobSchedulingDelay(job gocron.Job, scheduledTime time.Time, actualStartTime time.Time) {
    delay := actualStartTime.Sub(scheduledTime)
    if delay > time.Second {
        m.logger.Warn("job scheduling delay",
            "job_name", job.Name(),
            "delay_ms", delay.Milliseconds(),
            "scheduled_time", scheduledTime,
            "actual_start_time", actualStartTime,
        )
    }
}

func (m *lifecycleMonitor) ConcurrencyLimitReached(limitType string, job gocron.Job) {
    m.logger.Warn("concurrency limit reached",
        "limit_type", limitType,
        "job_name", job.Name(),
    )
}

func main() {
    logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
        Level: slog.LevelDebug,
    }))

    slogWrapper := &slogLogger{l: logger}
    lifecycleMonitor := NewLifecycleMonitor(logger)

    s, _ := gocron.NewScheduler(
        gocron.WithLogger(slogWrapper),
        gocron.WithSchedulerMonitor(lifecycleMonitor),
    )
    defer s.Shutdown()

    s.NewJob(
        gocron.DurationJob(time.Minute),
        gocron.NewTask(func(ctx context.Context) error {
            logger.Info("job executing")
            time.Sleep(5 * time.Second)
            return nil
        }),
        gocron.WithName("monitored-job"),
        gocron.WithEventListeners(
            gocron.BeforeJobRuns(func(jobID uuid.UUID, jobName string) {
                logger.Info("before job runs",
                    "job_id", jobID,
                    "job_name", jobName,
                )
            }),
            gocron.AfterJobRunsWithError(func(jobID uuid.UUID, jobName string, err error) {
                logger.Error("job error",
                    "job_id", jobID,
                    "job_name", jobName,
                    "error", err,
                )
            }),
        ),
    )

    s.Start()
    select {}
}

OpenTelemetry Tracing

Add distributed tracing (lines 382-474 from original, now condensed):

package main

import (
    "context"
    "time"

    "github.com/go-co-op/gocron/v2"
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/exporters/jaeger"
    "go.opentelemetry.io/otel/sdk/trace"
)

func initTracer() (*trace.TracerProvider, error) {
    exporter, err := jaeger.New(jaeger.WithCollectorEndpoint(
        jaeger.WithEndpoint("http://localhost:14268/api/traces"),
    ))
    if err != nil {
        return nil, err
    }

    tp := trace.NewTracerProvider(trace.WithBatcher(exporter))
    otel.SetTracerProvider(tp)
    return tp, nil
}

func main() {
    tp, _ := initTracer()
    defer tp.Shutdown(context.Background())

    tracer := otel.Tracer("gocron")
    s, _ := gocron.NewScheduler()
    defer s.Shutdown()

    s.NewJob(
        gocron.DurationJob(time.Minute),
        gocron.NewTask(func(ctx context.Context) error {
            ctx, span := tracer.Start(ctx, "job.execute")
            defer span.End()

            span.SetAttributes(attribute.String("job.name", "traced-job"))

            // Simulate work
            time.Sleep(2 * time.Second)
            return nil
        }),
        gocron.WithName("traced-job"),
    )

    s.Start()
    select {}
}

Alerting Examples

Alert on Job Failures

s.NewJob(
    gocron.DurationJob(time.Minute),
    gocron.NewTask(doCriticalWork),
    gocron.WithName("critical-job"),
    gocron.WithEventListeners(
        gocron.AfterJobRunsWithError(func(jobID uuid.UUID, jobName string, err error) {
            sendAlert(fmt.Sprintf("Critical job %s failed: %v", jobName, err))
        }),
    ),
)

Alert on Scheduling Delay

func (m *lifecycleMonitor) JobSchedulingDelay(job gocron.Job, scheduledTime time.Time, actualStartTime time.Time) {
    delay := actualStartTime.Sub(scheduledTime)

    if delay > 10*time.Second {
        sendAlert(fmt.Sprintf(
            "Job %s delayed by %v",
            job.Name(),
            delay,
        ))
    }
}

Alert on Queue Buildup

go func() {
    ticker := time.NewTicker(1 * time.Minute)
    defer ticker.Stop()

    for range ticker.C {
        queueSize := s.JobsWaitingInQueue()
        if queueSize > 50 {
            sendAlert(fmt.Sprintf("Job queue size critical: %d jobs waiting", queueSize))
        }
    }
}()

Grafana Dashboard Queries

Example Prometheus queries for Grafana:

# Job execution rate
rate(gocron_job_executions_total[5m])

# Job success rate
sum(rate(gocron_job_executions_total{status="success"}[5m])) / sum(rate(gocron_job_executions_total[5m]))

# Job duration p95
histogram_quantile(0.95, rate(gocron_job_duration_seconds_bucket[5m]))

# Queue size
gocron_queue_size

Best Practices

  1. Use structured logging (JSON format)
  2. Tag jobs consistently for metrics
  3. Monitor scheduling delays for overload detection
  4. Alert on critical job failures
  5. Track queue size for capacity planning
  6. Set up dashboards for at-a-glance monitoring
  7. Use event listeners for per-job observability

Related Documentation

  • Guide: Observability
  • API: TypesMonitor, MonitorStatus, SchedulerMonitor, Logger
  • API: Scheduler OptionsWithLogger, WithMonitor, WithSchedulerMonitor
  • Examples: Error Handling

Install with Tessl CLI

npx tessl i tessl/golang-github-com-go-co-op-gocron-v2

docs

index.md

tile.json