CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/npm-pouchdb-adapter-http

PouchDB adapter using HTTP for remote CouchDB connections and database operations

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/changes-feed.md

Changes Feed

Real-time database changes streaming with filtering, continuous monitoring support, and comprehensive options for tracking document modifications in remote CouchDB databases.

Capabilities

Changes Feed Monitoring

Streams database changes with extensive filtering and monitoring options.

/**
 * Stream database changes with various filtering options
 * @param opts - Changes feed options including filters, continuous mode, and callbacks
 * @returns Object with cancel method to stop the changes feed
 */
api._changes(opts): { cancel: function };

Usage Examples:

// Basic changes monitoring: follow the feed live, starting at the current sequence
const logChange = (change) => {
  console.log('Document changed:', change.id);
  console.log('New revision:', change.changes[0].rev);
  if (change.doc) {
    console.log('Document content:', change.doc);
  }
};

const logFeedError = (err) => {
  console.error('Changes feed error:', err);
};

const changes = db
  .changes({ since: 'now', live: true, include_docs: true })
  .on('change', logChange)
  .on('error', logFeedError);

// Stop monitoring after 30 seconds
const MONITOR_DURATION_MS = 30000;

setTimeout(() => {
  changes.cancel();
  console.log('Changes feed stopped');
}, MONITOR_DURATION_MS);

// One-time changes since last update
const oneShotOptions = { since: lastUpdateSeq, include_docs: true };

db.changes(oneShotOptions, (err, result) => {
  if (err) {
    console.error('Error getting changes:', err);
    return;
  }

  for (const change of result.results) {
    console.log('Changed document:', change.id);
    console.log('Current revision:', change.changes[0].rev);
  }

  // Remember where this query ended so the next one can resume from there
  lastUpdateSeq = result.last_seq;
});

Filtered Changes

Monitor changes with document filtering and specific document sets.

/**
 * Monitor changes with filtering options
 * @param opts - Options including filter functions and document IDs
 * @returns Changes feed controller with cancel method
 */
api._changes(opts): { cancel: function };

Usage Examples:

// Filter by document IDs
const watchedUserIds = ['user:alice', 'user:bob', 'user:charlie'];

const userChanges = db
  .changes({ live: true, include_docs: true, doc_ids: watchedUserIds })
  .on('change', (change) => {
    console.log('User document changed:', change.id);
    if (!change.doc) {
      return;
    }
    // Only the profile fields of interest are logged
    const { name, email } = change.doc;
    console.log('User data:', { name, email });
  });

// Filter by design document view
const activeUsersFeedOptions = {
  live: true,
  include_docs: true,
  filter: '_view',
  view: 'users/active'
};

const onActiveUserChange = (change) => {
  console.log('Active user changed:', change.id);
};

const filteredChanges = db
  .changes(activeUsersFeedOptions)
  .on('change', onActiveUserChange);

// Custom server-side filter; query_params are passed along with the filter name
const importantFeedOptions = {
  live: true,
  include_docs: true,
  filter: 'myapp/important',
  query_params: { priority: 'high' }
};

const onImportantChange = (change) => {
  console.log('Important document changed:', change.id);
};

const customFilterChanges = db
  .changes(importantFeedOptions)
  .on('change', onImportantChange);

Continuous Changes Monitoring

Set up persistent changes monitoring with reconnection and error handling.

/**
 * Set up continuous changes monitoring with reconnection
 * @param opts - Continuous monitoring options
 * @returns Changes feed controller
 */
api._changes(opts): { cancel: function };

Usage Examples:

// Continuous monitoring with heartbeat
let changesController;
let reconnectTimeout;

/**
 * Open the live changes feed, resuming from the sequence saved in
 * localStorage under 'lastSeq' (or from 'now' when nothing is saved).
 *
 * Any feed or reconnect timer left over from an earlier call is cancelled
 * first, so calling this function repeatedly never leaves two feeds
 * delivering the same changes. On a feed error a single reconnect attempt is
 * scheduled 5 seconds later.
 */
