PouchDB adapter using HTTP for remote CouchDB connections and database operations
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Real-time database changes streaming with filtering, continuous monitoring support, and comprehensive options for tracking document modifications in remote CouchDB databases.
Streams database changes with extensive filtering and monitoring options.
/**
* Stream database changes with various filtering options
* @param opts - Changes feed options including filters, continuous mode, and callbacks
* @returns Object with cancel method to stop the changes feed
*/
api._changes(opts): { cancel: function };

Usage Examples:
// Basic changes monitoring: follow the feed live, starting at the current sequence.
const changes = db.changes({ since: 'now', live: true, include_docs: true });

changes.on('change', (change) => {
  const [latest] = change.changes;
  console.log('Document changed:', change.id);
  console.log('New revision:', latest.rev);
  // The document body is optional on a change entry, so guard before logging it.
  if (change.doc) {
    console.log('Document content:', change.doc);
  }
});

changes.on('error', (err) => {
  console.error('Changes feed error:', err);
});

// Stop monitoring after 30 seconds
const STOP_AFTER_MS = 30000;
setTimeout(() => {
  changes.cancel();
  console.log('Changes feed stopped');
}, STOP_AFTER_MS);
// One-time changes since last update
db.changes({
since: lastUpdateSeq,
include_docs: true
}, (err, result) => {
if (err) {
console.error('Error getting changes:', err);
return;
}
result.results.forEach(change => {
console.log('Changed document:', change.id);
console.log('Current revision:', change.changes[0].rev);
});
// Store the last sequence for next query
lastUpdateSeq = result.last_seq;
});Monitor changes with document filtering and specific document sets.
/**
* Monitor changes with filtering options
* @param opts - Options including filter functions and document IDs
* @returns Changes feed controller with cancel method
*/
api._changes(opts): { cancel: function };

Usage Examples:
// Filter by document IDs
const userChanges = db.changes({
live: true,
include_docs: true,
doc_ids: ['user:alice', 'user:bob', 'user:charlie']
}).on('change', (change) => {
console.log('User document changed:', change.id);
if (change.doc) {
console.log('User data:', {
name: change.doc.name,
email: change.doc.email
});
}
});
// Filter by design document view
const filteredChanges = db.changes({
live: true,
include_docs: true,
filter: '_view',
view: 'users/active'
}).on('change', (change) => {
console.log('Active user changed:', change.id);
});
// Custom server-side filter
const customFilterChanges = db.changes({
live: true,
include_docs: true,
filter: 'myapp/important',
query_params: {
priority: 'high'
}
}).on('change', (change) => {
console.log('Important document changed:', change.id);
});Set up persistent changes monitoring with reconnection and error handling.
/**
* Set up continuous changes monitoring with reconnection
* @param opts - Continuous monitoring options
* @returns Changes feed controller
*/
api._changes(opts): { cancel: function };

Usage Examples:
// Continuous monitoring with heartbeat
let changesController;
let reconnectTimeout;

/**
 * Open a live changes feed and remember it in `changesController`.
 *
 * The feed resumes from the sequence saved under the `lastSeq` localStorage
 * key (or from 'now' when nothing is saved). Every change is persisted to that
 * key and then handed to `handleDocumentChange`. On a feed error a single
 * reconnect attempt is scheduled 5 seconds later.
 */
function startChangesMonitoring() {
  const feedOptions = {
    live: true,
    continuous: true,
    include_docs: true,
    heartbeat: 10000, // 10 seconds
    timeout: 60000, // 60 seconds
    since: localStorage.getItem('lastSeq') || 'now'
  };

  const onChange = (change) => {
    console.log('Document updated:', change.id);
    // Store sequence for recovery
    localStorage.setItem('lastSeq', change.seq);
    // Process the change
    handleDocumentChange(change);
  };

  const scheduleReconnect = () => {
    // Only one reconnect may be pending at a time.
    if (reconnectTimeout) {
      return;
    }
    reconnectTimeout = setTimeout(() => {
      console.log('Attempting to reconnect changes feed...');
      reconnectTimeout = null;
      startChangesMonitoring();
    }, 5000);
  };

  changesController = db
    .changes(feedOptions)
    .on('change', onChange)
    .on('error', (err) => {
      console.error('Changes feed error:', err);
      // Attempt to reconnect after 5 seconds
      scheduleReconnect();
    })
    .on('complete', (info) => {
      console.log('Changes feed completed:', info);
    });
}
function handleDocumentChange(change) {
if (change.deleted) {
console.log('Document deleted:', change.id);
removeFromLocalCache(change.id);
} else {
console.log('Document updated:', change.id);
updateLocalCache(change.id, change.doc);
}
}
function stopChangesMonitoring() {
if (changesController) {
changesController.cancel();
changesController = null;
}
if (reconnectTimeout) {
clearTimeout(reconnectTimeout);
reconnectTimeout = null;
}
}
// Start monitoring
startChangesMonitoring();

