pgx is a pure Go driver and toolkit for PostgreSQL providing a native high-performance interface with PostgreSQL-specific features plus a database/sql compatibility adapter.
Pipeline mode allows sending queries without waiting for previous results, reducing round trips.
Pipeline mode allows sending queries without waiting for previous results, reducing round trips.
func (pgConn *PgConn) StartPipeline(ctx context.Context) *Pipelinetype Pipeline struct { /* unexported */ }
func (p *Pipeline) SendQueryParams(sql string, paramValues [][]byte, paramOIDs []uint32, paramFormats, resultFormats []int16)
func (p *Pipeline) SendQueryPrepared(stmtName string, paramValues [][]byte, paramFormats, resultFormats []int16)
func (p *Pipeline) SendPrepare(name, sql string, paramOIDs []uint32)
func (p *Pipeline) SendDeallocate(name string)
func (p *Pipeline) SendPipelineSync()
func (p *Pipeline) SendFlushRequest()
func (p *Pipeline) Flush() error
func (p *Pipeline) Sync() error
func (p *Pipeline) GetResults() (results any, err error)
func (p *Pipeline) Close() errorGetResults returns one of: *ResultReader, *StatementDescription, *PipelineSync, or nil (no more results).
type PipelineSync struct{} // synchronization point
type CloseComplete struct{} // response to Close messagePipeline example:
pgConn := conn.PgConn()
pipeline := pgConn.StartPipeline(ctx)
pipeline.SendQueryParams("SELECT $1::int", [][]byte{[]byte("1")}, nil, nil, nil)
pipeline.SendQueryParams("SELECT $1::int", [][]byte{[]byte("2")}, nil, nil, nil)
pipeline.SendPipelineSync() // mark sync point
pipeline.Flush() // send buffered data
// Read results
for {
result, err := pipeline.GetResults()
if result == nil {
break
}
switch r := result.(type) {
case *pgconn.ResultReader:
for r.NextRow() {
vals := r.Values()
_ = vals
}
r.Close()
case *pgconn.PipelineSync:
// synchronization point reached
}
}
pipeline.Close()pgconn.PgConn.StartPipeline(ctx)SendPipelineSync() to mark synchronization pointsFlush() to send buffered dataGetResults()pgConn := conn.PgConn()
pipeline := pgConn.StartPipeline(ctx)
// Queue multiple queries
pipeline.SendQueryParams("SELECT $1::int", [][]byte{[]byte("1")}, nil, nil, nil)
pipeline.SendQueryParams("SELECT $1::int", [][]byte{[]byte("2")}, nil, nil, nil)
pipeline.SendQueryParams("SELECT $1::int", [][]byte{[]byte("3")}, nil, nil, nil)
pipeline.SendPipelineSync()
pipeline.Flush()
// Read all results
for {
result, err := pipeline.GetResults()
if err != nil {
return err
}
if result == nil {
break
}
switch r := result.(type) {
case *pgconn.ResultReader:
for r.NextRow() {
vals := r.Values()
fmt.Println("Got value:", string(vals[0]))
}
r.Close()
case *pgconn.PipelineSync:
fmt.Println("Sync point reached")
}
}
pipeline.Close()Use pipeline mode when:
Do not use when:
Pipeline mode can significantly reduce latency for multiple small queries:
Install with Tessl CLI
npx tessl i tessl/golang-github-com-jackc-pgx-v5@5.8.0