pgx is a pure Go driver and toolkit for PostgreSQL providing a native high-performance interface with PostgreSQL-specific features plus a database/sql compatibility adapter.
—
Pipeline mode allows sending queries without waiting for previous results, reducing round trips.
func (pgConn *PgConn) StartPipeline(ctx context.Context) *Pipeline
type Pipeline struct { /* unexported */ }
func (p *Pipeline) SendQueryParams(sql string, paramValues [][]byte, paramOIDs []uint32, paramFormats, resultFormats []int16)
func (p *Pipeline) SendQueryPrepared(stmtName string, paramValues [][]byte, paramFormats, resultFormats []int16)
func (p *Pipeline) SendPrepare(name, sql string, paramOIDs []uint32)
func (p *Pipeline) SendDeallocate(name string)
func (p *Pipeline) SendPipelineSync()
func (p *Pipeline) SendFlushRequest()
func (p *Pipeline) Flush() error
func (p *Pipeline) Sync() error
func (p *Pipeline) GetResults() (results any, err error)
func (p *Pipeline) Close() error
GetResults returns one of: *ResultReader, *StatementDescription, *PipelineSync, or nil (no more results).
type PipelineSync struct{} // synchronization point
type CloseComplete struct{} // response to Close message
Pipeline example:
pgConn := conn.PgConn()
pipeline := pgConn.StartPipeline(ctx)
pipeline.SendQueryParams("SELECT $1::int", [][]byte{[]byte("1")}, nil, nil, nil)
pipeline.SendQueryParams("SELECT $1::int", [][]byte{[]byte("2")}, nil, nil, nil)
pipeline.SendPipelineSync() // mark sync point
pipeline.Flush() // send buffered data
// Read results
for {
result, err := pipeline.GetResults()
if err != nil {
return err
}
if result == nil {
break
}
switch r := result.(type) {
case *pgconn.ResultReader:
for r.NextRow() {
vals := r.Values()
_ = vals
}
r.Close()
case *pgconn.PipelineSync:
// synchronization point reached
}
}
pipeline.Close()
Start a pipeline with pgconn.PgConn.StartPipeline(ctx).
Call SendPipelineSync() to mark synchronization points.
Call Flush() to send buffered data.
Read results with GetResults().
pgConn := conn.PgConn()
pipeline := pgConn.StartPipeline(ctx)
// Queue multiple queries
pipeline.SendQueryParams("SELECT $1::int", [][]byte{[]byte("1")}, nil, nil, nil)
pipeline.SendQueryParams("SELECT $1::int", [][]byte{[]byte("2")}, nil, nil, nil)
pipeline.SendQueryParams("SELECT $1::int", [][]byte{[]byte("3")}, nil, nil, nil)
pipeline.SendPipelineSync()
pipeline.Flush()
// Read all results
for {
result, err := pipeline.GetResults()
if err != nil {
return err
}
if result == nil {
break
}
switch r := result.(type) {
case *pgconn.ResultReader:
for r.NextRow() {
vals := r.Values()
fmt.Println("Got value:", string(vals[0]))
}
r.Close()
case *pgconn.PipelineSync:
fmt.Println("Sync point reached")
}
}
pipeline.Close()
Use pipeline mode when:
Do not use when:
Pipeline mode can significantly reduce latency for multiple small queries:
Install with Tessl CLI
npx tessl i tessl/golang-github-com-jackc-pgx-v5