Monitor changes while detecting and handling document conflicts.
/**
* Monitor changes with conflict detection
* @param opts - Options including conflicts flag
* @returns Changes feed controller
*/
api._changes(opts): { cancel: function };

Usage Examples:
// Monitor with conflict detection
const conflictAwareChanges = db.changes({
  live: true,
  include_docs: true,
  conflicts: true
});

conflictAwareChanges.on('change', (change) => {
  console.log('Document changed:', change.id);
  const conflictingRevs = change.doc._conflicts;
  if (!conflictingRevs) {
    // Normal document update
    console.log('Clean document update:', change.doc);
    return;
  }
  console.warn('Document has conflicts:', conflictingRevs);
  // Handle conflicts by fetching all conflicting revisions
  handleDocumentConflicts(change.id, conflictingRevs);
});
/**
 * Resolve the conflicts reported for one document.
 *
 * The winning revision is not listed in `_conflicts`, so it is fetched
 * separately and takes part in the resolution alongside every conflicting
 * revision. The content chosen by `resolveConflicts` is then written on top of
 * the winning revision, and every losing revision is deleted in the same
 * `bulkDocs` call. Without those deletions the document would stay in
 * conflict after the write.
 *
 * Failures are logged, never thrown, so this is safe to call fire-and-forget
 * from a change handler.
 *
 * @param docId - ID of the conflicted document
 * @param conflicts - Revision IDs taken from the document's `_conflicts` field
 */
async function handleDocumentConflicts(docId: string, conflicts: string[]): Promise<void> {
  console.log(`Resolving conflicts for ${docId}`);
  if (!conflicts || conflicts.length === 0) {
    return;
  }
  try {
    // Current winner plus all conflicting revisions
    const winner = await db.get(docId);
    const losers = await Promise.all(
      conflicts.map((rev) => db.get(docId, { rev }))
    );

    // Implement conflict resolution logic
    const chosen = resolveConflicts([winner, ...losers]);

    // Save the chosen content as a successor of the winning revision and
    // tombstone every losing branch.
    const resolvedDoc = { ...chosen, _id: docId, _rev: winner._rev };
    const tombstones = losers.map((doc) => ({
      _id: docId,
      _rev: doc._rev,
      _deleted: true
    }));
    const results = await db.bulkDocs([resolvedDoc, ...tombstones]);

    const failure = results.find((result) => result.error);
    if (failure) {
      console.error('Failed to resolve conflict:', failure);
    } else {
      console.log('Conflict resolved:', results[0].rev);
    }
  } catch (err) {
    console.error('Error handling conflicts:', err);
  }
}
/**
 * Simple conflict resolution: pick the most recent document.
 *
 * Documents are ordered by, in turn:
 *  1. the `updated` timestamp, when both documents carry a parseable one;
 *  2. the revision generation (the integer prefix of `_rev`, e.g. 3 for "3-abc");
 *  3. the `_rev` string itself, so the result is deterministic on a full tie.
 *
 * A `_rev` is never parsed as a date: "3-abc" is not a timestamp, and treating
 * it as one makes the comparison meaningless.
 *
 * @param docs - Candidate revisions of the same document; must be non-empty
 * @returns The element of `docs` considered most recent
 * @throws TypeError when `docs` is empty (raised by `Array.prototype.reduce`)
 */
