CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/npm-postgres

Fastest full featured PostgreSQL client for Node.js and Deno with tagged template literals, transactions, streaming, and logical replication support

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/large-objects.md

Large Objects

PostgreSQL large object support for efficient storage and streaming of binary data, files, and multimedia content.

Capabilities

Large Object Creation

Create and access PostgreSQL large objects for storing binary data that exceeds standard column size limits.

/**
 * Create or access a large object
 * @param oid - Optional object identifier (omit to create new)
 * @param mode - Numeric access mode flags, not a string such as 'r' or 'w'
 *   (default: 0x00020000 | 0x00040000 = read/write). In PostgreSQL these bits
 *   are named INV_WRITE (0x00020000) and INV_READ (0x00040000).
 * @returns Promise resolving to LargeObject instance
 */
largeObject(oid?: number, mode?: number): Promise<LargeObject>;

Usage Examples:

// Create a new large object (no OID given)
const newLargeObject = await sql.largeObject();

// The LargeObject handle has no `oid` member, so when the OID must be kept,
// allocate it first with lo_creat() and open that OID explicitly
const [{ oid: newOid }] = await sql`SELECT lo_creat(-1) AS oid`;
const trackedObject = await sql.largeObject(newOid);
console.log('Created large object with OID:', newOid);

// `mode` is a numeric flag set, not a string.
// Open existing large object for reading (0x00040000 = INV_READ)
const existingObject = await sql.largeObject(12345, 0x00040000);

// Open for writing (0x00020000 = INV_WRITE)
const writableObject = await sql.largeObject(12345, 0x00020000);

// Open for reading and writing (default)
const readWriteObject = await sql.largeObject(12345);

LargeObject Interface

Comprehensive API for manipulating large object data with streaming support.

/**
 * Handle to an opened large object, as resolved by `largeObject()`.
 * Every member is asynchronous and returns a Promise, including the two
 * stream factories. The handle declares no `oid` member.
 */
interface LargeObject {
  /** Create readable stream from large object (resolves to the stream, so await it before piping) */
  readable(options?: ReadableOptions): Promise<Readable>;
  
  /** Create writable stream to large object (resolves to the stream, so await it before writing) */
  writable(options?: WritableOptions): Promise<Writable>;
  
  /** Close the large object */
  close(): Promise<void>;
  
  /**
   * Get current position in object
   * NOTE(review): declared as resolving to `void`, which gives callers no
   * typed access to the position; confirm what the library resolves with.
   */
  tell(): Promise<void>;
  
  /**
   * Read data from current position
   * NOTE(review): declared as resolving to `void`, while `write` below is the
   * member declared with a `data` row; the two look swapped. Confirm.
   */
  read(size: number): Promise<void>;
  
  /** Write data at current position */
  write(buffer: Uint8Array): Promise<[{ data: Uint8Array }]>;
  
  /** Truncate object to specified size */
  truncate(size: number): Promise<void>;
  
  /** Seek to position in object; `whence` is optional */
  seek(offset: number, whence?: number): Promise<void>;
  
  /** Get size and current position; resolves to a one-element tuple, so destructure it as `[{ position, size }]` */
  size(): Promise<[{ position: bigint, size: bigint }]>;
}

/** Options accepted by `LargeObject.readable()`; every field is optional. */
interface ReadableOptions {
  /** Buffer size for reading (presumably bytes, as for a stream `highWaterMark`; confirm) */
  highWaterMark?: number;
  
  /** Start position in object */
  start?: number;
  
  /** End position in object */
  end?: number;
}

/** Options accepted by `LargeObject.writable()`; every field is optional. */
interface WritableOptions {
  /** Buffer size for writing (presumably bytes, as for a stream `highWaterMark`; confirm) */
  highWaterMark?: number;
  
  /** Start position in object */
  start?: number;
}

Usage Examples:

// Basic large object operations
const largeObj = await sql.largeObject();

// Write data
const data = Buffer.from('Hello, large object world!');
await largeObj.write(data);

// Get current position. The call resolves to the rows of a single-row query,
// so unwrap the first row; the column is named after the server function.
const [{ lo_tell64: position }] = await largeObj.tell();
console.log('Current position:', position);

// Seek to beginning
await largeObj.seek(0);