function startChangesMonitoring() {
  // Drop a feed that is still registered before opening a new one
  if (changesController) {
    changesController.cancel();
    changesController = null;
  }

  // A reconnect that is still pending would otherwise open a second feed later
  if (reconnectTimeout) {
    clearTimeout(reconnectTimeout);
    reconnectTimeout = null;
  }

  changesController = db.changes({
    live: true,
    continuous: true,
    include_docs: true,
    heartbeat: 10000, // 10 seconds
    timeout: 60000,   // 60 seconds
    since: localStorage.getItem('lastSeq') || 'now'
  }).on('change', (change) => {
    console.log('Document updated:', change.id);

    // Store sequence for recovery
    localStorage.setItem('lastSeq', change.seq);

    // Process the change
    handleDocumentChange(change);

  }).on('error', (err) => {
    console.error('Changes feed error:', err);

    // Attempt to reconnect after 5 seconds; at most one attempt is pending at a time
    if (!reconnectTimeout) {
      reconnectTimeout = setTimeout(() => {
        console.log('Attempting to reconnect changes feed...');
        reconnectTimeout = null;
        startChangesMonitoring();
      }, 5000);
    }

  }).on('complete', (info) => {
    console.log('Changes feed completed:', info);
  });
}

/**
 * Mirror one change into the local cache: deletions remove the cached entry,
 * everything else stores the document carried by the change.
 *
 * @param change - Change object delivered by the feed
 */
function handleDocumentChange(change) {
  const { id } = change;

  if (!change.deleted) {
    console.log('Document updated:', id);
    updateLocalCache(id, change.doc);
    return;
  }

  console.log('Document deleted:', id);
  removeFromLocalCache(id);
}

/**
 * Shut monitoring down: cancel the active feed, if any, and discard a
 * reconnect attempt that has been scheduled but has not run yet.
 */
function stopChangesMonitoring() {
  const activeFeed = changesController;
  if (activeFeed) {
    activeFeed.cancel();
    changesController = null;
  }

  const pendingReconnect = reconnectTimeout;
  if (pendingReconnect) {
    clearTimeout(pendingReconnect);
    reconnectTimeout = null;
  }
}

// Start monitoring
startChangesMonitoring();

Changes with Conflict Detection

Monitor changes while detecting and handling document conflicts.

/**
 * Monitor changes with conflict detection
 * @param opts - Options including conflicts flag
 * @returns Changes feed controller
 */
api._changes(opts): { cancel: function };

Usage Examples:

// Monitor with conflict detection
const onConflictAwareChange = (change) => {
  console.log('Document changed:', change.id);

  const conflictRevs = change.doc._conflicts;
  if (!conflictRevs) {
    // Normal document update
    console.log('Clean document update:', change.doc);
    return;
  }

  console.warn('Document has conflicts:', conflictRevs);

  // Hand the conflicting revision IDs over for resolution
  handleDocumentConflicts(change.id, conflictRevs);
};

const conflictAwareChanges = db
  .changes({ live: true, include_docs: true, conflicts: true })
  .on('change', onConflictAwareChange);

/**
 * Resolve the conflicts reported for one document.
 *
 * Loads the current winning revision together with every conflicting
 * revision, lets resolveConflicts() pick the version to keep, writes that
 * version on top of the winning revision when it is not already the winner,
 * and finally removes the conflicting revisions so the document stops
 * reporting conflicts.
 *
 * @param docId - ID of the conflicted document
 * @param conflicts - Revision IDs taken from the document's `_conflicts` field
 * @returns Promise that settles once resolution has finished; failures are
 *          logged rather than thrown
 */