function resolveConflicts<T extends { _rev?: string; updated?: string | number | Date }>(docs: T[]): T {
  const timestampOf = (doc: T): number =>
    doc.updated == null ? Number.NaN : new Date(doc.updated).getTime();

  const generationOf = (doc: T): number => {
    const generation = Number.parseInt(String(doc._rev ?? ''), 10);
    return Number.isNaN(generation) ? 0 : generation;
  };

  const isNewer = (candidate: T, incumbent: T): boolean => {
    const candidateTime = timestampOf(candidate);
    const incumbentTime = timestampOf(incumbent);
    const bothTimed = !Number.isNaN(candidateTime) && !Number.isNaN(incumbentTime);
    if (bothTimed && candidateTime !== incumbentTime) {
      return candidateTime > incumbentTime;
    }
    const candidateGeneration = generationOf(candidate);
    const incumbentGeneration = generationOf(incumbent);
    if (candidateGeneration !== incumbentGeneration) {
      return candidateGeneration > incumbentGeneration;
    }
    return String(candidate._rev ?? '') > String(incumbent._rev ?? '');
  };

  return docs.reduce((latest, current) => (isNewer(current, latest) ? current : latest));
}
// Process changes in batches for better performance
let changesBatch = [];
const BATCH_SIZE = 10;
const BATCH_TIMEOUT = 5000;
let batchTimeout;

const batchChanges = db.changes({ live: true, include_docs: true });

batchChanges.on('change', (change) => {
  changesBatch.push(change);

  // Process batch when it reaches target size
  if (changesBatch.length >= BATCH_SIZE) {
    processBatch();
    return;
  }

  // Otherwise restart the timer that processes a partial batch
  if (batchTimeout) {
    clearTimeout(batchTimeout);
  }
  batchTimeout = setTimeout(processBatch, BATCH_TIMEOUT);
});
/**
 * Process everything currently buffered in `changesBatch`.
 *
 * Does nothing when the buffer is empty. The buffer is snapshotted and reset
 * before processing, and any pending partial-batch timer is cancelled.
 */
function processBatch() {
  if (changesBatch.length === 0) return;
  console.log(`Processing batch of ${changesBatch.length} changes`);
  // Process all changes in the batch
  const batch = changesBatch.slice();
  changesBatch = [];
  if (batchTimeout) {
    clearTimeout(batchTimeout);
    batchTimeout = null;
  }
  // Handle batch processing
  batch.forEach(change => {
    console.log('Processing change:', change.id);
    // Your batch processing logic here
  });
}

/** Minimal shape of a change entry as used by RobustChangesMonitor. */
interface MonitoredChange {
  id: string;
  seq?: string | number;
  [field: string]: unknown;
}

/** Minimal shape of the feed object returned by `db.changes()`. */
interface MonitoredFeed {
  cancel(): void;
  on(event: 'change', handler: (change: MonitoredChange) => void): MonitoredFeed;
  on(event: 'error', handler: (err: unknown) => void): MonitoredFeed;
  on(event: 'complete', handler: (info: unknown) => void): MonitoredFeed;
}

/** Minimal shape of the database handle RobustChangesMonitor needs. */
interface MonitoredDatabase {
  changes(options: Record<string, unknown>): MonitoredFeed;
}

/** Options accepted by RobustChangesMonitor. */
interface RobustChangesMonitorOptions {
  /** Consecutive failures tolerated before giving up (default 5). */
  maxRetries?: number;
  /** Delay before the first retry, in milliseconds (default 1000). */
  retryDelay?: number;
  /** Factor applied to the delay after each failed attempt (default 2). */
  backoffMultiplier?: number;
  /** Sequence to start from on the first connection (default 'now'). */
  since?: string | number;
  /** Called for every change received. */
  onChange?: (change: MonitoredChange) => void;
  /** Called once when retries are exhausted and monitoring has stopped. */
  onMaxRetriesReached?: (err: unknown) => void;
}

/**
 * Live changes feed that reconnects with exponential backoff.
 *
 * - A reconnect resumes from the sequence of the last change that was handled,
 *   so changes are neither lost nor replayed from the original `since`.
 * - At most one feed and one retry timer exist at a time: a failed feed is
 *   cancelled, and events from a superseded feed are ignored (an `error`
 *   followed by `complete` schedules only one retry).
 * - `stop()` also cancels a pending retry.
 */
