CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-google-api-python-client

Google API Client Library for Python that provides discovery-based access to hundreds of Google services with authentication, caching, and media upload/download support.

Pending
Overview
Eval results
Files

docs/channel.md

Notification Channels and Webhooks

The channel module provides functionality for managing Google API push notifications and webhooks. It enables real-time notifications when resources change, allowing applications to respond immediately to updates without polling.

Capabilities

Channel Management

Create and manage notification channels for receiving push notifications.

class Channel:
    """Represents a notification channel for push notifications.

    This is an interface summary: method bodies are omitted and only the
    documented contract of each member is shown. A channel bundles the
    delivery settings (type, address, token, expiration, params) with the
    identity of the watched resource (resource_id, resource_uri).

    NOTE(review): confirm parameter names and defaults against the installed
    ``googleapiclient.channel.Channel`` before relying on this summary.
    """
    
    def __init__(self, type_, id_, token=None, address=None, expiration=None, 
                 params=None, resource_id=None, resource_uri=None):
        """
        Initialize a notification channel.
        
        Args:
            type_ (str): Channel delivery type ('web_hook', 'webhook')
            id_ (str): Unique identifier for this channel
            token (str, optional): Verification token for security
            address (str, optional): Callback URL where notifications are sent
            expiration (int, optional): Channel expiration timestamp, in
                milliseconds since the epoch
            params (dict, optional): Additional channel-specific parameters
            resource_id (str, optional): Identifier of the resource being watched
            resource_uri (str, optional): URI of the resource being watched
        """
    
    @property
    def type(self):
        """
        Get the channel type.
        
        Returns:
            str: Channel type ('web_hook', 'webhook')
        """
    
    @property
    def id(self):
        """
        Get the channel ID.
        
        Returns:
            str: Unique channel identifier
        """
    
    @property
    def token(self):
        """
        Get the verification token.
        
        Returns:
            str: Verification token for webhook security
        """
    
    @property
    def address(self):
        """
        Get the callback address.
        
        Returns:
            str: URL where notifications will be sent
        """
    
    @property
    def expiration(self):
        """
        Get the channel expiration time.
        
        Returns:
            int: Expiration timestamp in milliseconds since epoch
        """
    
    @property
    def params(self):
        """
        Get additional channel parameters.
        
        Returns:
            dict: Channel-specific parameters
        """
    
    @property
    def resource_id(self):
        """
        Get the resource ID being watched.
        
        Returns:
            str: Identifier of the watched resource
        """
    
    @property
    def resource_uri(self):
        """
        Get the resource URI being watched.
        
        Returns:
            str: URI of the watched resource
        """
    
    def update(self, channel):
        """
        Update this channel's properties from another channel description.
        
        Args:
            channel (Channel or dict): Channel object or dictionary holding
                the updated values
        """
    
    def __str__(self):
        """
        Get string representation of the channel.
        
        Returns:
            str: String representation showing channel details
        """

Notification Processing

Process incoming push notifications from Google APIs.

class Notification:
    """Represents a push notification received from Google APIs.

    This is an interface summary: method bodies are omitted and only the
    documented contract of each member is shown. A notification carries the
    message sequence number, the state of the changed resource, and the
    identity of the channel it was delivered on.

    NOTE(review): confirm parameter names against the installed
    ``googleapiclient.channel.Notification`` before relying on this summary.
    """
    
    def __init__(self, message_number, state, resource_state, resource_id, 
                 resource_uri, channel_id, channel_expiration=None, 
                 channel_token=None, changed_attributes=None):
        """
        Initialize a notification object.
        
        Args:
            message_number (int): Sequential message number
            state (str): Notification state ('sync', 'exists', 'not_exists')
            resource_state (str): State of the resource ('exists', 'not_exists', 'sync')
            resource_id (str): Identifier of the changed resource
            resource_uri (str): URI of the changed resource
            channel_id (str): ID of the channel that received the notification
            channel_expiration (int, optional): Channel expiration timestamp,
                in milliseconds
            channel_token (str, optional): Channel verification token
            changed_attributes (list, optional): Names of the resource
                attributes that changed
        """
    
    @property
    def message_number(self):
        """
        Get the message sequence number.
        
        Returns:
            int: Sequential message number for ordering
        """
    
    @property
    def state(self):
        """
        Get the notification state.
        
        Returns:
            str: Notification state ('sync', 'exists', 'not_exists')
        """
    
    @property
    def resource_state(self):
        """
        Get the resource state.
        
        Returns:
            str: Current state of the resource
        """
    
    @property
    def resource_id(self):
        """
        Get the resource identifier.
        
        Returns:
            str: ID of the resource that changed
        """
    
    @property
    def resource_uri(self):
        """
        Get the resource URI.
        
        Returns:
            str: URI of the resource that changed
        """
    
    @property
    def channel_id(self):
        """
        Get the channel ID.
        
        Returns:
            str: ID of the channel that received this notification
        """
    
    @property
    def channel_expiration(self):
        """
        Get the channel expiration time.
        
        Returns:
            int: Channel expiration timestamp in milliseconds
        """
    
    @property
    def channel_token(self):
        """
        Get the channel verification token.
        
        Returns:
            str: Token for verifying notification authenticity
        """
    
    @property
    def changed_attributes(self):
        """
        Get the list of changed attributes.
        
        Returns:
            list: Names of resource attributes that changed
        """