async function handleDocumentConflicts(docId, conflicts) {
  console.log(`Resolving conflicts for ${docId}`);

  // Adapt the callback-style database calls to promises
  const callDb = (method, ...args) =>
    new Promise((resolve, reject) => {
      db[method](...args, (err, result) => {
        if (err) reject(err);
        else resolve(result);
      });
    });

  try {
    // `conflicts` names only the losing revisions, so the current winner is
    // fetched as well; otherwise it would never take part in the comparison
    const [winner, ...losers] = await Promise.all([
      callDb('get', docId, {}),
      ...conflicts.map(rev => callDb('get', docId, { rev }))
    ]);

    // Implement conflict resolution logic
    const chosen = resolveConflicts([winner, ...losers]);
    let resolvedRev = winner._rev;

    if (chosen !== winner) {
      // Save the chosen content as a new revision on the winning branch
      const resolvedDoc = { ...chosen, _id: docId, _rev: winner._rev };
      try {
        const result = await callDb('put', resolvedDoc);
        resolvedRev = result.rev;
      } catch (err) {
        console.error('Failed to resolve conflict:', err);
        return;
      }
    }

    // Removing the losing revisions is what actually clears the conflict
    await Promise.all(conflicts.map(rev => callDb('remove', docId, rev)));
    console.log('Conflict resolved:', resolvedRev);

  } catch (err) {
    console.error('Error handling conflicts:', err);
  }
}

/**
 * Simple conflict resolution: pick the most recent document.
 *
 * Documents are compared by their `updated` timestamp when both carry a
 * parseable one and the timestamps differ. Otherwise the numeric generation
 * prefix of `_rev` (the part before the first '-') decides. On a tie the
 * earlier document in the list is kept.
 *
 * @param docs - Non-empty list of candidate revisions of the same document
 * @returns The element of `docs` considered most recent
 * @throws TypeError when `docs` is not a non-empty array
 */
function resolveConflicts(docs) {
  if (!Array.isArray(docs) || docs.length === 0) {
    throw new TypeError('resolveConflicts requires at least one document');
  }

  // Milliseconds since epoch, or NaN when the document has no usable timestamp
  const timestampOf = (doc) =>
    doc.updated ? new Date(doc.updated).getTime() : NaN;

  // A revision string is not a date, so only its generation prefix is compared
  const generationOf = (doc) => {
    const generation = parseInt(String(doc._rev || '').split('-')[0], 10);
    return Number.isNaN(generation) ? 0 : generation;
  };

  return docs.reduce((latest, current) => {
    const latestTime = timestampOf(latest);
    const currentTime = timestampOf(current);

    if (!Number.isNaN(latestTime) && !Number.isNaN(currentTime) && latestTime !== currentTime) {
      return currentTime > latestTime ? current : latest;
    }

    return generationOf(current) > generationOf(latest) ? current : latest;
  });
}

Advanced Usage Patterns

Batch Processing Changes

// Process changes in batches for better performance
let changesBatch = [];
const BATCH_SIZE = 10;
const BATCH_TIMEOUT = 5000;
let batchTimeout;

const batchChanges = db
  .changes({ live: true, include_docs: true })
  .on('change', (change) => {
    changesBatch.push(change);

    // A full batch is flushed right away
    if (changesBatch.length >= BATCH_SIZE) {
      processBatch();
      return;
    }

    // Otherwise restart the timer that flushes a partial batch
    if (batchTimeout) {
      clearTimeout(batchTimeout);
    }
    batchTimeout = setTimeout(processBatch, BATCH_TIMEOUT);
  });

/**
 * Flush the collected changes: take a snapshot of the pending batch, reset
 * the shared buffer and flush timer, then handle each change in the snapshot.
 * Does nothing when no changes are pending.
 */
function processBatch() {
  const pendingCount = changesBatch.length;
  if (pendingCount === 0) {
    return;
  }

  console.log(`Processing batch of ${pendingCount} changes`);

  // Snapshot first so changes arriving during processing go into a fresh buffer
  const batch = [...changesBatch];
  changesBatch = [];

  if (batchTimeout) {
    clearTimeout(batchTimeout);
    batchTimeout = null;
  }

  // Handle batch processing
  for (const change of batch) {
    console.log('Processing change:', change.id);
    // Your batch processing logic here
  }
}

Changes Feed with Retry Logic

