tessl/npm-postgres

Fastest full-featured PostgreSQL client for Node.js and Deno, with tagged template literals, transactions, streaming, and logical replication support

docs/query-processing.md

Query Processing

Advanced query execution modes including streaming, cursors, result formats, and performance optimization techniques.

Capabilities

Query Execution Modes

Control how queries are executed and results are processed with specialized methods for different use cases.

/**
 * Execute query with simple protocol (no prepared statements)
 * @returns Query instance for method chaining
 */
query.simple(): PendingQuery;

/**
 * Execute query immediately without waiting
 * @returns Query instance for method chaining  
 */
query.execute(): PendingQuery;

/**
 * Describe query without executing (get statement metadata)
 * @returns Promise resolving to statement description
 */
query.describe(): Promise<StatementDescription>;

/**
 * Request cancellation of a running query
 * @returns Void (the cancel request is sent out of band; the awaited
 *          query rejects if the server cancels it)
 */
query.cancel(): void;

Usage Examples:

// Force simple protocol (no prepared statements)
const result = await sql`SELECT * FROM large_table`.simple();

// Execute immediately
const query = sql`SELECT * FROM users WHERE id = ${userId}`;
query.execute();

// Get query metadata without executing
const description = await sql`SELECT * FROM products`.describe();
console.log(description.fields); // Column information

// Cancel a long-running query after a timeout
const longQuery = sql`SELECT * FROM huge_table`;
const timer = setTimeout(() => longQuery.cancel(), 5000);
try {
  const result = await longQuery;
  clearTimeout(timer);
} catch (err) {
  // The awaited query rejects if the server cancels it
}

Cursor-Based Processing

Process large result sets efficiently using cursors to stream results in batches.

/**
 * Execute query with cursor for streaming results
 * @param rows - Number of rows to fetch per batch (default: 1)
 * @returns AsyncIterable yielding batches of rows
 */
query.cursor(rows?: number): AsyncIterable<Row[]>;

/**
 * Execute query with cursor using callback
 * @param fn - Function called for each batch of rows
 * @returns Promise resolving to execution result
 */
query.cursor(fn: (rows: Row[]) => void): Promise<ExecutionResult>;

/**
 * Execute query with cursor specifying batch size and callback
 * @param rows - Number of rows per batch
 * @param fn - Function called for each batch
 * @returns Promise resolving to execution result
 */
query.cursor(rows: number, fn: (rows: Row[]) => void): Promise<ExecutionResult>;

Usage Examples:

// Async iterator with default batch size (one row per batch)
for await (const batch of sql`SELECT * FROM large_table`.cursor()) {
  console.log(`Processing ${batch.length} rows`);
  await processBatch(batch);
}

// Custom batch size with async iterator
for await (const batch of sql`SELECT * FROM orders`.cursor(500)) {
  await processOrders(batch);
}

// Callback-based processing
await sql`SELECT * FROM transactions`.cursor(1000, (rows) => {
  rows.forEach(transaction => {
    validateTransaction(transaction);
  });
});

// Smaller batches for memory-constrained processing
await sql`SELECT * FROM logs`.cursor(100, async (batch) => {
  await sendToAnalytics(batch);
});

Row-by-Row Processing

Process individual rows as they arrive for maximum memory efficiency.

/**
 * Process query results row by row
 * @param fn - Function called for each row
 * @returns Promise resolving to execution result
 */
query.forEach(fn: (row: Row, result: ExecutionResult) => void): Promise<ExecutionResult>;

Usage Examples:

// Process each row individually
let processedCount = 0;
const result = await sql`SELECT * FROM users`.forEach((user, result) => {
  processUser(user);
  processedCount++;
  
  if (processedCount % 1000 === 0) {
    console.log(`Processed ${processedCount} users`);
  }
});

console.log(`Total processed: ${result.count}`);

// forEach does not await the callback, so avoid async work inside it;
// use a cursor when each row needs awaited processing
for await (const [order] of sql`SELECT * FROM orders`.cursor()) {
  await validateOrder(order);
  await updateInventory(order.items);
}

Result Format Options

Control the format and structure of query results.

/**
 * Return raw buffer data instead of parsed values
 * @returns Query returning raw buffer results
 */
query.raw(): PendingQuery<Buffer[][]>;

/**
 * Return results as 2D array instead of objects
 * @returns Query returning array of arrays
 */
query.values(): PendingQuery<any[][]>;

Usage Examples:

// Raw buffer data for binary processing
const rawData = await sql`SELECT image_data FROM photos`.raw();
rawData.forEach(row => {
  const imageBuffer = row[0]; // Buffer containing image data
  processImage(imageBuffer);
});

// Values as 2D array for CSV export
const userData = await sql`SELECT name, email, age FROM users`.values();
// Returns: [["John", "john@example.com", 30], ["Jane", "jane@example.com", 25]]

const csvContent = userData.map(row => row.join(',')).join('\n');
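One caveat with the `join(',')` approach above: it produces malformed CSV if any value contains a comma, quote, or newline. A small RFC 4180-style quoting helper (illustrative, not part of postgres.js) handles those cases:

```javascript
// Quote a value per RFC 4180: wrap in double quotes when it contains a
// comma, quote, or newline, and double any embedded quotes.
function csvField(value) {
  const s = String(value ?? '');
  return /[",\n]/.test(s) ? '"' + s.replace(/"/g, '""') + '"' : s;
}

function toCsv(rows) {
  return rows.map(row => row.map(csvField).join(',')).join('\n');
}
```

For example, `toCsv([['John', 'john@example.com', 30]])` yields the same output as the naive join, while values like `'Doe, Jane'` come out safely quoted.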

COPY Operations

Efficiently import and export large datasets using PostgreSQL's COPY functionality.

/**
 * Get readable stream for COPY TO operations
 * @returns Promise resolving to readable stream
 */
query.readable(): Promise<Readable>;

/**
 * Get writable stream for COPY FROM operations  
 * @returns Promise resolving to writable stream
 */
query.writable(): Promise<Writable>;

Usage Examples:

import { createWriteStream, createReadStream } from 'fs';
import { Transform } from 'stream';

// Export data to file using COPY TO
const readable = await sql`
  COPY (SELECT * FROM users) TO STDOUT WITH CSV HEADER
`.readable();

const fileStream = createWriteStream('users.csv');
readable.pipe(fileStream);

// Import data from file using COPY FROM
const writable = await sql`
  COPY products (name, price, category) FROM STDIN WITH CSV HEADER
`.writable();

const inputStream = createReadStream('products.csv');
inputStream.pipe(writable);

// Stream transformation during COPY
const transform = new Transform({
  transform(chunk, encoding, callback) {
    // Process data during copy
    const processed = processChunk(chunk);
    callback(null, processed);
  }
});

inputStream.pipe(transform).pipe(writable);
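Note that `.pipe()` neither propagates errors between streams nor tells you when the copy has finished; `stream.pipeline` does both and returns an awaitable promise. The pattern is sketched below with in-memory streams standing in for the file stream and the COPY writable, so it runs without a database; `copyWithTransform` is an illustrative helper:

```javascript
import { pipeline } from 'node:stream/promises';
import { Readable, Transform, Writable } from 'node:stream';

// Pipe source -> transform -> sink with error propagation; any stream
// failing rejects the returned promise and destroys the others.
async function copyWithTransform(lines, sink) {
  const upperCase = new Transform({
    transform(chunk, encoding, callback) {
      // Example transformation applied mid-copy
      callback(null, chunk.toString().toUpperCase());
    }
  });
  await pipeline(Readable.from(lines), upperCase, sink);
}
```

With a real COPY, you would pass `createReadStream('products.csv')` and the stream from `query.writable()` in place of the stand-ins, and `await` the pipeline to surface any failure.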

Result Metadata

ExecutionResult Interface

Query execution results include comprehensive metadata about the operation.

interface ExecutionResult {
  /** Number of rows affected by the query */
  count: number;
  
  /** SQL command that was executed */
  command: string;
  
  /** Backend connection state information */
  state: ConnectionState;
  
  /** Column information for SELECT queries */
  columns?: ColumnInfo[];
  
  /** Statement metadata including parameter types */
  statement?: StatementInfo;
}

interface ColumnInfo {
  /** Column name */
  name: string;
  
  /** PostgreSQL type OID */
  type: number;
  
  /** Table OID (0 if not from a table) */
  table: number;
  
  /** Column attribute number in table */
  number: number;
  
  /** Type modifier */
  modifier: number;
  
  /** Format code (0 = text, 1 = binary) */
  format: number;
}

Usage Examples:

// Access result metadata
const result = await sql`SELECT name, age FROM users WHERE active = true`;

console.log(`Command: ${result.command}`); // "SELECT"
console.log(`Rows returned: ${result.count}`); // Number of rows
console.log(`Columns: ${result.columns?.map(col => col.name)}`); // ["name", "age"]

// Check column types
result.columns?.forEach(col => {
  console.log(`${col.name}: type ${col.type}`);
});
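The `type` field is a raw PostgreSQL type OID rather than a name. OIDs of built-in types are fixed in `pg_catalog.pg_type`, so a small lookup table (sketched below with a handful of standard built-in OIDs; query `pg_type` for anything not listed) can make column logging readable:

```javascript
// Well-known built-in type OIDs from pg_catalog.pg_type.
const TYPE_NAMES = {
  16: 'bool',
  20: 'int8',
  21: 'int2',
  23: 'int4',
  25: 'text',
  700: 'float4',
  701: 'float8',
  1043: 'varchar',
  1114: 'timestamp',
  1184: 'timestamptz',
};

// Fall back to the numeric OID for types outside the table.
const typeName = (oid) => TYPE_NAMES[oid] ?? `oid:${oid}`;
```

This slots directly into the loop above: `console.log(`${col.name}: ${typeName(col.type)}`)`.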

StatementDescription Interface

Detailed information about prepared statements obtained via describe().

interface StatementDescription {
  /** Parameter type OIDs */
  parameters: number[];
  
  /** Result field information */
  fields: FieldDescription[];
}

interface FieldDescription {
  /** Field name */
  name: string;
  
  /** Table OID */
  tableOid: number;
  
  /** Column attribute number */
  columnAttrNumber: number;
  
  /** Data type OID */
  dataTypeOid: number;
  
  /** Data type size */
  dataTypeSize: number;
  
  /** Type modifier */
  typeModifier: number;
  
  /** Format code */
  format: number;
}

Usage Examples:

// Analyze query structure before execution
const description = await sql`
  SELECT u.name, u.email, p.title 
  FROM users u 
  JOIN posts p ON u.id = p.author_id 
  WHERE u.created_at > $1
`.describe();

console.log(`Parameters needed: ${description.parameters.length}`);
description.fields.forEach(field => {
  console.log(`Field: ${field.name}, Type: ${field.dataTypeOid}`);
});

Performance Optimization

Query Preparation

Control prepared statement usage for optimal performance.

// Global preparation setting
const sql = postgres({
  ...connectionConfig,
  prepare: true  // Enable prepared statements (default)
});

// Per-query preparation control
await sql`SELECT * FROM users WHERE id = ${id}`.simple(); // Skip preparation

Usage Examples:

// Disable preparation for one-time queries
const oneTimeResult = await sql`
  SELECT * FROM system_stats WHERE collected_at = NOW()
`.simple();

// Use preparation for repeated queries
const getUserById = (id) => sql`SELECT * FROM users WHERE id = ${id}`;

// These will reuse the same prepared statement
const user1 = await getUserById(1);
const user2 = await getUserById(2);
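The opposite setting matters too: transaction-mode connection poolers (for example PgBouncer in transaction pooling mode) generally cannot track per-connection prepared statements, so postgres.js exposes a global switch to turn them off. A configuration sketch, with placeholder connection details:

```javascript
import postgres from 'postgres';

// Disable prepared statements entirely, e.g. when connecting through a
// transaction-mode pooler; every query then uses the unnamed statement.
const sql = postgres({
  host: 'localhost',   // placeholder connection details
  database: 'app',
  prepare: false,
});
```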

Memory Management

Optimize memory usage for large result sets.

// Process large datasets without loading everything into memory
for await (const batch of sql`SELECT * FROM large_table`.cursor(1000)) {
  // Process batch and let it be garbage collected
  await processBatch(batch);
}

// Row-by-row processing for minimal memory footprint
await sql`SELECT * FROM huge_table`.forEach((row) => {
  processRow(row);
  // Each row can be garbage collected after processing
});

Usage Examples:

// Memory-efficient data export
async function exportLargeTable(tableName, outputFile) {
  const writeStream = createWriteStream(outputFile);
  
  await sql`SELECT * FROM ${sql(tableName)}`.forEach((row) => {
    const csvLine = Object.values(row).join(',') + '\n';
    writeStream.write(csvLine);
  });
  
  writeStream.end();
}

// Batch processing with backpressure control
async function processWithBackpressure() {
  const cursor = sql`SELECT * FROM events ORDER BY created_at`.cursor(500);
  
  for await (const batch of cursor) {
    await processBatch(batch);
    // Natural backpressure - won't fetch next batch until current is processed
  }
}

Install with Tessl CLI

npx tessl i tessl/npm-postgres