Constants

EPOCH = datetime.datetime(year=1970, month=1, day=1)  # Naive (tzinfo-less) Unix epoch reference: 1970-01-01 00:00:00

Usage Examples

Setting Up a Webhook Channel

import time  # Required by the expiration computation below.
import uuid

from googleapiclient import discovery
from googleapiclient.channel import Channel

# Build Gmail service.
# `credentials` is assumed to be an authorized credentials object created
# elsewhere; this snippet does not define it.
service = discovery.build('gmail', 'v1', credentials=credentials)

# Create a channel for Gmail message notifications
channel = Channel(
    type_='web_hook',
    id_=str(uuid.uuid4()),  # Unique channel ID
    address='https://myapp.example.com/webhook/gmail',
    token='my-secret-verification-token',
    expiration=int((time.time() + 3600) * 1000)  # 1 hour from now, in milliseconds
)

# Start watching for changes.
# NOTE(review): the available watch() endpoints and the accepted body fields
# vary by API -- confirm this call against the target API's reference.
watch_request = service.users().messages().watch(
    userId='me',
    body={
        'id': channel.id,
        'type': channel.type,
        'address': channel.address,
        'token': channel.token,
        'expiration': channel.expiration
    }
)

# Send the request; the response describes the channel the server created.
result = watch_request.execute()
print(f"Watching channel: {result['id']}")
print(f"Resource ID: {result['resourceId']}")

Processing Webhook Notifications

from googleapiclient.channel import Notification
from flask import Flask, request
import json

# Flask application object; the webhook route below is registered on it.
app = Flask(__name__)

@app.route('/webhook/gmail', methods=['POST'])
def handle_gmail_notification():
    """Handle incoming Gmail push notifications.

    The request is authenticated first by comparing the ``X-Goog-Channel-Token``
    header with the expected token, and only then are the remaining
    ``X-Goog-*`` headers parsed into a ``Notification`` and dispatched.

    Returns:
        tuple: ``(body, status)`` -- ``('Unauthorized', 401)`` when the token
        is missing or wrong, ``('Bad Request', 400)`` when the message number
        header is not an integer, otherwise ``('OK', 200)``.
    """
    import hmac  # Local import keeps this snippet self-contained.

    # Extract notification data from headers
    headers = request.headers

    # Verify the token before touching any other untrusted header. The
    # comparison is constant-time so the response time does not leak how much
    # of the token matched; bytes are used because compare_digest rejects
    # non-ASCII str arguments.
    expected_token = 'my-secret-verification-token'
    supplied_token = headers.get('X-Goog-Channel-Token') or ''
    if not hmac.compare_digest(supplied_token.encode('utf-8'),
                               expected_token.encode('utf-8')):
        return 'Unauthorized', 401

    # A malformed message number is a client error, not a server crash.
    try:
        message_number = int(headers.get('X-Goog-Message-Number', 0))
    except (TypeError, ValueError):
        return 'Bad Request', 400

    # Create notification object
    notification = Notification(
        message_number=message_number,
        state=headers.get('X-Goog-Resource-State', 'unknown'),
        resource_state=headers.get('X-Goog-Resource-State', 'unknown'),
        resource_id=headers.get('X-Goog-Resource-ID', ''),
        resource_uri=headers.get('X-Goog-Resource-URI', ''),
        channel_id=headers.get('X-Goog-Channel-ID', ''),
        channel_expiration=headers.get('X-Goog-Channel-Expiration'),
        channel_token=supplied_token
    )

    # Process the notification
    if notification.resource_state == 'exists':
        print(f"Resource {notification.resource_id} was created or updated")
        # Fetch the actual resource
        fetch_updated_resource(notification.resource_id)
    elif notification.resource_state == 'not_exists':
        print(f"Resource {notification.resource_id} was deleted")
        # Handle resource deletion
        handle_resource_deletion(notification.resource_id)

    # Any other state (e.g. the default 'unknown') is acknowledged without action.
    return 'OK', 200