/**
 * Live changes monitor that reconnects with exponential backoff.
 *
 * The monitor remembers the sequence of the last change it handled and
 * resumes from there after a reconnect or restart. It keeps at most one feed
 * and one retry timer alive at any time, and stop() releases both.
 */
class RobustChangesMonitor {
  /**
   * @param db - Database whose `changes()` feed is monitored
   * @param options - Optional settings: `since` (starting sequence, default
   *   'now'), `maxRetries` (default 5), `retryDelay` in ms (default 1000),
   *   `backoffMultiplier` (default 2), `onChange(change)` and
   *   `onMaxRetriesReached(err)` callbacks
   */
  constructor(db, options = {}) {
    this.db = db;
    this.options = {
      maxRetries: 5,
      retryDelay: 1000,
      backoffMultiplier: 2,
      ...options
    };
    this.currentChanges = null;
    this.retryCount = 0;
    this.isRunning = false;
    // Handle of the pending reconnect timer, or null when none is scheduled
    this.retryTimer = null;
    // Explicit null check so that a starting sequence of 0 is honored
    this.lastSeq = this.options.since != null ? this.options.since : 'now';
  }

  /** Start monitoring; does nothing when the monitor is already running. */
  start() {
    if (this.isRunning) return;

    this.isRunning = true;
    this.connectChanges();
  }

  /** Stop monitoring and release the active feed and any pending retry. */
  stop() {
    this.isRunning = false;

    if (this.retryTimer !== null) {
      clearTimeout(this.retryTimer);
      this.retryTimer = null;
    }

    this.releaseFeed();
  }

  /** Cancel the active feed, if any, and forget it. */
  releaseFeed() {
    const feed = this.currentChanges;
    if (feed) {
      // Detach before cancelling so events emitted by the cancelled feed are
      // recognized as stale by the handlers below
      this.currentChanges = null;
      feed.cancel();
    }
  }

  /** Open a feed that resumes from the last handled sequence. */
  connectChanges() {
    if (!this.isRunning) return;

    // Never keep two feeds alive at once
    this.releaseFeed();

    console.log('Starting changes feed...');

    const feed = this.db.changes({
      live: true,
      include_docs: true,
      since: this.lastSeq,
      heartbeat: 30000
    });
    this.currentChanges = feed;

    feed.on('change', (change) => {
      // Ignore events from a feed that has already been replaced
      if (this.currentChanges !== feed) return;

      // Reset retry count on successful change
      this.retryCount = 0;
      this.handleChange(change);

      // Remember the position only after the change has been handled
      if (change.seq != null) {
        this.lastSeq = change.seq;
      }

    }).on('error', (err) => {
      console.error('Changes feed error:', err);
      if (this.currentChanges === feed) {
        this.handleError(err);
      }

    }).on('complete', (info) => {
      console.log('Changes feed completed');
      if (this.isRunning && this.currentChanges === feed) {
        this.handleError(new Error('Changes feed completed unexpectedly'));
      }
    });
  }

  /**
   * Log a change and forward it to the `onChange` option when one is set.
   * @param change - Change object delivered by the feed
   */
  handleChange(change) {
    console.log('Document changed:', change.id);

    // Emit change event for external handlers
    if (this.options.onChange) {
      this.options.onChange(change);
    }
  }

  /**
   * Schedule a reconnect with exponential backoff, or give up once
   * `maxRetries` is exceeded and notify `onMaxRetriesReached`.
   * @param err - Error that interrupted the feed
   */
  handleError(err) {
    if (!this.isRunning) return;

    // A reconnect is already scheduled; a second one would duplicate the feed
    if (this.retryTimer !== null) return;

    // The failed feed is of no further use
    this.releaseFeed();

    this.retryCount++;

    if (this.retryCount <= this.options.maxRetries) {
      const delay = this.options.retryDelay * Math.pow(this.options.backoffMultiplier, this.retryCount - 1);

      console.log(`Retrying changes feed in ${delay}ms (attempt ${this.retryCount}/${this.options.maxRetries})`);

      this.retryTimer = setTimeout(() => {
        this.retryTimer = null;
        this.connectChanges();
      }, delay);
    } else {
      console.error(`Max retries (${this.options.maxRetries}) exceeded, stopping changes feed`);
      this.stop();

      if (this.options.onMaxRetriesReached) {
        this.options.onMaxRetriesReached(err);
      }
    }
  }
}

