Google Cloud Storage integrates with Cloud Pub/Sub to send notifications about bucket and object changes, enabling real-time processing workflows and event-driven architectures.
class Notification extends ServiceObject {
constructor(bucket: Bucket, id: string);
// Properties
bucket: Bucket;
id: string;
// Methods
get(options?: GetNotificationOptions): Promise<GetNotificationResponse>;
getMetadata(options?: GetNotificationMetadataOptions): Promise<GetNotificationMetadataResponse>;
delete(options?: DeleteNotificationOptions): Promise<void>;
}interface NotificationMetadata {
id?: string;
topic?: string;
payloadFormat?: string;
objectNamePrefix?: string;
etag?: string;
selfLink?: string;
eventTypes?: string[];
customAttributes?: { [key: string]: string };
}
type GetNotificationResponse = [Notification, unknown]; // [notification, apiResponse]
type GetNotificationMetadataResponse = [NotificationMetadata, unknown]; // [metadata, apiResponse]createNotification(topic: string, options?: CreateNotificationOptions): Promise<CreateNotificationResponse>
interface CreateNotificationOptions {
eventTypes?: string[];
objectNamePrefix?: string;
payloadFormat?: string;
customAttributes?: { [key: string]: string };
userProject?: string;
}
type CreateNotificationResponse = [Notification, unknown]; // [notification, apiResponse]
// Create notification for all events
const [notification] = await bucket.createNotification('projects/my-project/topics/bucket-events');
console.log('Notification ID:', notification.id);
console.log('Topic:', notification.metadata?.topic);
// Notification for specific events
const [notification] = await bucket.createNotification('projects/my-project/topics/object-changes', {
eventTypes: ['OBJECT_FINALIZE', 'OBJECT_DELETE'],
payloadFormat: 'JSON_API_V1'
});
// Notification with object prefix filter
const [notification] = await bucket.createNotification('projects/my-project/topics/uploads', {
eventTypes: ['OBJECT_FINALIZE'],
objectNamePrefix: 'uploads/',
payloadFormat: 'JSON_API_V1'
});
// Notification with custom attributes
const [notification] = await bucket.createNotification('projects/my-project/topics/processed', {
customAttributes: {
environment: 'production',
version: '2.0',
team: 'data-processing'
}
});// Available event types
const eventTypes = [
'OBJECT_FINALIZE', // Object created or overwritten
'OBJECT_DELETE', // Object deleted
'OBJECT_METADATA_UPDATE', // Object metadata changed
'OBJECT_ACL_UPDATE' // Object ACL changed
];
// Multiple event types
const [notification] = await bucket.createNotification('projects/my-project/topics/all-changes', {
eventTypes: [
'OBJECT_FINALIZE',
'OBJECT_DELETE',
'OBJECT_METADATA_UPDATE'
]
});
// All events (default behavior)
const [notification] = await bucket.createNotification('projects/my-project/topics/everything');// JSON_API_V1: Full object metadata (default)
const [jsonNotification] = await bucket.createNotification('projects/my-project/topics/detailed', {
payloadFormat: 'JSON_API_V1'
});
// NONE: Minimal payload with just event info
const [minimalNotification] = await bucket.createNotification('projects/my-project/topics/minimal', {
payloadFormat: 'NONE'
});getNotifications(options?: GetNotificationsOptions): Promise<GetNotificationsResponse>
interface GetNotificationsOptions {
userProject?: string;
}
type GetNotificationsResponse = [Notification[], unknown]; // [notifications, apiResponse]
// List all notifications for bucket
const [notifications] = await bucket.getNotifications();
notifications.forEach(notification => {
console.log(`Notification ${notification.id}:`);
console.log(` Topic: ${notification.metadata?.topic}`);
console.log(` Events: ${notification.metadata?.eventTypes?.join(', ')}`);
console.log(` Prefix: ${notification.metadata?.objectNamePrefix || 'none'}`);
console.log('---');
});// Get notification reference
const notification = bucket.notification('notification-id');
// Get notification details
const [notification] = await notification.get();
const [metadata] = await notification.getMetadata();
console.log('Notification Details:');
console.log(' ID:', metadata.id);
console.log(' Topic:', metadata.topic);
console.log(' Payload Format:', metadata.payloadFormat);
console.log(' Object Prefix:', metadata.objectNamePrefix);
console.log(' Event Types:', metadata.eventTypes);
console.log(' Custom Attributes:', metadata.customAttributes);delete(options?: DeleteNotificationOptions): Promise<void>
interface DeleteNotificationOptions {
userProject?: string;
}
// Delete specific notification
await notification.delete();
// Delete all notifications for bucket
const [notifications] = await bucket.getNotifications();
await Promise.all(notifications.map(n => n.delete()));class Channel {
constructor(storage: Storage, id: string, resourceId: string);
// Properties
id: string;
resourceId: string;
// Methods
stop(): Promise<void>;
}createChannel(id: string, config: CreateChannelConfig, options?: CreateChannelOptions): Promise<CreateChannelResponse>
interface CreateChannelConfig {
address: string;
type?: string;
token?: string;
params?: { [key: string]: string };
}
interface CreateChannelOptions {
userProject?: string;
}
type CreateChannelResponse = [Channel, unknown]; // [channel, apiResponse]
// Create watch channel for bucket changes
const [channel] = await bucket.createChannel('my-watch-channel', {
address: 'https://example.com/webhook',
type: 'web_hook'
});
// With authentication token
const [channel] = await bucket.createChannel('secure-channel', {
address: 'https://api.example.com/storage-webhook',
token: 'secret-verification-token',
params: {
'custom-header': 'value'
}
});
// Stop watching
await channel.stop();// Create Pub/Sub topic (using @google-cloud/pubsub)
import { PubSub } from '@google-cloud/pubsub';
const pubsub = new PubSub();
// Create topic
const [topic] = await pubsub.createTopic('bucket-notifications');
console.log(`Topic ${topic.name} created`);
// Grant Cloud Storage permission to publish
await topic.iam.setPolicy({
bindings: [
{
role: 'roles/pubsub.publisher',
members: ['serviceAccount:service-PROJECT_NUMBER@gs-project-accounts.iam.gserviceaccount.com']
}
]
});
// Create subscription
const [subscription] = await topic.createSubscription('storage-processor');// Message handler for notifications
function handleStorageNotification(message: any) {
const data = JSON.parse(message.data.toString());
console.log('Storage Event:');
console.log(' Event Type:', data.eventType);
console.log(' Bucket:', data.bucketId);
console.log(' Object:', data.objectId);
console.log(' Generation:', data.objectGeneration);
console.log(' Event Time:', data.eventTime);
// Process based on event type
switch (data.eventType) {
case 'OBJECT_FINALIZE':
console.log('Object created/updated');
// Trigger processing pipeline
processNewFile(data.bucketId, data.objectId);
break;
case 'OBJECT_DELETE':
console.log('Object deleted');
// Clean up related resources
cleanupResources(data.objectId);
break;
case 'OBJECT_METADATA_UPDATE':
console.log('Object metadata updated');
// Update search index
updateSearchIndex(data.bucketId, data.objectId);
break;
}
message.ack();
}
// Subscribe to messages
subscription.on('message', handleStorageNotification);
subscription.on('error', error => {
console.error('Subscription error:', error);
});// JSON_API_V1 payload structure
interface StorageNotificationPayload {
eventType: 'OBJECT_FINALIZE' | 'OBJECT_DELETE' | 'OBJECT_METADATA_UPDATE' | 'OBJECT_ACL_UPDATE';
bucketId: string;
objectId: string;
objectGeneration: string;
eventTime: string;
// Additional fields for JSON_API_V1
id?: string;
selfLink?: string;
name?: string;
bucket?: string;
generation?: string;
metageneration?: string;
contentType?: string;
timeCreated?: string;
updated?: string;
storageClass?: string;
timeStorageClassUpdated?: string;
size?: string;
md5Hash?: string;
mediaLink?: string;
contentEncoding?: string;
contentDisposition?: string;
contentLanguage?: string;
cacheControl?: string;
metadata?: { [key: string]: string };
acl?: Array<{
entity: string;
role: string;
}>;
owner?: {
entity: string;
entityId: string;
};
}
// NONE payload structure (minimal)
interface MinimalNotificationPayload {
eventType: string;
bucketId: string;
objectId: string;
objectGeneration: string;
eventTime: string;
}// Set up notification for image uploads
const [notification] = await bucket.createNotification('projects/my-project/topics/image-processing', {
eventTypes: ['OBJECT_FINALIZE'],
objectNamePrefix: 'uploads/images/',
payloadFormat: 'JSON_API_V1'
});
// Processing function
async function processImage(bucketName: string, fileName: string) {
const bucket = storage.bucket(bucketName);
const file = bucket.file(fileName);
// Download image
const [imageData] = await file.download();
// Process image (resize, optimize, etc.)
const processedImage = await processImageData(imageData);
// Save processed image
const processedFile = bucket.file(fileName.replace('uploads/', 'processed/'));
await processedFile.save(processedImage);
// Update metadata
await processedFile.setMetadata({
metadata: {
originalFile: fileName,
processedAt: new Date().toISOString(),
processingVersion: '2.0'
}
});
}// Notification for log file uploads
const [notification] = await bucket.createNotification('projects/my-project/topics/log-analysis', {
eventTypes: ['OBJECT_FINALIZE'],
objectNamePrefix: 'logs/',
customAttributes: {
workflow: 'log-analysis',
priority: 'high'
}
});
// Log processing
async function processLogFile(bucketName: string, fileName: string) {
const file = storage.bucket(bucketName).file(fileName);
// Stream process large log files
const readStream = file.createReadStream();
const entries: LogEntry[] = [];
readStream
.pipe(split()) // Split by lines
.on('data', line => {
const entry = parseLogEntry(line);
if (entry.level === 'ERROR') {
entries.push(entry);
}
})
.on('end', async () => {
if (entries.length > 0) {
// Store errors in database
await storeErrorEntries(entries);
// Send alert if critical errors
const criticalErrors = entries.filter(e => e.severity === 'CRITICAL');
if (criticalErrors.length > 0) {
await sendAlert(`${criticalErrors.length} critical errors in ${fileName}`);
}
}
});
}// Monitor backup uploads
const [notification] = await bucket.createNotification('projects/my-project/topics/backup-monitor', {
eventTypes: ['OBJECT_FINALIZE', 'OBJECT_DELETE'],
objectNamePrefix: 'backups/'
});
// Backup monitoring
function monitorBackups(message: any) {
const data = JSON.parse(message.data.toString());
if (data.eventType === 'OBJECT_FINALIZE') {
// Log successful backup
console.log(`Backup completed: ${data.objectId}`);
// Verify backup integrity
verifyBackupIntegrity(data.bucketId, data.objectId);
// Update backup tracking database
updateBackupDatabase({
fileName: data.objectId,
timestamp: data.eventTime,
size: data.size,
status: 'completed'
});
} else if (data.eventType === 'OBJECT_DELETE') {
// Alert on backup deletion
console.warn(`Backup deleted: ${data.objectId}`);
sendBackupDeletionAlert(data.objectId);
}
message.ack();
}// Notification for user uploads
const [notification] = await bucket.createNotification('projects/my-project/topics/content-moderation', {
eventTypes: ['OBJECT_FINALIZE'],
objectNamePrefix: 'user-uploads/',
payloadFormat: 'JSON_API_V1'
});
// Content moderation
async function moderateContent(bucketName: string, fileName: string) {
const file = storage.bucket(bucketName).file(fileName);
const [metadata] = await file.getMetadata();
// Check content type
if (metadata.contentType?.startsWith('image/')) {
// Image moderation
const moderationResult = await moderateImage(bucketName, fileName);
if (moderationResult.safe) {
// Move to approved folder
await file.move(`approved/${fileName}`);
} else {
// Move to quarantine
await file.move(`quarantine/${fileName}`);
// Set metadata with moderation results
await file.setMetadata({
metadata: {
moderationStatus: 'flagged',
moderationReason: moderationResult.reason,
moderatedAt: new Date().toISOString()
}
});
}
}
}// Robust message processing
function handleNotificationWithRetry(message: any) {
const maxRetries = 3;
let retries = 0;
async function processMessage() {
try {
const data = JSON.parse(message.data.toString());
await processStorageEvent(data);
message.ack();
} catch (error) {
console.error('Processing error:', error);
retries++;
if (retries < maxRetries) {
console.log(`Retrying (${retries}/${maxRetries})...`);
setTimeout(processMessage, 1000 * retries); // Exponential backoff
} else {
console.error('Max retries exceeded, nacking message');
message.nack();
}
}
}
processMessage();
}// Monitor notification delivery
async function checkNotificationHealth() {
const [notifications] = await bucket.getNotifications();
for (const notification of notifications) {
const [metadata] = await notification.getMetadata();
// Check if topic still exists
try {
const topic = pubsub.topic(metadata.topic!.split('/').pop()!);
const [exists] = await topic.exists();
if (!exists) {
console.warn(`Topic ${metadata.topic} for notification ${notification.id} does not exist`);
}
} catch (error) {
console.error(`Error checking topic for notification ${notification.id}:`, error);
}
}
}
// Run health check periodically
setInterval(checkNotificationHealth, 60000); // Every minuteimport { ApiError } from '@google-cloud/storage';
try {
const [notification] = await bucket.createNotification('projects/my-project/topics/events');
} catch (error) {
if (error instanceof ApiError) {
if (error.code === 400) {
console.log('Invalid topic name or configuration');
} else if (error.code === 403) {
console.log('Permission denied - check Pub/Sub permissions');
} else if (error.code === 404) {
console.log('Topic not found - create the topic first');
} else {
console.error(`Notification Error ${error.code}: ${error.message}`);
}
}
}// Promise pattern (recommended)
const [notification] = await bucket.createNotification(topicName);
// Callback pattern
bucket.createNotification(topicName, options, (err, notification, apiResponse) => {
if (err) {
console.error('Error:', err);
return;
}
console.log('Notification created:', notification.id);
});
// Callback types
interface CreateNotificationCallback {
(err: Error | null, notification?: Notification, apiResponse?: unknown): void;
}
interface GetNotificationsCallback {
(err: Error | null, notifications?: Notification[], apiResponse?: unknown): void;
}
interface DeleteNotificationCallback {
(err: Error | null, apiResponse?: unknown): void;
}
interface StopCallback {
(err: Error | null): void;
}