def fetch_updated_resource(resource_id):
    """Fetch the updated resource from the API and print its subject.

    Args:
        resource_id (str): Identifier passed as the message ``id`` to
            ``users().messages().get``.

    Failures while fetching or reading the message are reported with
    ``print`` and are not re-raised.
    """
    # `credentials` is assumed to be defined elsewhere; this snippet does not create it.
    service = discovery.build('gmail', 'v1', credentials=credentials)

    try:
        message = service.users().messages().get(
            userId='me',
            id=resource_id
        ).execute()

        # Process the updated message. Header names are matched
        # case-insensitively, and a default is supplied so that a message
        # without a Subject header is not misreported as a fetch error
        # (a bare next() would raise StopIteration into the except below).
        message_headers = message.get('payload', {}).get('headers', [])
        subject = next(
            (
                h['value'] for h in message_headers
                if h['name'].lower() == 'subject'
            ),
            '(no subject)',
        )
        print(f"Updated message subject: {subject}")

    except Exception as e:
        # Broad on purpose: this runs inside a webhook handler and must not
        # let an API or parsing failure propagate to the caller.
        print(f"Error fetching resource {resource_id}: {e}")

def handle_resource_deletion(resource_id):
    """Report that references to a deleted resource are being cleaned up.

    Args:
        resource_id (str): Identifier of the resource that was deleted.
    """
    cleanup_message = f"Cleaning up references to deleted resource: {resource_id}"
    print(cleanup_message)

if __name__ == '__main__':
    # When executed as a script, serve the app on all interfaces (0.0.0.0), port 8080.
    app.run(host='0.0.0.0', port=8080)

Channel Management

from googleapiclient.channel import Channel
import time
import uuid

class ChannelManager:
    """Manage multiple notification channels.

    Channels created through this manager are tracked in
    ``active_channels``, a dict keyed by channel ID.
    """

    def __init__(self, service):
        """
        Initialize the manager.

        Args:
            service: API service object used to start and stop watches
        """
        self.service = service
        self.active_channels = {}

    def create_channel(self, resource_path, webhook_url, duration_hours=1):
        """
        Create a new notification channel.

        Args:
            resource_path (str): API resource path to watch
            webhook_url (str): URL for receiving notifications
            duration_hours (int): How long the channel should remain active

        Returns:
            Channel: Created channel object

        Raises:
            ValueError: If ``resource_path`` is not a supported resource
                (only paths containing 'messages' are handled here)
        """
        import secrets  # Local import: not part of this snippet's import list.

        # Validate up front. Previously an unsupported path skipped the watch
        # call and then failed with UnboundLocalError on `result`.
        if 'messages' not in resource_path:
            raise ValueError(f"Unsupported resource path: {resource_path!r}")

        channel = Channel(
            type_='web_hook',
            id_=str(uuid.uuid4()),
            address=webhook_url,
            # The token authenticates incoming notifications, so it must be
            # unpredictable; a timestamp-based token is guessable.
            token=secrets.token_urlsafe(32),
            expiration=int((time.time() + duration_hours * 3600) * 1000)
        )

        # Start watching the resource
        watch_body = {
            'id': channel.id,
            'type': channel.type,
            'address': channel.address,
            'token': channel.token,
            'expiration': channel.expiration
        }

        # This would vary based on the specific API and resource
        # Example for Gmail:
        result = self.service.users().messages().watch(
            userId='me',
            body=watch_body
        ).execute()

        # Update channel with server response
        channel.update(result)
        self.active_channels[channel.id] = channel

        return channel

    def stop_channel(self, channel_id):
        """Stop a notification channel.

        Does nothing when ``channel_id`` is not tracked by this manager.

        Args:
            channel_id (str): ID of the channel to stop
        """
        channel = self.active_channels.get(channel_id)
        if channel is None:
            return

        # Stop the channel.
        # NOTE(review): the stop endpoint differs between APIs -- confirm this
        # call against the target API's reference.
        self.service.users().stop(body={
            'id': channel.id,
            'resourceId': channel.resource_id
        }).execute()

        # Only forget the channel once the stop request has succeeded.
        del self.active_channels[channel_id]
        print(f"Stopped channel: {channel_id}")

    def list_active_channels(self):
        """List all active channels.

        Returns:
            list: The tracked channel objects
        """
        return list(self.active_channels.values())

    def cleanup_expired_channels(self):
        """Remove expired channels from tracking.

        A channel is expired when it has a truthy ``expiration`` that is
        earlier than the current time in milliseconds. Channels without an
        expiration are kept.
        """
        current_time = int(time.time() * 1000)
        # Collect IDs first: the dict must not change size while being iterated.
        expired = [
            cid for cid, channel in self.active_channels.items()
            if channel.expiration and channel.expiration < current_time
        ]

        for channel_id in expired:
            del self.active_channels[channel_id]
            print(f"Removed expired channel: {channel_id}")