// Usage
const monitorOptions = {
  since: 'now',
  maxRetries: 10,
  retryDelay: 2000,
  onChange(change) {
    console.log('Robust change handler:', change.id);
  },
  onMaxRetriesReached(err) {
    console.error('Changes monitoring failed permanently:', err);
  }
};

const monitor = new RobustChangesMonitor(db, monitorOptions);

monitor.start();

Types

/**
 * Changes options: the option bag accepted by the changes feed.
 * Every field is optional.
 */
interface ChangesOptions {
  /** Set in the conflict-detection example, which then reads `_conflicts` from the changed document. */
  conflicts?: boolean;
  /** Set in the examples that read `change.doc`. */
  include_docs?: boolean;
  attachments?: boolean;
  descending?: boolean;
  /** Where the feed starts: a previously stored sequence, or 'now' as used in the examples. */
  since?: string | number;
  limit?: number;
  /** Milliseconds in the examples (60000 is annotated as 60 seconds). */
  timeout?: number;
  /** Milliseconds in the examples (10000 is annotated as 10 seconds). */
  heartbeat?: number | boolean;
  /** Used by the examples that keep monitoring until cancelled. */
  live?: boolean;
  /** Set together with `live` in the continuous-monitoring example. */
  continuous?: boolean;
  /** Filter name, e.g. '_view' (combined with `view`) or 'myapp/important'. */
  filter?: string;
  /** View name such as 'users/active'; used with `filter: '_view'` in the examples. */
  view?: string;
  /** Document IDs to filter by, as in the doc_ids example. */
  doc_ids?: string[];
  /** Extra parameters supplied alongside a custom filter in the examples. */
  query_params?: { [key: string]: any };
  selector?: any;
  style?: 'main_only' | 'all_docs';
  seq_interval?: number;
  batch_size?: number;
  // NOTE(review): the examples register handlers with .on(...); presumably these are the callback-style equivalents (confirm)
  onChange?: (change: Change) => void;
  onError?: (error: Error) => void;
  onComplete?: (result: ChangesResult) => void;
}

/** Change object: one entry of the changes feed. */
interface Change {
  /** ID of the document that changed. */
  id: string;
  /** Sequence of this change; the examples store it to resume the feed later. */
  seq: string | number;
  /** Revisions reported for the change; the examples read `changes[0].rev`. */
  changes: ChangeRevision[];
  /** Document content; the examples check for it after requesting `include_docs`. */
  doc?: PouchDoc;
  /** Checked by the examples to tell deletions from updates. */
  deleted?: boolean;
}

/** Revision entry inside `Change.changes`. */
interface ChangeRevision {
  /** Revision identifier, logged by the examples as the document's new revision. */
  rev: string;
}

/** Changes result: what a one-time changes query hands to its callback. */
interface ChangesResult {
  /** The individual changes. */
  results: Change[];
  /** Last sequence covered; the examples store it as `since` for the next query. */
  last_seq: string | number;
  pending?: number;
}

/** Changes controller: the handle returned when a changes feed is opened. */
interface ChangesController {
  /** Stops the changes feed. */
  cancel(): void;
  /** Registers a handler; returns the controller so calls can be chained. */
  on(event: 'change', callback: (change: Change) => void): ChangesController;
  on(event: 'error', callback: (error: Error) => void): ChangesController;
  on(event: 'complete', callback: (result: ChangesResult) => void): ChangesController;
}

Install with Tessl CLI

npx tessl i tessl/npm-pouchdb-adapter-http

docs

attachment-operations.md

changes-feed.md

database-operations.md

document-operations.md

http-utilities.md

index.md

tile.json