or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

access-control.md, authentication.md, bucket-operations.md, file-operations.md, index.md, notifications.md, storage-client.md, transfer-manager.md, utilities.md
tile.json

docs/notifications.md

Notifications

Google Cloud Storage integrates with Cloud Pub/Sub to send notifications about bucket and object changes, enabling real-time processing workflows and event-driven architectures.

Notification Class

/**
 * API signature listing for a notification configuration attached to a bucket.
 * Members are shown as declarations only (no bodies).
 */
class Notification extends ServiceObject {
  // Built from the owning bucket and the notification's ID.
  constructor(bucket: Bucket, id: string);
  
  // Properties
  bucket: Bucket;
  id: string;
  
  // Methods
  // Resolves to [notification, apiResponse] (see GetNotificationResponse).
  get(options?: GetNotificationOptions): Promise<GetNotificationResponse>;
  // Resolves to [metadata, apiResponse] (see GetNotificationMetadataResponse).
  getMetadata(options?: GetNotificationMetadataOptions): Promise<GetNotificationMetadataResponse>;
  // Resolves with no value.
  delete(options?: DeleteNotificationOptions): Promise<void>;
}

Notification Metadata

/**
 * Metadata of a notification configuration. Every field is optional.
 */
interface NotificationMetadata {
  /** Notification identifier. */
  id?: string;
  /** Pub/Sub topic the notification publishes to. */
  topic?: string;
  /** Event type names the notification is limited to. */
  eventTypes?: string[];
  /** Only objects whose name starts with this prefix are reported. */
  objectNamePrefix?: string;
  /** Payload format name, e.g. 'JSON_API_V1' or 'NONE'. */
  payloadFormat?: string;
  /** Free-form string key/value pairs attached to the notification. */
  customAttributes?: Record<string, string>;
  etag?: string;
  selfLink?: string;
}

/** Tuple resolved by `Notification.get()`. */
type GetNotificationResponse = [notification: Notification, apiResponse: unknown];

/** Tuple resolved by `Notification.getMetadata()`. */
type GetNotificationMetadataResponse = [metadata: NotificationMetadata, apiResponse: unknown];

Creating Notifications

Basic Notification Setup

// Signature listing: takes a topic name plus optional settings and resolves to
// [notification, apiResponse]. The examples in this document call it on a bucket.
createNotification(topic: string, options?: CreateNotificationOptions): Promise<CreateNotificationResponse>

// Optional settings for createNotification; every field may be omitted.
interface CreateNotificationOptions {
  eventTypes?: string[]; // event type names, e.g. 'OBJECT_FINALIZE'
  objectNamePrefix?: string; // object name prefix filter, e.g. 'uploads/'
  payloadFormat?: string; // e.g. 'JSON_API_V1' or 'NONE'
  customAttributes?: { [key: string]: string }; // string key/value pairs
  userProject?: string; // NOTE(review): presumably the project to bill for the request — confirm
}

type CreateNotificationResponse = [Notification, unknown]; // [notification, apiResponse]

// Each example binds its result to a distinct name so the snippet compiles as a
// single unit (a block-scoped `const` cannot be redeclared in the same scope).

// Create notification for all events
const [notification] = await bucket.createNotification('projects/my-project/topics/bucket-events');

console.log('Notification ID:', notification.id);
console.log('Topic:', notification.metadata?.topic);

// Notification for specific events
const [objectChangeNotification] = await bucket.createNotification('projects/my-project/topics/object-changes', {
  eventTypes: ['OBJECT_FINALIZE', 'OBJECT_DELETE'],
  payloadFormat: 'JSON_API_V1'
});

// Notification with object prefix filter
const [uploadNotification] = await bucket.createNotification('projects/my-project/topics/uploads', {
  eventTypes: ['OBJECT_FINALIZE'],
  objectNamePrefix: 'uploads/',
  payloadFormat: 'JSON_API_V1'
});

// Notification with custom attributes
const [attributedNotification] = await bucket.createNotification('projects/my-project/topics/processed', {
  customAttributes: {
    environment: 'production',
    version: '2.0',
    team: 'data-processing'
  }
});

Event Types