# Usage: build a Gmail service and hand it to a ChannelManager.
# `discovery` and `credentials` are not defined in this snippet -- they are
# assumed to be available from earlier setup.
service = discovery.build('gmail', 'v1', credentials=credentials)
manager = ChannelManager(service)

# Create a channel that stays active for 24 hours
channel = manager.create_channel(
    'users/me/messages',
    'https://myapp.example.com/webhook/gmail',
    duration_hours=24
)

print(f"Created channel: {channel.id}")

# Later, stop the channel (this also removes it from the manager's tracking)
manager.stop_channel(channel.id)

Notification Validation

from googleapiclient.channel import Notification
import hmac
import hashlib

class NotificationValidator:
    """Validate incoming webhook notifications."""

    def __init__(self, expected_tokens):
        """
        Initialize validator with expected tokens.

        Args:
            expected_tokens (dict): Mapping of channel IDs to tokens
        """
        self.expected_tokens = expected_tokens

    @staticmethod
    def _parse_int(raw, default=0):
        """Return ``raw`` as an int, or ``default`` when it is not an integer."""
        try:
            return int(raw)
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _expiration_to_millis(raw):
        """Convert an expiration header value to epoch milliseconds.

        Accepts either an integer number of milliseconds or an HTTP-date
        string such as ``'Tue, 19 Nov 2013 01:13:52 GMT'``.

        Returns:
            int or None: Epoch milliseconds, or None when the value is
            missing, empty, or cannot be parsed
        """
        import datetime
        from email.utils import parsedate_to_datetime

        if raw is None:
            return None
        text = str(raw).strip()
        if not text:
            return None

        try:
            return int(text)
        except ValueError:
            pass

        try:
            parsed = parsedate_to_datetime(text)
        except (TypeError, ValueError):
            # Older Python versions raise TypeError for unparseable input,
            # newer ones raise ValueError.
            return None
        if parsed.tzinfo is None:
            # A date without usable zone information is treated as UTC.
            parsed = parsed.replace(tzinfo=datetime.timezone.utc)
        return int(parsed.timestamp() * 1000)

    def validate_notification(self, headers, body=None):
        """
        Validate an incoming notification.

        A notification is valid when its channel ID has a configured token,
        the supplied token matches it, and the channel has not expired. An
        expiration that is absent or unparseable is not treated as expired.

        Args:
            headers (dict): HTTP headers from the webhook request
            body (bytes, optional): Request body content (currently unused)

        Returns:
            tuple: (is_valid, notification) - validation result and notification object
        """
        import time  # Local import: not part of this snippet's import list.

        # Extract notification data; a malformed message number falls back to 0
        # instead of raising.
        notification = Notification(
            message_number=self._parse_int(headers.get('X-Goog-Message-Number', 0)),
            state=headers.get('X-Goog-Resource-State', 'unknown'),
            resource_state=headers.get('X-Goog-Resource-State', 'unknown'),
            resource_id=headers.get('X-Goog-Resource-ID', ''),
            resource_uri=headers.get('X-Goog-Resource-URI', ''),
            channel_id=headers.get('X-Goog-Channel-ID', ''),
            channel_expiration=headers.get('X-Goog-Channel-Expiration'),
            channel_token=headers.get('X-Goog-Channel-Token')
        )

        # Validate token
        expected_token = self.expected_tokens.get(notification.channel_id)
        supplied_token = notification.channel_token
        if not expected_token or not supplied_token:
            return False, notification
        # Constant-time comparison so response timing does not leak how much of
        # the token matched; bytes are used because compare_digest rejects
        # non-ASCII str arguments.
        if not hmac.compare_digest(str(supplied_token).encode('utf-8'),
                                   str(expected_token).encode('utf-8')):
            return False, notification

        # Check channel expiration
        expires_at_ms = self._expiration_to_millis(notification.channel_expiration)
        if expires_at_ms is not None:
            current_time = int(time.time() * 1000)
            if expires_at_ms < current_time:
                return False, notification

        return True, notification