class RobustChangesMonitor {
  db: MonitoredDatabase;
  options: RobustChangesMonitorOptions & {
    maxRetries: number;
    retryDelay: number;
    backoffMultiplier: number;
  };
  currentChanges: MonitoredFeed | null = null;
  retryCount = 0;
  isRunning = false;

  /** Handle of the pending reconnect timer, if any. */
  private retryTimer: ReturnType<typeof setTimeout> | null = null;
  /** Sequence of the last change that was handled; used to resume. */
  private lastSeq: string | number | undefined = undefined;
  /** Bumped whenever the current feed is replaced; stale handlers compare against it. */
  private feedGeneration = 0;

  constructor(db: MonitoredDatabase, options: RobustChangesMonitorOptions = {}) {
    this.db = db;
    this.options = {
      maxRetries: 5,
      retryDelay: 1000,
      backoffMultiplier: 2,
      ...options
    };
  }

  /** Begin monitoring. Does nothing if already running. */
  start(): void {
    if (this.isRunning) return;
    this.isRunning = true;
    this.connectChanges();
  }

  /** Stop monitoring: cancel the feed and any pending reconnect. */
  stop(): void {
    this.isRunning = false;
    if (this.retryTimer !== null) {
      clearTimeout(this.retryTimer);
      this.retryTimer = null;
    }
    this.releaseFeed();
  }

  /** Open a new feed, replacing (and cancelling) any previous one. */
  connectChanges(): void {
    if (!this.isRunning) return;
    this.releaseFeed();
    console.log('Starting changes feed...');

    const generation = ++this.feedGeneration;
    const isStale = (): boolean => generation !== this.feedGeneration;

    this.currentChanges = this.db.changes({
      live: true,
      include_docs: true,
      // `??` rather than `||`: a `since` of 0 is a valid starting sequence.
      since: this.lastSeq ?? this.options.since ?? 'now',
      heartbeat: 30000
    }).on('change', (change) => {
      if (isStale()) return;
      // Reset retry count on successful change
      this.retryCount = 0;
      this.handleChange(change);
      // Recorded only after the handler ran, so a change whose handler threw
      // is delivered again after a reconnect.
      if (change.seq !== undefined) {
        this.lastSeq = change.seq;
      }
    }).on('error', (err) => {
      if (isStale()) return;
      console.error('Changes feed error:', err);
      this.handleError(err);
    }).on('complete', () => {
      console.log('Changes feed completed');
      if (isStale()) return;
      if (this.isRunning) {
        this.handleError(new Error('Changes feed completed unexpectedly'));
      }
    });
  }

  /** Forward a change to the `onChange` option, if provided. */
  handleChange(change: MonitoredChange): void {
    console.log('Document changed:', change.id);
    // Emit change event for external handlers
    if (this.options.onChange) {
      this.options.onChange(change);
    }
  }

  /**
   * React to a feed failure: schedule a reconnect with exponential backoff, or
   * stop and notify `onMaxRetriesReached` once `maxRetries` is exceeded.
   */
  handleError(err: unknown): void {
    if (!this.isRunning) return;
    // The failed feed is finished; cancelling it also invalidates its handlers.
    this.releaseFeed();
    this.retryCount++;
    const { maxRetries, retryDelay, backoffMultiplier } = this.options;
    if (this.retryCount <= maxRetries) {
      const delay = retryDelay * backoffMultiplier ** (this.retryCount - 1);
      console.log(`Retrying changes feed in ${delay}ms (attempt ${this.retryCount}/${maxRetries})`);
      if (this.retryTimer !== null) {
        clearTimeout(this.retryTimer);
      }
      this.retryTimer = setTimeout(() => {
        this.retryTimer = null;
        if (this.isRunning) {
          this.connectChanges();
        }
      }, delay);
    } else {
      console.error(`Max retries (${maxRetries}) exceeded, stopping changes feed`);
      this.stop();
      if (this.options.onMaxRetriesReached) {
        this.options.onMaxRetriesReached(err);
      }
    }
  }

  /** Detach and cancel the current feed; later events from it are ignored. */
  private releaseFeed(): void {
    this.feedGeneration++;
    const feed = this.currentChanges;
    this.currentChanges = null;
    if (feed) {
      feed.cancel();
    }
  }
}
// Usage
const logRobustChange = (change) => {
  console.log('Robust change handler:', change.id);
};
const reportPermanentFailure = (err) => {
  console.error('Changes monitoring failed permanently:', err);
};