// Event type names a notification can be restricted to.
const eventTypes: string[] = [
  // Object created or overwritten
  'OBJECT_FINALIZE',
  // Object deleted
  'OBJECT_DELETE',
  // Object metadata changed
  'OBJECT_METADATA_UPDATE',
  // Object ACL changed
  'OBJECT_ACL_UPDATE',
];

// Multiple event types
const [notification] = await bucket.createNotification('projects/my-project/topics/all-changes', {
  eventTypes: [
    'OBJECT_FINALIZE',
    'OBJECT_DELETE',
    'OBJECT_METADATA_UPDATE'
  ]
});

// All events (default behavior). Bound to its own name: redeclaring the
// block-scoped `notification` above in the same scope would not compile.
const [allEventsNotification] = await bucket.createNotification('projects/my-project/topics/everything');

Payload Formats

// JSON_API_V1 (the default): the payload carries the full object metadata.
const jsonNotification = (
  await bucket.createNotification('projects/my-project/topics/detailed', { payloadFormat: 'JSON_API_V1' })
)[0];

// NONE: minimal payload, event information only.
const minimalNotification = (
  await bucket.createNotification('projects/my-project/topics/minimal', { payloadFormat: 'NONE' })
)[0];

Managing Notifications

List Notifications

// Signature listing: resolves to [notifications, apiResponse]. The examples in
// this document call it on a bucket.
getNotifications(options?: GetNotificationsOptions): Promise<GetNotificationsResponse>

// Optional settings for getNotifications.
interface GetNotificationsOptions {
  userProject?: string; // NOTE(review): presumably the project to bill for the request — confirm
}

type GetNotificationsResponse = [Notification[], unknown]; // [notifications, apiResponse]

// Fetch every notification configured on the bucket and print a summary of each.
const [notifications] = await bucket.getNotifications();

for (const entry of notifications) {
  const details = entry.metadata;
  const summary = [
    `Notification ${entry.id}:`,
    `  Topic: ${details?.topic}`,
    `  Events: ${details?.eventTypes?.join(', ')}`,
    // An unset (or empty) prefix is reported as 'none'.
    `  Prefix: ${details?.objectNamePrefix || 'none'}`,
    '---',
  ];
  for (const line of summary) {
    console.log(line);
  }
}

Get Notification Details

// Get a local reference to the notification. It needs its own name: declaring
// `notification` twice in one scope does not compile, and the second
// declaration would refer to itself.
const notificationRef = bucket.notification('notification-id');

// Get notification details
const [notification] = await notificationRef.get();
const [metadata] = await notification.getMetadata();

console.log('Notification Details:');
console.log('  ID:', metadata.id);
console.log('  Topic:', metadata.topic);
console.log('  Payload Format:', metadata.payloadFormat);
console.log('  Object Prefix:', metadata.objectNamePrefix);
console.log('  Event Types:', metadata.eventTypes);
console.log('  Custom Attributes:', metadata.customAttributes);

Delete Notifications

// Signature listing (also shown on the Notification class): resolves with no value.
delete(options?: DeleteNotificationOptions): Promise<void>

// Optional settings for delete.
interface DeleteNotificationOptions {
  userProject?: string; // NOTE(review): presumably the project to bill for the request — confirm
}

// Remove a single notification.
await notification.delete();

// Remove every notification on the bucket: start all deletions, then wait for
// them together.
const [notifications] = await bucket.getNotifications();
const pendingDeletions = notifications.map(item => item.delete());
await Promise.all(pendingDeletions);

Channel Class (Watch API)

/**
 * API signature listing for a watch channel. Members are shown as declarations
 * only (no bodies).
 */
class Channel {
  // Built from a Storage client, the channel ID and a resource ID.
  constructor(storage: Storage, id: string, resourceId: string);
  
  // Properties
  id: string;
  resourceId: string;
  
  // Methods
  // Resolves with no value; the examples call it to stop watching.
  stop(): Promise<void>;
}

Creating Watch Channels

// Signature listing: takes a channel ID, a delivery config and optional
// settings; resolves to [channel, apiResponse]. The examples call it on a bucket.
createChannel(id: string, config: CreateChannelConfig, options?: CreateChannelOptions): Promise<CreateChannelResponse>