// Read data (again a single row; the bytes are in its `data` column)
const [{ data: readData }] = await largeObj.read(26);
console.log('Read data:', readData.toString());

// Get size information (size() resolves to a one-element tuple)
const [{ size, position: currentPos }] = await largeObj.size();
console.log(`Size: ${size}, Position: ${currentPos}`);

// Close when done
await largeObj.close();

Streaming Operations

Reading Large Objects

Efficiently read large objects using streams for memory-optimized processing.

/**
 * Seek position constants
 * Values for the optional `whence` argument of `LargeObject.seek(offset, whence)`.
 */
const SEEK_SET = 0; // Absolute position
const SEEK_CUR = 1; // Relative to current position  
const SEEK_END = 2; // Relative to end of object

Usage Examples:

import { createWriteStream } from 'fs';

// Stream large object to file
/**
 * Stream the content of a large object into a file.
 *
 * @param oid - OID of the large object to export
 * @param filePath - Destination path, created or overwritten
 * @returns Promise that resolves once the file is fully written and the
 *   large object is closed; rejects if either stream fails
 */
async function exportLargeObjectToFile(oid, filePath) {
  // `mode` is a numeric flag set; 0x00040000 is INV_READ
  const largeObj = await sql.largeObject(oid, 0x00040000);

  try {
    // readable() resolves to the stream, so it must be awaited before piping
    const readable = await largeObj.readable();
    const writeStream = createWriteStream(filePath);

    await new Promise((resolve, reject) => {
      // pipe() does not forward errors: watch both ends and tear the other
      // side down on failure so the promise always settles
      const fail = (error) => {
        readable.destroy();
        writeStream.destroy();
        reject(error);
      };

      readable.on('error', fail);
      writeStream.on('error', fail);
      writeStream.on('finish', resolve);

      readable.pipe(writeStream);
    });
  } finally {
    // Close exactly once, on success and on failure alike
    await largeObj.close();
  }
}

// Usage
await exportLargeObjectToFile(12345, './exported-file.bin');

// Stream with custom options (0x00040000 = INV_READ; the mode is numeric)
const largeObj = await sql.largeObject(oid, 0x00040000);

// readable() resolves to the stream, so await it before attaching listeners
const readable = await largeObj.readable({
  highWaterMark: 64 * 1024, // 64KB chunks
  start: 1000,              // Start at byte 1000
  end: 50000               // End at byte 50000
});

readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes`);
  processChunk(chunk);
});

readable.on('end', async () => {
  console.log('Stream finished');
  await largeObj.close();
});

// Without an 'error' listener a failed read would leave the object open
readable.on('error', async (error) => {
  console.error('Stream failed:', error);
  await largeObj.close();
});

Writing Large Objects

Upload files and data to large objects using streams.

import { createReadStream } from 'fs';

// Upload file to large object
/**
 * Upload a file into a newly created large object.
 *
 * @param filePath - Path of the file to upload
 * @returns Promise resolving to the OID of the new large object; rejects if
 *   reading the file or writing the object fails, in which case the
 *   partially written object is removed again
 */
async function uploadFileToLargeObject(filePath) {
  // The LargeObject handle exposes no `oid`, so allocate the OID explicitly
  const [{ oid }] = await sql`SELECT lo_creat(-1) AS oid`;
  const largeObj = await sql.largeObject(oid, 0x00020000); // INV_WRITE

  try {
    // writable() resolves to the stream, so it must be awaited before piping
    const writable = await largeObj.writable();
    const readStream = createReadStream(filePath);

    await new Promise((resolve, reject) => {
      // pipe() does not forward errors (e.g. a missing file), so watch both ends
      const fail = (error) => {
        readStream.destroy();
        writable.destroy();
        reject(error);
      };

      readStream.on('error', fail);
      writable.on('error', fail);
      writable.on('finish', resolve);

      readStream.pipe(writable);
    });
  } catch (error) {
    await largeObj.close();
    // Do not leave a half-written orphan behind
    await sql`SELECT lo_unlink(${oid})`;
    throw error;
  }

  await largeObj.close();
  return oid;
}

// Usage
const oid = await uploadFileToLargeObject('./large-file.pdf');
console.log('Uploaded file to large object:', oid);

// Write with custom buffer size
const largeObj = await sql.largeObject();

// writable() resolves to the stream, so await it before writing
const writable = await largeObj.writable({
  highWaterMark: 128 * 1024 // 128KB buffer
});

// Register the handlers before writing so no event can be missed
writable.on('finish', async () => {
  console.log('All chunks written');
  await largeObj.close();
});

writable.on('error', async (error) => {
  console.error('Write failed:', error);
  await largeObj.close();
});

// Write multiple chunks
const chunks = [
  Buffer.from('First chunk of data'),
  Buffer.from('Second chunk of data'),
  Buffer.from('Final chunk')
];

for (const chunk of chunks) {
  // write() returns false once the buffer is full; wait for 'drain'
  // instead of queueing without bound
  if (!writable.write(chunk)) {
    await new Promise((resolve) => writable.once('drain', resolve));
  }
}

writable.end();

Advanced Usage Patterns

Image Processing Pipeline

Process images stored as large objects with streaming transformations.

import { Transform } from 'stream';
import sharp from 'sharp';

/**
 * Streams images stored as large objects through a sharp transformation.
 */
class ImageProcessor {
  /**
   * Resize an image and re-encode it as JPEG from one large object into another.
   *
   * @param sourceOid - OID of the large object holding the original image
   * @param targetOid - OID of an existing large object that receives the result
   * @param transformOptions - `{ width, height, quality }`; quality defaults to 80
   * @returns Promise resolving to `targetOid` once both objects are closed
   */
  async processImage(sourceOid, targetOid, transformOptions) {
    // `mode` is a numeric flag set: 0x00040000 = INV_READ, 0x00020000 = INV_WRITE
    const sourceObj = await sql.largeObject(sourceOid, 0x00040000);
    let targetObj;

    try {
      targetObj = await sql.largeObject(targetOid, 0x00020000);

      // Drop any previous content so a shorter result leaves no stale tail
      await targetObj.truncate(0);

      // Both stream factories resolve to the stream and must be awaited
      const readable = await sourceObj.readable();
      const writable = await targetObj.writable();

      // Create image transformation stream
      const transformer = sharp()
        .resize(transformOptions.width, transformOptions.height)
        .jpeg({ quality: transformOptions.quality || 80 });

      await new Promise((resolve, reject) => {
        const stages = [readable, transformer, writable];

        // pipe() does not forward errors: a failure in any stage must
        // tear the others down and reject, or the promise never settles
        const fail = (error) => {
          for (const stage of stages) {
            stage.destroy();
          }
          reject(error);
        };

        for (const stage of stages) {
          stage.on('error', fail);
        }
        writable.on('finish', resolve);

        readable.pipe(transformer).pipe(writable);
      });

      return targetOid;
    } finally {
      // Close whatever was opened, on success and on failure alike
      await Promise.all([
        sourceObj.close(),
        targetObj ? targetObj.close() : undefined
      ]);
    }
  }

  /**
   * Create a 200x200 JPEG thumbnail (quality 70) in a new large object.
   *
   * @param imageOid - OID of the large object holding the original image
   * @returns Promise resolving to the OID of the thumbnail large object
   */
  async createThumbnail(imageOid) {
    // The LargeObject handle exposes no `oid`, so allocate the OID explicitly.
    // This also avoids leaving an extra handle open.
    const [{ oid: thumbnailOid }] = await sql`SELECT lo_creat(-1) AS oid`;

    try {
      await this.processImage(imageOid, thumbnailOid, {
        width: 200,
        height: 200,
        quality: 70
      });
    } catch (error) {
      // Do not keep an empty or partial thumbnail object around
      await sql`SELECT lo_unlink(${thumbnailOid})`;
      throw error;
    }

    return thumbnailOid;
  }
}

// Usage: `originalImageOid` is the OID of an existing image large object;
// createThumbnail requests a 200x200 JPEG at quality 70
const processor = new ImageProcessor();
const thumbnailOid = await processor.createThumbnail(originalImageOid);

File Chunking and Assembly

Split large files into chunks for efficient transfer and storage.

/**
 * Splits a large object into fixed-size chunk objects and reassembles them.
 */
class FileChunker {
  /**
   * @param chunkSize - Maximum bytes per chunk (default 1MB); must be a positive integer
   * @throws RangeError when `chunkSize` is not a positive integer
   */
  constructor(chunkSize = 1024 * 1024) { // 1MB chunks
    // A zero or negative size would make splitFile loop forever
    if (!Number.isInteger(chunkSize) || chunkSize <= 0) {
      throw new RangeError(`chunkSize must be a positive integer, got ${chunkSize}`);
    }
    this.chunkSize = chunkSize;
  }

  /**
   * Copy a large object into a series of new, smaller large objects.
   *
   * @param sourceOid - OID of the large object to split
   * @returns Promise resolving to `[{ oid, offset, size }]`, one entry per chunk
   */
  async splitFile(sourceOid) {
    const sourceObj = await sql.largeObject(sourceOid, 0x00040000); // INV_READ

    try {
      // size() resolves to a one-element tuple whose fields are declared as
      // bigint; convert once so the arithmetic below stays in plain numbers
      const [{ size }] = await sourceObj.size();
      const totalSize = Number(size);
      const chunks = [];

      let bytesRead = 0;
      while (bytesRead < totalSize) {
        const wanted = Math.min(this.chunkSize, totalSize - bytesRead);

        // Read chunk from source (the bytes are in the `data` column of the single row)
        await sourceObj.seek(bytesRead);
        const [{ data }] = await sourceObj.read(wanted);

        if (data.length === 0) {
          break; // Object shrank underneath us; avoid spinning
        }

        // Create the chunk object and fill it in one statement; this also
        // yields its OID, which the LargeObject handle does not expose
        const [{ oid }] = await sql`SELECT lo_from_bytea(0, ${data}) AS oid`;

        chunks.push({
          oid,
          offset: bytesRead,
          size: data.length
        });

        bytesRead += data.length;
      }

      return chunks;
    } finally {
      await sourceObj.close();
    }
  }

  /**
   * Concatenate chunk objects, ordered by offset, into a new large object.
   *
   * @param chunks - Entries as returned by `splitFile`; the array is not modified
   * @returns Promise resolving to the OID of the assembled large object
   */
  async assembleFile(chunks) {
    const [{ oid: assembledOid }] = await sql`SELECT lo_creat(-1) AS oid`;
    const assembledObj = await sql.largeObject(assembledOid, 0x00020000); // INV_WRITE

    try {
      // Sort a copy so the caller's array keeps its order
      const ordered = [...chunks].sort((a, b) => a.offset - b.offset);

      for (const chunk of ordered) {
        const chunkObj = await sql.largeObject(chunk.oid, 0x00040000); // INV_READ
        try {
          const [{ data }] = await chunkObj.read(chunk.size);
          await assembledObj.write(data);
        } finally {
          await chunkObj.close();
        }
      }
    } finally {
      // The original never closed this handle
      await assembledObj.close();
    }

    return assembledOid;
  }
}

// Usage: `largeFileOid` is the OID of an existing large object; the
// constructor default gives 1MB chunks
const chunker = new FileChunker();
const chunks = await chunker.splitFile(largeFileOid);
console.log(`File split into ${chunks.length} chunks`);

// Rebuild a single large object from the chunk list returned above
const reassembledOid = await chunker.assembleFile(chunks);
console.log('File reassembled:', reassembledOid);

Content Management System

Build a content management system using large objects for file storage.

/**
 * Minimal content store: file bytes live in large objects, metadata lives in
 * a `files` table that references them by OID.
 */
class ContentManager {
  /**
   * Store a file and its metadata atomically.
   *
   * @param fileBuffer - Complete file content
   * @param metadata - Must carry `filename` and `contentType`; the whole object is stored as JSON
   * @returns Promise resolving to the inserted `files` row
   */
  async storeFile(fileBuffer, metadata) {
    return sql.begin(async (sql) => {
      // Create and fill the large object inside this transaction: this yields
      // the OID (the LargeObject handle exposes none) and lets a failed
      // INSERT roll the object back as well
      const [{ oid }] = await sql`SELECT lo_from_bytea(0, ${fileBuffer}) AS oid`;

      // Store metadata in regular table
      const [file] = await sql`
        INSERT INTO files (
          oid, filename, content_type, size, 
          uploaded_at, metadata
        ) VALUES (
          ${oid},
          ${metadata.filename},
          ${metadata.contentType},
          ${fileBuffer.length},
          NOW(),
          ${JSON.stringify(metadata)}
        )
        RETURNING *
      `;

      return file;
    });
  }

  /**
   * Open a readable stream over a stored file.
   *
   * @param fileId - `files.id` of the file
   * @returns Promise resolving to a Readable that also carries `filename` and
   *   `contentType`; the large object is closed when the stream ends, fails
   *   or is destroyed
   * @throws Error when no file with this id exists
   */
  async getFileStream(fileId) {
    const [file] = await sql`
      SELECT oid, filename, content_type 
      FROM files 
      WHERE id = ${fileId}
    `;

    if (!file) {
      throw new Error(`File ${fileId} not found`);
    }

    const largeObj = await sql.largeObject(file.oid, 0x00040000); // INV_READ

    // readable() resolves to the stream; without `await` the properties and
    // listeners below would be attached to a Promise
    let readable;
    try {
      readable = await largeObj.readable();
    } catch (error) {
      await largeObj.close();
      throw error;
    }

    // Add metadata to stream
    readable.filename = file.filename;
    readable.contentType = file.content_type;

    // Clean up exactly once, whichever way the stream finishes. 'close' covers
    // consumers that destroy the stream before it ends.
    let released = false;
    const release = () => {
      if (released) {
        return;
      }
      released = true;
      largeObj.close().catch((error) => {
        console.error('Failed to close large object:', error);
      });
    };
    readable.on('end', release);
    readable.on('error', release);
    readable.on('close', release);

    return readable;
  }

  /**
   * Delete a file row and unlink its large object in one transaction.
   *
   * @param fileId - `files.id` of the file; unknown ids are ignored
   */
  async deleteFile(fileId) {
    await sql.begin(async (sql) => {
      const [file] = await sql`
        DELETE FROM files 
        WHERE id = ${fileId} 
        RETURNING oid
      `;

      if (file) {
        // Delete the large object
        await sql`SELECT lo_unlink(${file.oid})`;
      }
    });
  }

  /**
   * Copy a stored file into a new large object plus a new `files` row.
   *
   * @param fileId - `files.id` of the file to copy
   * @returns Promise resolving to the inserted duplicate row
   * @throws Error when no file with this id exists
   */
  async duplicateFile(fileId) {
    const [originalFile] = await sql`
      SELECT * FROM files WHERE id = ${fileId}
    `;

    if (!originalFile) {
      throw new Error(`File ${fileId} not found`);
    }

    return await sql.begin(async (sql) => {
      // Copy the bytes server-side inside this transaction, so the new object
      // and its row commit or roll back together and the new OID is known.
      // lo_get() materialises the content as one bytea value, so this suits
      // objects comfortably below the bytea size limit; stream between two
      // handles for anything larger.
      const [{ oid: newOid }] = await sql`
        SELECT lo_from_bytea(0, lo_get(${originalFile.oid})) AS oid
      `;

      // Create database record
      const [duplicate] = await sql`
        INSERT INTO files (
          oid, filename, content_type, size, 
          uploaded_at, metadata
        ) VALUES (
          ${newOid},
          ${'copy_of_' + originalFile.filename},
          ${originalFile.content_type},
          ${originalFile.size},
          NOW(),
          ${originalFile.metadata}
        )
        RETURNING *
      `;

      return duplicate;
    });
  }
}

// Usage
const contentManager = new ContentManager();

// Store a file
const fileBuffer = await fs.readFile('./document.pdf');
const file = await contentManager.storeFile(fileBuffer, {
  filename: 'document.pdf',
  contentType: 'application/pdf',
  tags: ['important', 'legal']
});

// Retrieve file stream
const stream = await contentManager.getFileStream(file.id);
stream.pipe(response); // In Express.js

// Duplicate file
const duplicate = await contentManager.duplicateFile(file.id);

Performance Optimization

Memory-Efficient Processing

Process large objects without loading entire content into memory.

/**
 * Feed a large object to a callback one chunk at a time.
 *
 * @param oid - OID of the large object to read
 * @param processor - Called as `processor(chunk, position)` for every chunk,
 *   where `position` is the byte offset of the chunk; may be async
 * @param chunkSize - Bytes requested per read (default 64KB)
 * @returns Promise that resolves after the last chunk is processed and the
 *   object is closed; the object is closed on failure as well
 */
async function processLargeObjectInChunks(oid, processor, chunkSize = 64 * 1024) {
  // `mode` is a numeric flag set; 0x00040000 is INV_READ
  const largeObj = await sql.largeObject(oid, 0x00040000);
  let position = 0;

  try {
    while (true) {
      // read() resolves to the rows of a single-row query; the bytes are in
      // the `data` column. Testing `.length` on the row list itself would
      // never reach zero.
      const [{ data: chunk }] = await largeObj.read(chunkSize);

      if (chunk.length === 0) {
        break; // End of object
      }

      await processor(chunk, position);
      position += chunk.length;
    }
  } finally {
    await largeObj.close();
  }
}

// Usage: Calculate checksum without loading entire file
import { createHash } from 'crypto';

/**
 * Compute the SHA-256 digest of a large object without buffering it whole.
 *
 * @param oid - OID of the large object to hash
 * @returns Promise resolving to the hex-encoded digest
 */
async function calculateChecksum(oid) {
  const digest = createHash('sha256');

  // Fold every chunk into the running hash as it arrives
  const feed = (piece) => {
    digest.update(piece);
  };
  await processLargeObjectInChunks(oid, feed);

  return digest.digest('hex');
}

// Resolves to the hex-encoded SHA-256 digest of the large object `fileOid`
const checksum = await calculateChecksum(fileOid);

Concurrent Operations

Process multiple large objects concurrently while capping how many are open at the same time.

/**
 * Runs a processor over many large objects while capping how many are open
 * at the same time.
 */
class LargeObjectPool {
  /**
   * @param maxConcurrent - Maximum number of large objects open at once (default 5)
   * @throws RangeError when `maxConcurrent` is not a positive integer
   */
  constructor(maxConcurrent = 5) {
    // A limit below 1 would make every call wait forever
    if (!Number.isInteger(maxConcurrent) || maxConcurrent <= 0) {
      throw new RangeError(`maxConcurrent must be a positive integer, got ${maxConcurrent}`);
    }
    this.maxConcurrent = maxConcurrent;
    this.active = new Set();
    this.waiters = []; // resolvers of callers waiting for a free slot, FIFO
  }

  /**
   * Process every OID and collect the results in input order.
   *
   * @param oids - OIDs to process; duplicates are allowed
   * @param processor - Called with the opened LargeObject; may be async
   * @returns Promise resolving to the processor results, or rejecting with
   *   the first failure
   */
  async process(oids, processor) {
    return Promise.all(oids.map((oid) => this.processOne(oid, processor)));
  }

  /**
   * Process one OID as soon as a slot is free.
   *
   * @param oid - OID of the large object to open read-only
   * @param processor - Called with the opened LargeObject
   * @returns Promise resolving to the processor's result
   */
  async processOne(oid, processor) {
    // Sleep until a finishing task wakes us, then re-check: another caller
    // may have taken the slot in the meantime
    while (this.active.size >= this.maxConcurrent) {
      await new Promise((resolve) => this.waiters.push(resolve));
    }

    // One token per task: keying the set by OID would count the same OID
    // requested twice as a single slot
    const slot = { oid };
    this.active.add(slot);

    try {
      const largeObj = await sql.largeObject(oid, 0x00040000); // INV_READ
      try {
        return await processor(largeObj);
      } finally {
        // Close even when the processor throws
        await largeObj.close();
      }
    } finally {
      this.active.delete(slot);
      const wakeNext = this.waiters.shift();
      if (wakeNext) {
        wakeNext();
      }
    }
  }
}

// Usage
const pool = new LargeObjectPool(10);

const results = await pool.process([123, 456, 789], async (largeObj) => {
  // size() resolves to a one-element tuple, so unwrap the row first
  const [{ size }] = await largeObj.size();
  return size;
});

console.log('File sizes:', results);

Install with Tessl CLI

npx tessl i tessl/npm-postgres

docs

connections.md

errors.md

index.md

large-objects.md

notifications.md

query-processing.md

querying.md

replication.md

transactions.md

types.md

tile.json