const monitor = new RobustChangesMonitor(db, {
  since: 'now',
  maxRetries: 10,
  retryDelay: 2000,
  onChange: logRobustChange,
  onMaxRetriesReached: reportPermanentFailure
});
monitor.start();

// Changes options
/**
 * Options accepted by `db.changes()`.
 *
 * Every field is optional. Fields marked NOTE(review) are not exercised by the
 * examples on this page; confirm their semantics against the PouchDB/CouchDB
 * documentation before relying on the description.
 */
interface ChangesOptions {
// Ask for conflict information; the examples then read `doc._conflicts`.
conflicts?: boolean;
// Attach the document body to each change as `change.doc`.
include_docs?: boolean;
// NOTE(review): presumably includes attachment data with docs; confirm.
attachments?: boolean;
// NOTE(review): presumably reverses the order of results; confirm.
descending?: boolean;
// Sequence to start from; the examples pass 'now' or a previously stored sequence.
since?: string | number;
// NOTE(review): presumably caps the number of results; confirm.
limit?: number;
// Request timeout; the examples annotate 60000 as "60 seconds", i.e. milliseconds.
timeout?: number;
// Heartbeat interval; the examples annotate 10000 as "10 seconds", i.e. milliseconds.
heartbeat?: number | boolean;
// Keep the feed open and emit changes as they happen.
live?: boolean;
// Used together with `live` in the continuous-monitoring example.
continuous?: boolean;
// Filter name: '_view' (with `view`) or a design-doc filter such as 'myapp/important'.
filter?: string;
// View used when `filter` is '_view', e.g. 'users/active'.
view?: string;
// Restrict the feed to these document IDs.
doc_ids?: string[];
// Extra parameters sent along with a custom server-side filter.
query_params?: { [key: string]: any };
// NOTE(review): presumably a Mango selector used as a filter; confirm.
selector?: any;
// NOTE(review): presumably selects winning-only vs. all leaf revisions; confirm.
style?: 'main_only' | 'all_docs';
// NOTE(review): not demonstrated on this page; confirm semantics.
seq_interval?: number;
// NOTE(review): not demonstrated on this page; confirm semantics.
batch_size?: number;
// NOTE(review): callback-style alternatives to `.on(...)`; the examples use
// `.on('change' | 'error' | 'complete', ...)` instead. Confirm these are honoured.
onChange?: (change: Change) => void;
onError?: (error: Error) => void;
onComplete?: (result: ChangesResult) => void;
}
// Change object: one entry of the changes feed.
interface Change {
// ID of the document that changed.
id: string;
// Sequence of this change; the examples persist it to resume the feed later.
seq: string | number;
// Revisions for this change; the examples read `changes[0].rev`.
changes: ChangeRevision[];
// Document body; the examples guard on its presence before using it.
// NOTE(review): `PouchDoc` is not declared in this section; confirm where it comes from.
doc?: PouchDoc;
// Set when the change is a deletion; the examples evict the document from a local cache.
deleted?: boolean;
}
// A single revision reference inside `Change.changes`.
interface ChangeRevision {
// Revision identifier.
rev: string;
}
// Changes result: what a one-time (callback-style) changes request yields.
interface ChangesResult {
// The change entries returned by the request.
results: Change[];
// Last sequence covered; the examples store it as `since` for the next query.
last_seq: string | number;
// NOTE(review): presumably the number of changes still outstanding; confirm.
pending?: number;
}
// Payload delivered to the listener of each changes-feed event.
interface ChangesEventMap {
  change: Change;
  error: Error;
  complete: ChangesResult;
}

// Changes controller: handle returned by `db.changes()`.
interface ChangesController {
  /** Stop the feed. */
  cancel(): void;
  /** Register a listener; returns the controller so calls can be chained. */
  on<K extends keyof ChangesEventMap>(
    event: K,
    callback: (payload: ChangesEventMap[K]) => void
  ): ChangesController;
}

Install with Tessl CLI
npx tessl i tessl/npm-pouchdb-adapter-http