// Delivery configuration for a watch channel; only `address` is required.
interface CreateChannelConfig {
  address: string; // delivery URL, e.g. 'https://example.com/webhook'
  type?: string; // e.g. 'web_hook'
  token?: string; // the examples pass a secret verification token here
  params?: { [key: string]: string }; // additional string key/value pairs
}

// Optional settings for createChannel.
interface CreateChannelOptions {
  userProject?: string; // NOTE(review): presumably the project to bill for the request — confirm
}

type CreateChannelResponse = [Channel, unknown]; // [channel, apiResponse]

// Create watch channel for bucket changes
const [channel] = await bucket.createChannel('my-watch-channel', {
  address: 'https://example.com/webhook',
  type: 'web_hook'
});

// With authentication token. Bound to its own name: redeclaring the
// block-scoped `channel` above in the same scope would not compile.
const [secureChannel] = await bucket.createChannel('secure-channel', {
  address: 'https://api.example.com/storage-webhook',
  token: 'secret-verification-token',
  params: {
    'custom-header': 'value'
  }
});

// Stop watching
await channel.stop();

Pub/Sub Integration

Topic Setup

// Create Pub/Sub topic (using @google-cloud/pubsub)
import { PubSub } from '@google-cloud/pubsub';

const pubsub = new PubSub();

// Create topic
const [topic] = await pubsub.createTopic('bucket-notifications');
console.log(`Topic ${topic.name} created`);

// Grant Cloud Storage permission to publish.
// setPolicy replaces the whole policy, so read the current one first and append
// the publisher binding rather than overwriting any bindings already present.
const [currentPolicy] = await topic.iam.getPolicy();
await topic.iam.setPolicy({
  ...currentPolicy,
  bindings: [
    ...(currentPolicy.bindings ?? []),
    {
      role: 'roles/pubsub.publisher',
      members: ['serviceAccount:service-PROJECT_NUMBER@gs-project-accounts.iam.gserviceaccount.com']
    }
  ]
});

// Create subscription
const [subscription] = await topic.createSubscription('storage-processor');

Processing Notifications

/**
 * Pub/Sub message handler for Cloud Storage notifications.
 *
 * Event fields (eventType, bucketId, objectId, objectGeneration, eventTime) are
 * read from the message attributes, which is where Cloud Storage publishes
 * them. Fields found in the JSON body are used as a fallback so messages that
 * carry them in the payload keep working. An empty body (payload format NONE)
 * is treated as an empty object instead of being passed to JSON.parse.
 *
 * @param message Pub/Sub message exposing `attributes`, `data` and `ack()`.
 * @throws SyntaxError when a non-empty body is not valid JSON.
 */
function handleStorageNotification(message: any) {
  const rawBody = message.data ? message.data.toString() : '';
  const payload = rawBody ? JSON.parse(rawBody) : {};
  // Attributes win over same-named payload fields.
  const data = { ...payload, ...(message.attributes ?? {}) };

  console.log('Storage Event:');
  console.log('  Event Type:', data.eventType);
  console.log('  Bucket:', data.bucketId);
  console.log('  Object:', data.objectId);
  console.log('  Generation:', data.objectGeneration);
  console.log('  Event Time:', data.eventTime);

  // Process based on event type
  switch (data.eventType) {
    case 'OBJECT_FINALIZE':
      console.log('Object created/updated');
      // Trigger processing pipeline
      processNewFile(data.bucketId, data.objectId);
      break;

    case 'OBJECT_DELETE':
      console.log('Object deleted');
      // Clean up related resources
      cleanupResources(data.objectId);
      break;

    case 'OBJECT_METADATA_UPDATE':
      console.log('Object metadata updated');
      // Update search index
      updateSearchIndex(data.bucketId, data.objectId);
      break;
  }

  // Other event types are acknowledged without further processing.
  message.ack();
}

// Route incoming messages to the handler.
subscription.on('message', handleStorageNotification);

// Report subscription-level failures.
subscription.on('error', err => console.error('Subscription error:', err));

Notification Message Structure