# Usage in Flask webhook handler: one shared validator, configured with the
# token expected for each channel ID.
validator = NotificationValidator({
    'channel-id-1': 'secret-token-1',
    'channel-id-2': 'secret-token-2'
})

@app.route('/webhook', methods=['POST'])
def webhook_handler():
    """Accept a webhook POST and answer according to its validation result.

    Returns:
        tuple: ``('OK', 200)`` for a valid notification, otherwise
        ``('Invalid notification', 401)``.
    """
    accepted, notification = validator.validate_notification(request.headers)

    if accepted:
        # Process valid notification
        print(f"Valid notification from channel: {notification.channel_id}")
        return 'OK', 200

    return 'Invalid notification', 401

Batch Channel Operations

from googleapiclient import http
from googleapiclient.channel import Channel
import uuid

def create_multiple_channels(service, resources, webhook_base_url):
    """Create multiple channels in a batch operation.

    One channel and one watch request is created per entry in ``resources``;
    all requests are sent together in a single batch.

    Args:
        service: API service object used to build the watch requests and the batch
        resources (iterable of str): Resource names; each is appended to
            ``webhook_base_url`` to form that channel's callback address
        webhook_base_url (str): Base URL of the webhook endpoint

    Returns:
        list: The Channel objects that were submitted, in input order. The
        list includes channels whose request failed; failures are only
        reported by the batch callback.
    """
    # Local imports: neither module is part of this snippet's import list.
    import secrets
    import time

    # Build the batch from the service so it targets that API's own batch
    # endpoint, rather than constructing BatchHttpRequest with no batch URI.
    batch = service.new_batch_http_request()
    channels = []

    def channel_callback(request_id, response, exception):
        """Report the outcome of one batched watch request."""
        if exception is not None:
            print(f"Failed to create channel {request_id}: {exception}")
        else:
            print(f"Created channel {request_id}: {response['id']}")

    # Create channels for multiple resources
    for i, resource in enumerate(resources):
        channel = Channel(
            type_='web_hook',
            id_=str(uuid.uuid4()),
            address=f'{webhook_base_url}/{resource}',
            # The random suffix keeps the verification token unguessable; a
            # timestamp suffix is predictable.
            token=f'token-{resource}-{secrets.token_urlsafe(16)}',
            expiration=int((time.time() + 3600) * 1000)  # 1 hour from now, in milliseconds
        )

        channels.append(channel)

        # Add to batch
        watch_request = service.users().messages().watch(
            userId='me',
            body={
                'id': channel.id,
                'type': channel.type,
                'address': channel.address,
                'token': channel.token,
                'expiration': channel.expiration
            }
        )

        batch.add(watch_request, callback=channel_callback, request_id=f'channel-{i}')

    # Execute batch
    batch.execute()
    return channels

# Usage: each resource name becomes the last path segment of that channel's
# callback address. `service` is assumed to have been built earlier.
resources = ['messages', 'drafts', 'labels']
channels = create_multiple_channels(
    service, 
    resources, 
    'https://myapp.example.com/webhook'
)

Install with Tessl CLI

npx tessl i tessl/pypi-google-api-python-client@2.181.1

docs

auth.md

channel.md

discovery.md

errors.md

http.md

index.md

media.md

mimeparse.md

model.md

schema.md

testing.md

tile.json