tessl install tessl/golang-cloud-google-com--go--bigquery@1.72.0

Google Cloud BigQuery client library providing comprehensive Go APIs for querying, loading data, managing datasets and tables, streaming inserts, and accessing BigQuery's ecosystem of services including Storage, Analytics Hub, Data Transfer, and Migration APIs.
This document covers managing BigQuery jobs, including monitoring status, canceling jobs, and retrieving job information.
All BigQuery operations that take time to complete (queries, loads, extracts, copies) create jobs. Jobs can be monitored, canceled, and retrieved by ID.
type Job struct {
	// Unexported fields; job properties (ID, location, creator email, ...)
	// are read through the accessor methods below.
}
func (j *Job) ID() string
func (j *Job) Location() string
func (j *Job) Email() string
func (j *Job) Config() (JobConfig, error)
func (j *Job) SessionID() string

type JobConfig interface {
	isJobConfig()
}

The JobConfig interface is implemented by the configuration type of each job kind:

- *CopyConfig - Table copy operations
- *ExtractConfig - Data extract operations
- *LoadConfig - Data load operations
- *QueryConfig - Query operations

Get the configuration for a job:
config, err := job.Config()
if err != nil {
return err
}
// Type assert to the specific config type
switch c := config.(type) {
case *bigquery.QueryConfig:
fmt.Printf("Query: %s\n", c.Q)
case *bigquery.LoadConfig:
fmt.Printf("Loading to: %s\n", c.Dst.FullyQualifiedName())
case *bigquery.CopyConfig:
fmt.Printf("Copying to: %s\n", c.Dst.FullyQualifiedName())
case *bigquery.ExtractConfig:
fmt.Printf("Extracting from: %s\n", c.Src.FullyQualifiedName())
}Jobs are created by running queries, loads, extracts, or copies:
// Query job
q := client.Query("SELECT * FROM dataset.table")
job, err := q.Run(ctx)
// Load job
loader := table.LoaderFrom(gcsRef)
job, err := loader.Run(ctx)
// Extract job
extractor := table.ExtractorTo(gcsRef)
job, err := extractor.Run(ctx)
// Copy job
copier := dstTable.CopierFrom(srcTable)
job, err := copier.Run(ctx)func (c *Client) JobFromID(ctx context.Context, id string) (*Job, error)Retrieve a job using its ID (in client's location):
jobID := "my-job-id"
job, err := client.JobFromID(ctx, jobID)
if err != nil {
return err
}func (c *Client) JobFromIDLocation(ctx context.Context, id, location string) (*Job, error)jobID := "my-job-id"
location := "US"
job, err := client.JobFromIDLocation(ctx, jobID, location)func (c *Client) JobFromProject(ctx context.Context, projectID, jobID, location string) (*Job, error)projectID := "other-project"
jobID := "their-job-id"
location := "EU"
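job, err := client.JobFromProject(ctx, projectID, jobID, location)

type JobStatus struct {
	State State
	// All errors encountered while running the job. Not all of them are
	// fatal, so a non-empty Errors does not by itself mean the job failed.
	Errors     []*Error
	Statistics *JobStatistics
	// contains unexported fields
}
// Done reports whether the job has completed (State == Done).
func (s *JobStatus) Done() bool
// Err returns the error that caused the job to complete unsuccessfully, if any.
func (s *JobStatus) Err() error
func (j *Job) Status(ctx context.Context) (*JobStatus, error)
func (j *Job) LastStatus() *JobStatus

type State int
const (
	StateUnspecified State = iota
	Pending
	Running
	Done
)

status, err := job.Status(ctx)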
if err != nil {
return err
}
fmt.Printf("State: %v\n", status.State)
fmt.Printf("Done: %v\n", status.Done())
if err := status.Err(); err != nil {
	fmt.Printf("Error: %v\n", err)
}

func (j *Job) Wait(ctx context.Context) (*JobStatus, error)

Wait blocks until the job completes, polling with exponential backoff:
status, err := job.Wait(ctx)
if err != nil {
return err
}
if err := status.Err(); err != nil {
return err
}
fmt.Println("Job completed successfully")

To poll manually instead of calling Wait:

import "time"
for {
	status, err := job.Status(ctx)
	if err != nil {
		return err
	}
	if status.Done() {
		if err := status.Err(); err != nil {
			return err
		}
		break
	}
	time.Sleep(5 * time.Second)
}
type JobStatistics struct {
CreationTime time.Time
StartTime time.Time
EndTime time.Time
TotalBytesProcessed int64
NumChildJobs int64
ParentJobID string
ScriptStatistics *ScriptStatistics
ReservationID string
TransactionInfo *TransactionInfo
SessionInfo *SessionInfo
FinalExecutionDuration time.Duration
TotalSlotMs int64
QuotaDeferments []string
Details Statistics
}type Statistics interface {
implementsStatistics()
}The Statistics interface is implemented by job-type-specific statistics types:
- *QueryStatistics - Query job statistics
- *LoadStatistics - Load job statistics
- *ExtractStatistics - Extract job statistics
- *CopyStatistics - Copy job statistics (not separately defined, uses ExtractStatistics)

status, err := job.Wait(ctx)
if err != nil {
return err
}
stats := status.Statistics
fmt.Printf("Creation time: %s\n", stats.CreationTime)
fmt.Printf("Start time: %s\n", stats.StartTime)
fmt.Printf("End time: %s\n", stats.EndTime)
fmt.Printf("Total bytes processed: %d\n", stats.TotalBytesProcessed)type QueryStatistics struct {
TotalBytesProcessed int64
TotalBytesBilled int64
BillingTier int64
CacheHit bool
DDLTargetTable *Table
DDLOperationPerformed string
StatementType string
TotalPartitionsProcessed int64
TotalSlotMs int64
ReferencedTables []*Table
Schema Schema
NumDMLAffectedRows int64
DMLStats *DMLStatistics
QueryPlan []*ExplainQueryStage
Timeline []*QueryTimelineIteration
UndeclaredQueryParameters []QueryParameter
BiEngineStatistics *BIEngineStatistics
TransactionInfo *TransactionInfo
ExportDataStatistics *ExportDataStatistics
}queryStats := status.Statistics.Details.(*bigquery.QueryStatistics)
fmt.Printf("Bytes billed: %d\n", queryStats.TotalBytesBilled)
fmt.Printf("Cache hit: %v\n", queryStats.CacheHit)
fmt.Printf("Statement type: %s\n", queryStats.StatementType)type LoadStatistics struct {
InputFileBytes int64
InputFiles int64
OutputBytes int64
OutputRows int64
BadRecords int64
}loadStats := status.Statistics.Details.(*bigquery.LoadStatistics)
fmt.Printf("Input files: %d\n", loadStats.InputFiles)
fmt.Printf("Output rows: %d\n", loadStats.OutputRows)
fmt.Printf("Bad records: %d\n", loadStats.BadRecords)type ExtractStatistics struct {
DestinationURIFileCounts []int64
}Extract statistics for export jobs:
extractStats := status.Statistics.Details.(*bigquery.ExtractStatistics)
fmt.Printf("Destination URIs: %d\n", len(extractStats.DestinationURIFileCounts))
for i, count := range extractStats.DestinationURIFileCounts {
fmt.Printf(" URI %d: %d files\n", i, count)
}type DMLStatistics struct {
InsertedRowCount int64
DeletedRowCount int64
UpdatedRowCount int64
}DML statistics for INSERT, UPDATE, DELETE statements:
queryStats := status.Statistics.Details.(*bigquery.QueryStatistics)
if queryStats.DMLStats != nil {
fmt.Printf("Inserted rows: %d\n", queryStats.DMLStats.InsertedRowCount)
fmt.Printf("Deleted rows: %d\n", queryStats.DMLStats.DeletedRowCount)
fmt.Printf("Updated rows: %d\n", queryStats.DMLStats.UpdatedRowCount)
}type ExportDataStatistics struct {
FileCount int64
RowCount int64
}Statistics for EXPORT DATA statements within query jobs:
queryStats := status.Statistics.Details.(*bigquery.QueryStatistics)
if queryStats.ExportDataStatistics != nil {
fmt.Printf("Exported files: %d\n", queryStats.ExportDataStatistics.FileCount)
fmt.Printf("Exported rows: %d\n", queryStats.ExportDataStatistics.RowCount)
}type ExplainQueryStage struct {
CompletedParallelInputs int64
ComputeAvg time.Duration
ComputeMax time.Duration
ComputeRatioAvg float64
ComputeRatioMax float64
EndTime time.Time
ID int64
InputStages []int64
Name string
ParallelInputs int64
ReadAvg time.Duration
ReadMax time.Duration
ReadRatioAvg float64
ReadRatioMax float64
RecordsRead int64
RecordsWritten int64
ShuffleOutputBytes int64
ShuffleOutputBytesSpilled int64
StartTime time.Time
Status string
Steps []*ExplainQueryStep
WaitAvg time.Duration
WaitMax time.Duration
WaitRatioAvg float64
WaitRatioMax float64
WriteAvg time.Duration
WriteMax time.Duration
WriteRatioAvg float64
WriteRatioMax float64
}type ExplainQueryStep struct {
Kind string
Substeps []string
}Query execution stages and steps from the query plan:
queryStats := status.Statistics.Details.(*bigquery.QueryStatistics)
for _, stage := range queryStats.QueryPlan {
fmt.Printf("Stage %d: %s\n", stage.ID, stage.Name)
fmt.Printf(" Records read: %d\n", stage.RecordsRead)
fmt.Printf(" Records written: %d\n", stage.RecordsWritten)
fmt.Printf(" Duration: %s\n", stage.EndTime.Sub(stage.StartTime))
for _, step := range stage.Steps {
fmt.Printf(" Step: %s\n", step.Kind)
}
}type ScriptStatistics struct {
EvaluationKind string
StackFrames []*ScriptStackFrame
}type ScriptStackFrame struct {
StartLine int64
StartColumn int64
EndLine int64
EndColumn int64
ProcedureID string
Text string
}Statistics for script-based query jobs:
stats := status.Statistics
if stats.ScriptStatistics != nil {
fmt.Printf("Evaluation kind: %s\n", stats.ScriptStatistics.EvaluationKind)
for _, frame := range stats.ScriptStatistics.StackFrames {
fmt.Printf("Frame: %s (lines %d-%d)\n",
frame.ProcedureID, frame.StartLine, frame.EndLine)
}
}type SessionInfo struct {
SessionID string
}type TransactionInfo struct {
TransactionID string
}Information about multi-statement transactions and sessions:
stats := status.Statistics
if stats.SessionInfo != nil {
fmt.Printf("Session ID: %s\n", stats.SessionInfo.SessionID)
}
if stats.TransactionInfo != nil {
fmt.Printf("Transaction ID: %s\n", stats.TransactionInfo.TransactionID)
}func (j *Job) Cancel(ctx context.Context) errorCancel a running job:
if err := job.Cancel(ctx); err != nil {
return err
}
// Wait for cancellation to complete
status, err := job.Wait(ctx)
if err != nil {
return err
}
if err := status.Err(); err != nil {
fmt.Printf("Job was canceled: %v\n", err)
}func (c *Client) Jobs(ctx context.Context) *JobIteratortype JobIterator struct {
// ProjectID is the project to list jobs from
ProjectID string
// AllUsers, when true, lists jobs from all users in the project
AllUsers bool
// State filters jobs by state
State State
// MinCreationTime filters jobs created after this time
MinCreationTime time.Time
// MaxCreationTime filters jobs created before this time
MaxCreationTime time.Time
// ParentJobID filters jobs by parent job ID
ParentJobID string
}func (it *JobIterator) Next() (*Job, error)
func (it *JobIterator) PageInfo() *iterator.PageInfoit := client.Jobs(ctx)
for {
job, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return err
}
fmt.Printf("Job: %s\n", job.ID())
}it := client.Jobs(ctx)
it.AllUsers = true
it.State = bigquery.Running
it.MinCreationTime = time.Now().Add(-24 * time.Hour)
for {
job, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return err
}
status := job.LastStatus()
fmt.Printf("Job: %s, State: %s\n", job.ID(), status.State)
}type JobIDConfig struct {
JobID string
AddJobIDSuffix bool
Location string
ProjectID string
}Set custom job ID:
q := client.Query("SELECT 1")
q.JobID = "my-custom-job-id"
q.Location = "US"
job, err := q.Run(ctx)Add random suffix:
q.JobID = "query-job"
q.AddJobIDSuffix = true
// Results in "query-job-<random-suffix>"func (j *Job) Read(ctx context.Context) (*RowIterator, error)job, err := q.Run(ctx)
if err != nil {
return err
}
// Read waits for the query job to complete before returning the row iterator
it, err := job.Read(ctx)
if err != nil {
return err
}
for {
var values []bigquery.Value
err := it.Next(&values)
if err == iterator.Done {
break
}
if err != nil {
return err
}
fmt.Println(values)
}Execute a dry run to get statistics without actually running:
q := client.Query("SELECT * FROM large_table")
q.DryRun = true
job, err := q.Run(ctx)
if err != nil {
return err
}
stats := job.LastStatus().Statistics
queryStats := stats.Details.(*bigquery.QueryStatistics)
fmt.Printf("Would process: %d bytes\n", queryStats.TotalBytesProcessed)Set a timeout for job execution:
q := client.Query("SELECT * FROM dataset.table")
q.JobTimeout = 10 * time.Minute
job, err := q.Run(ctx)Note: The timeout cannot be adjusted after the job is created. Use Job.Cancel() for more dynamic control.
package main
import (
"context"
"fmt"
"log"
"time"
"cloud.google.com/go/bigquery"
"google.golang.org/api/iterator"
)
func main() {
	if err := run(context.Background()); err != nil {
		log.Fatal(err)
	}
}

// run starts a sample query job, monitors it until completion, prints its
// statistics and the first rows of its result, then lists recent jobs.
// Errors are returned rather than passed to log.Fatal so that the deferred
// client.Close runs before the process exits.
func run(ctx context.Context) error {
	client, err := bigquery.NewClient(ctx, "my-project")
	if err != nil {
		return fmt.Errorf("creating client: %w", err)
	}
	defer client.Close()

	// Start a query job.
	q := client.Query(`
SELECT name, COUNT(*) as count
FROM ` + "`bigquery-public-data.usa_names.usa_1910_2013`" + `
GROUP BY name
ORDER BY count DESC
LIMIT 100
`)
	q.JobID = "analysis-job"
	q.AddJobIDSuffix = true // a random suffix is appended so reruns don't collide
	job, err := q.Run(ctx)
	if err != nil {
		return fmt.Errorf("starting query: %w", err)
	}
	fmt.Printf("Started job: %s\n", job.ID())

	// Bound how long we are willing to poll for completion.
	pollCtx, cancel := context.WithTimeout(ctx, 30*time.Minute)
	defer cancel()
	status, err := monitorJob(pollCtx, job, 2*time.Second)
	if err != nil {
		return fmt.Errorf("waiting for job %s: %w", job.ID(), err)
	}
	// Done() only says the job finished; Err() says whether it succeeded.
	// A failed job has no results to read, so stop here.
	if err := status.Err(); err != nil {
		return fmt.Errorf("job failed: %w", err)
	}
	fmt.Println("Job completed successfully")

	printJobStatistics(status.Statistics)

	if err := printResults(ctx, job, 10); err != nil {
		return fmt.Errorf("reading results: %w", err)
	}

	// List recent jobs.
	fmt.Println("\nRecent jobs:")
	listRecentJobs(ctx, client)
	return nil
}

// monitorJob polls job once per interval, printing its state, until the job
// reaches the Done state or ctx is done. A failed Status call is logged and
// polling continues, so a transient error does not abort monitoring; the
// caller's ctx deadline bounds how long that can go on.
func monitorJob(ctx context.Context, job *bigquery.Job, interval time.Duration) (*bigquery.JobStatus, error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-ticker.C:
		}
		status, err := job.Status(ctx)
		if err != nil {
			log.Printf("Error checking status: %v\n", err)
			continue
		}
		fmt.Printf("Job state: %v\n", status.State)
		if status.Done() {
			return status, nil
		}
	}
}

// printJobStatistics prints timing information for a finished job and, when
// the job is a query, its billing and cache statistics. stats may be nil.
func printJobStatistics(stats *bigquery.JobStatistics) {
	if stats == nil {
		fmt.Println("\nNo job statistics available")
		return
	}
	fmt.Printf("\nJob Statistics:\n")
	fmt.Printf(" Creation time: %s\n", stats.CreationTime)
	fmt.Printf(" Start time: %s\n", stats.StartTime)
	fmt.Printf(" End time: %s\n", stats.EndTime)
	fmt.Printf(" Duration: %s\n", stats.EndTime.Sub(stats.StartTime))

	// Details holds job-type-specific statistics. Use a checked assertion so
	// that missing details or a non-query job cannot cause a panic.
	queryStats, ok := stats.Details.(*bigquery.QueryStatistics)
	if !ok {
		return
	}
	fmt.Printf(" Bytes processed: %d\n", queryStats.TotalBytesProcessed)
	fmt.Printf(" Bytes billed: %d\n", queryStats.TotalBytesBilled)
	fmt.Printf(" Cache hit: %v\n", queryStats.CacheHit)
	fmt.Printf(" Slot ms: %d\n", queryStats.TotalSlotMs)
}

// printResults reads the query job's result rows. It prints at most
// maxPrinted of them, followed by the total number of rows read.
func printResults(ctx context.Context, job *bigquery.Job, maxPrinted int) error {
	it, err := job.Read(ctx)
	if err != nil {
		return err
	}
	// The total is reported after iterating because RowIterator.TotalRows
	// may not be populated until the first page has been fetched.
	fmt.Println("\nResults:")
	total := 0
	for {
		var values []bigquery.Value
		err := it.Next(&values)
		if err == iterator.Done {
			break
		}
		if err != nil {
			return err
		}
		if total < maxPrinted {
			fmt.Printf(" %v\n", values)
		}
		total++
	}
	fmt.Printf("(%d total rows)\n", total)
	return nil
}
// listRecentJobs prints the ID and state of up to 10 of the current user's
// jobs created within the last hour. A listing error terminates the program.
func listRecentJobs(ctx context.Context, client *bigquery.Client) {
	const maxJobs = 10

	it := client.Jobs(ctx)
	it.AllUsers = false // only the current user's jobs
	it.MinCreationTime = time.Now().Add(-1 * time.Hour)

	for listed := 0; listed < maxJobs; listed++ {
		job, err := it.Next()
		if err == iterator.Done {
			return
		}
		if err != nil {
			log.Fatalf("listing jobs: %v", err)
		}
		// Format State with %v: the client library declares State as an int
		// enum, which %s would mis-print. %v is correct for either type.
		state := "unknown"
		if status := job.LastStatus(); status != nil {
			state = fmt.Sprint(status.State)
		}
		fmt.Printf(" %s: %s\n", job.ID(), state)
	}
}