/**
 * Notification shape for the JSON_API_V1 payload format: five event fields that
 * are always present, followed by optional object metadata.
 *
 * NOTE(review): Cloud Storage appears to deliver the five event fields as
 * Pub/Sub message attributes rather than inside the JSON body — confirm where
 * consumers should read them from.
 */
interface StorageNotificationPayload {
  // Event fields
  eventType: 'OBJECT_FINALIZE' | 'OBJECT_DELETE' | 'OBJECT_METADATA_UPDATE' | 'OBJECT_ACL_UPDATE';
  bucketId: string;
  objectId: string;
  objectGeneration: string;
  eventTime: string;

  // Object metadata, JSON_API_V1 only
  id?: string;
  selfLink?: string;
  name?: string;
  bucket?: string;
  generation?: string;
  metageneration?: string;
  contentType?: string;
  timeCreated?: string;
  updated?: string;
  storageClass?: string;
  timeStorageClassUpdated?: string;
  size?: string;
  md5Hash?: string;
  mediaLink?: string;
  contentEncoding?: string;
  contentDisposition?: string;
  contentLanguage?: string;
  cacheControl?: string;
  /** Custom metadata as string key/value pairs. */
  metadata?: Record<string, string>;
  /** Access control entries, each an entity/role pair. */
  acl?: { entity: string; role: string }[];
  owner?: { entity: string; entityId: string };
}

/**
 * Notification shape for the NONE payload format: the five event fields only,
 * with no object metadata.
 */
interface MinimalNotificationPayload {
  /** Event type name, e.g. 'OBJECT_FINALIZE'. */
  eventType: string;
  bucketId: string;
  objectId: string;
  objectGeneration: string;
  eventTime: string;
}

Common Notification Patterns

Image Processing Pipeline

// Publish an event for every object finalized under uploads/images/.
const notification = (
  await bucket.createNotification('projects/my-project/topics/image-processing', {
    payloadFormat: 'JSON_API_V1',
    objectNamePrefix: 'uploads/images/',
    eventTypes: ['OBJECT_FINALIZE'],
  })
)[0];

/**
 * Downloads an uploaded image, runs it through `processImageData`, stores the
 * result under the `processed/` path and records provenance metadata on it.
 *
 * @param bucketName Bucket holding the uploaded image.
 * @param fileName Object name of the uploaded image.
 */
async function processImage(bucketName: string, fileName: string) {
  const sourceBucket = storage.bucket(bucketName);

  // Fetch the original bytes and transform them (resize, optimize, etc.).
  const [originalBytes] = await sourceBucket.file(fileName).download();
  const transformed = await processImageData(originalBytes);

  // The first 'uploads/' in the object name becomes 'processed/'.
  const target = sourceBucket.file(fileName.replace('uploads/', 'processed/'));
  await target.save(transformed);

  // Record where the processed object came from.
  const provenance = {
    originalFile: fileName,
    processedAt: new Date().toISOString(),
    processingVersion: '2.0',
  };
  await target.setMetadata({ metadata: provenance });
}

Log Analysis Workflow

// Publish an event for every object finalized under logs/, tagged with
// workflow attributes.
const notification = (
  await bucket.createNotification('projects/my-project/topics/log-analysis', {
    customAttributes: { workflow: 'log-analysis', priority: 'high' },
    objectNamePrefix: 'logs/',
    eventTypes: ['OBJECT_FINALIZE'],
  })
)[0];

/**
 * Streams a log file line by line, collects the entries whose level is
 * 'ERROR', stores them, and sends an alert when any of them is CRITICAL.
 *
 * The returned promise settles only after the stream has been fully consumed
 * and the follow-up work has finished. Read errors, parse errors and storage
 * or alert failures reject it instead of being lost.
 *
 * @param bucketName Bucket holding the log file.
 * @param fileName Object name of the log file.
 */
async function processLogFile(bucketName: string, fileName: string): Promise<void> {
  const file = storage.bucket(bucketName).file(fileName);
  const entries: LogEntry[] = [];

  // Stream process large log files; resolve on 'end', reject on any failure.
  await new Promise<void>((resolve, reject) => {
    file
      .createReadStream()
      // pipe() does not forward errors, so listen on the source stream as well.
      .on('error', reject)
      .pipe(split()) // Split by lines
      .on('data', line => {
        try {
          const entry = parseLogEntry(line);
          if (entry.level === 'ERROR') {
            entries.push(entry);
          }
        } catch (error) {
          // A throw inside an event listener would otherwise be uncaught.
          reject(error);
        }
      })
      .on('error', reject)
      .on('end', () => resolve());
  });

  if (entries.length === 0) {
    return;
  }

  // Store errors in database
  await storeErrorEntries(entries);

  // Send alert if critical errors
  const criticalErrors = entries.filter(e => e.severity === 'CRITICAL');
  if (criticalErrors.length > 0) {
    await sendAlert(`${criticalErrors.length} critical errors in ${fileName}`);
  }
}

Data Backup Monitoring

// Publish an event whenever an object under backups/ is finalized or deleted.
const notification = (
  await bucket.createNotification('projects/my-project/topics/backup-monitor', {
    objectNamePrefix: 'backups/',
    eventTypes: ['OBJECT_FINALIZE', 'OBJECT_DELETE'],
  })
)[0];

/**
 * Pub/Sub message handler that tracks backup uploads and deletions.
 *
 * Event fields (eventType, bucketId, objectId, eventTime) are read from the
 * message attributes, which is where Cloud Storage publishes them. Object
 * metadata such as `size` comes from the JSON body. Body fields act as a
 * fallback for the event fields, and an empty body is treated as an empty
 * object instead of being passed to JSON.parse.
 *
 * @param message Pub/Sub message exposing `attributes`, `data` and `ack()`.
 * @throws SyntaxError when a non-empty body is not valid JSON.
 */
function monitorBackups(message: any) {
  const rawBody = message.data ? message.data.toString() : '';
  const payload = rawBody ? JSON.parse(rawBody) : {};
  // Attributes win over same-named payload fields.
  const data = { ...payload, ...(message.attributes ?? {}) };

  if (data.eventType === 'OBJECT_FINALIZE') {
    // Log successful backup
    console.log(`Backup completed: ${data.objectId}`);

    // Verify backup integrity
    verifyBackupIntegrity(data.bucketId, data.objectId);

    // Update backup tracking database
    updateBackupDatabase({
      fileName: data.objectId,
      timestamp: data.eventTime,
      size: data.size,
      status: 'completed'
    });

  } else if (data.eventType === 'OBJECT_DELETE') {
    // Alert on backup deletion
    console.warn(`Backup deleted: ${data.objectId}`);
    sendBackupDeletionAlert(data.objectId);
  }

  message.ack();
}

Content Moderation

// Publish an event for every object finalized under user-uploads/.
const notification = (
  await bucket.createNotification('projects/my-project/topics/content-moderation', {
    payloadFormat: 'JSON_API_V1',
    objectNamePrefix: 'user-uploads/',
    eventTypes: ['OBJECT_FINALIZE'],
  })
)[0];

/**
 * Moderates an uploaded object. Images are checked with `moderateImage` and
 * moved to `approved/` when safe; otherwise they are moved to `quarantine/`
 * and tagged with the moderation outcome. Non-image objects are left untouched.
 *
 * @param bucketName Bucket holding the uploaded object.
 * @param fileName Object name of the upload.
 */
async function moderateContent(bucketName: string, fileName: string) {
  const file = storage.bucket(bucketName).file(fileName);
  const [metadata] = await file.getMetadata();

  // Only images are moderated.
  if (!metadata.contentType?.startsWith('image/')) {
    return;
  }

  // Image moderation
  const moderationResult = await moderateImage(bucketName, fileName);

  if (moderationResult.safe) {
    // Move to approved folder
    await file.move(`approved/${fileName}`);
    return;
  }

  // Move to quarantine. move() removes the source object, so the metadata must
  // be written to the destination file it resolves with, not to `file`.
  const [quarantinedFile] = await file.move(`quarantine/${fileName}`);

  // Set metadata with moderation results
  await quarantinedFile.setMetadata({
    metadata: {
      moderationStatus: 'flagged',
      moderationReason: moderationResult.reason,
      moderatedAt: new Date().toISOString()
    }
  });
}

Error Handling and Reliability

Handling Notification Failures

/**
 * Processes a Pub/Sub message, retrying failed attempts with exponential
 * backoff. The message is acked after a successful attempt and nacked once
 * the attempt limit is reached.
 *
 * The wait before retry k is `baseDelayMs * 2^(k-1)`. With the defaults that
 * gives 1000 ms and then 2000 ms, the same delays the previous linear formula
 * produced for three attempts.
 *
 * @param message Pub/Sub message exposing `data`, `ack()` and `nack()`.
 * @param maxRetries Total number of attempts before the message is nacked.
 * @param baseDelayMs Delay before the first retry, in milliseconds.
 */
function handleNotificationWithRetry(message: any, maxRetries = 3, baseDelayMs = 1000) {
  let retries = 0;

  async function processMessage(): Promise<void> {
    try {
      const data = JSON.parse(message.data.toString());
      await processStorageEvent(data);
      message.ack();
    } catch (error) {
      console.error('Processing error:', error);

      retries++;
      if (retries < maxRetries) {
        console.log(`Retrying (${retries}/${maxRetries})...`);
        // Exponential backoff: the delay doubles with every failed attempt.
        const delayMs = baseDelayMs * 2 ** (retries - 1);
        setTimeout(() => void processMessage(), delayMs);
      } else {
        console.error('Max retries exceeded, nacking message');
        message.nack();
      }
    }
  }

  // Fire and forget: processMessage handles its own failures.
  void processMessage();
}

Monitoring Notification Health

/**
 * Walks every notification on the bucket and warns when the Pub/Sub topic it
 * points at cannot be found. Failures while checking one notification are
 * logged and do not stop the remaining checks.
 */
async function checkNotificationHealth() {
  const [configured] = await bucket.getNotifications();

  for (const current of configured) {
    const [details] = await current.getMetadata();

    try {
      // The last path segment of the topic is used as the topic name.
      const topicName = details.topic!.split('/').pop()!;
      const [topicFound] = await pubsub.topic(topicName).exists();

      if (topicFound) {
        continue;
      }
      console.warn(`Topic ${details.topic} for notification ${current.id} does not exist`);
    } catch (error) {
      console.error(`Error checking topic for notification ${current.id}:`, error);
    }
  }
}

// Run health check periodically (every minute). A rejected check is logged so
// it does not become an unhandled promise rejection.
setInterval(() => {
  checkNotificationHealth().catch(error => {
    console.error('Notification health check failed:', error);
  });
}, 60000);

Error Handling

import { ApiError } from '@google-cloud/storage';

// Create a notification and translate the common API failures into readable
// messages. Anything that is not an ApiError is rethrown rather than silently
// dropped.
try {
  await bucket.createNotification('projects/my-project/topics/events');
} catch (error) {
  if (!(error instanceof ApiError)) {
    throw error;
  }

  switch (error.code) {
    case 400:
      console.log('Invalid topic name or configuration');
      break;
    case 403:
      console.log('Permission denied - check Pub/Sub permissions');
      break;
    case 404:
      console.log('Topic not found - create the topic first');
      break;
    default:
      console.error(`Notification Error ${error.code}: ${error.message}`);
  }
}

Callback Support

// Promise style (recommended): await the call and take the first tuple element.
const notification = (await bucket.createNotification(topicName))[0];

// Callback style: the last argument receives (err, notification, apiResponse).
bucket.createNotification(topicName, options, (failure, created, apiResponse) => {
  if (!failure) {
    console.log('Notification created:', created.id);
    return;
  }
  console.error('Error:', failure);
});

// Callback types
// Each callback receives the error (or null) first, followed by any results.

// Callback form of createNotification: (err, notification, apiResponse).
interface CreateNotificationCallback {
  (err: Error | null, notification?: Notification, apiResponse?: unknown): void;
}

// Callback form of getNotifications: (err, notifications, apiResponse).
interface GetNotificationsCallback {
  (err: Error | null, notifications?: Notification[], apiResponse?: unknown): void;
}

// Callback form of delete: (err, apiResponse).
interface DeleteNotificationCallback {
  (err: Error | null, apiResponse?: unknown): void;
}

// Callback that receives only the error; NOTE(review): presumably for Channel.stop — confirm.
interface StopCallback {
  (err: Error | null): void;
}