Google API Client Library for Python that provides discovery-based access to hundreds of Google services with authentication, caching, and media upload/download support.
—
The channel module provides functionality for managing Google API push notifications and webhooks. It enables real-time notifications when resources change, allowing applications to respond immediately to updates without polling.
Create and manage notification channels for receiving push notifications.
class Channel:
    """Represents a notification channel for push notifications.

    This is an API-reference stub: every member documents its contract in a
    docstring and carries no implementation.
    """

    def __init__(self, type_, id_, token=None, address=None, expiration=None,
                 params=None, resource_id=None, resource_uri=None):
        """
        Initialize a notification channel.

        Args:
            type_ (str): Channel type ('web_hook', 'webhook')
            id_ (str): Unique channel identifier
            token (str, optional): Verification token for security
            address (str, optional): Callback URL for notifications
            expiration (int, optional): Channel expiration timestamp (milliseconds)
            params (dict, optional): Additional channel parameters
            resource_id (str, optional): Resource identifier being watched
            resource_uri (str, optional): Resource URI being watched
        """

    @property
    def type(self):
        """
        Get the channel type.

        Returns:
            str: Channel type ('web_hook', 'webhook')
        """

    @property
    def id(self):
        """
        Get the channel ID.

        Returns:
            str: Unique channel identifier
        """

    @property
    def token(self):
        """
        Get the verification token.

        Returns:
            str: Verification token for webhook security
        """

    @property
    def address(self):
        """
        Get the callback address.

        Returns:
            str: URL where notifications will be sent
        """

    @property
    def expiration(self):
        """
        Get the channel expiration time.

        Returns:
            int: Expiration timestamp in milliseconds since epoch
        """

    @property
    def params(self):
        """
        Get additional channel parameters.

        Returns:
            dict: Channel-specific parameters
        """

    @property
    def resource_id(self):
        """
        Get the resource ID being watched.

        Returns:
            str: Identifier of the watched resource
        """

    @property
    def resource_uri(self):
        """
        Get the resource URI being watched.

        Returns:
            str: URI of the watched resource
        """

    def update(self, channel):
        """
        Update channel properties from another channel object.

        Args:
            channel (Channel or dict): Channel object or dictionary with updates
        """

    def __str__(self):
        """
        Get string representation of the channel.

        Returns:
            str: String representation showing channel details
        """


# Process incoming push notifications from Google APIs.
class Notification:
    """Represents a push notification received from Google APIs.

    This is an API-reference stub: every member documents its contract in a
    docstring and carries no implementation.
    """

    def __init__(self, message_number, state, resource_state, resource_id,
                 resource_uri, channel_id, channel_expiration=None,
                 channel_token=None, changed_attributes=None):
        """
        Initialize a notification object.

        Args:
            message_number (int): Sequential message number
            state (str): Notification state ('sync', 'exists', 'not_exists')
            resource_state (str): State of the resource ('exists', 'not_exists', 'sync')
            resource_id (str): Identifier of the changed resource
            resource_uri (str): URI of the changed resource
            channel_id (str): ID of the channel that received the notification
            channel_expiration (int, optional): Channel expiration timestamp
            channel_token (str, optional): Channel verification token
            changed_attributes (list, optional): List of changed resource attributes
        """

    @property
    def message_number(self):
        """
        Get the message sequence number.

        Returns:
            int: Sequential message number for ordering
        """

    @property
    def state(self):
        """
        Get the notification state.

        Returns:
            str: Notification state ('sync', 'exists', 'not_exists')
        """

    @property
    def resource_state(self):
        """
        Get the resource state.

        Returns:
            str: Current state of the resource
        """

    @property
    def resource_id(self):
        """
        Get the resource identifier.

        Returns:
            str: ID of the resource that changed
        """

    @property
    def resource_uri(self):
        """
        Get the resource URI.

        Returns:
            str: URI of the resource that changed
        """

    @property
    def channel_id(self):
        """
        Get the channel ID.

        Returns:
            str: ID of the channel that received this notification
        """

    @property
    def channel_expiration(self):
        """
        Get the channel expiration time.

        Returns:
            int: Channel expiration timestamp in milliseconds
        """

    @property
    def channel_token(self):
        """
        Get the channel verification token.

        Returns:
            str: Token for verifying notification authenticity
        """

    @property
    def changed_attributes(self):
        """
        Get the list of changed attributes.

        Returns:
            list: Names of resource attributes that changed
        """


EPOCH = datetime.datetime(1970, 1, 1, 0, 0, 0, 0, tzinfo=None)  # Unix epoch reference

from googleapiclient import discovery
from googleapiclient.channel import Channel
import uuid
# `time` is needed for the expiration timestamp and `discovery` for building
# the service; both are imported here so this example runs on its own.
import time

from googleapiclient import discovery

# Build Gmail service
# NOTE: `credentials` is not defined in this example; the caller must supply it.
service = discovery.build('gmail', 'v1', credentials=credentials)

# Create a channel for Gmail message notifications
channel = Channel(
    type_='web_hook',
    id_=str(uuid.uuid4()),  # Unique channel ID
    address='https://myapp.example.com/webhook/gmail',
    token='my-secret-verification-token',
    expiration=int((time.time() + 3600) * 1000)  # 1 hour from now
)

# Start watching for changes
watch_request = service.users().messages().watch(
    userId='me',
    body={
        'id': channel.id,
        'type': channel.type,
        'address': channel.address,
        'token': channel.token,
        'expiration': channel.expiration
    }
)
result = watch_request.execute()
print(f"Watching channel: {result['id']}")
print(f"Resource ID: {result['resourceId']}")

from googleapiclient.channel import Notification
from flask import Flask, request
import json
app = Flask(__name__)
@app.route('/webhook/gmail', methods=['POST'])
def handle_gmail_notification():
    """Handle incoming Gmail push notifications.

    Reads the ``X-Goog-*`` request headers, checks the channel token, and
    dispatches on the resource state.

    Returns:
        tuple: ``(body, status)`` - ``('OK', 200)`` on success,
        ``('Unauthorized', 401)`` when the channel token does not match, or
        ``('Bad Request', 400)`` when the message number is not an integer.
    """
    import hmac  # local import: constant-time comparison of the token

    # Extract notification data from headers
    headers = request.headers

    # Verify the token first so unauthenticated requests trigger no other work.
    # compare_digest avoids leaking the token through comparison timing; the
    # values are encoded because it only accepts ASCII str or bytes.
    received_token = headers.get('X-Goog-Channel-Token') or ''
    if not hmac.compare_digest(received_token.encode('utf-8'),
                               'my-secret-verification-token'.encode('utf-8')):
        return 'Unauthorized', 401

    # A malformed message number is a client error, not a server crash.
    try:
        message_number = int(headers.get('X-Goog-Message-Number', 0))
    except ValueError:
        return 'Bad Request', 400

    # Create notification object
    # `state` and `resource_state` are both filled from X-Goog-Resource-State.
    notification = Notification(
        message_number=message_number,
        state=headers.get('X-Goog-Resource-State', 'unknown'),
        resource_state=headers.get('X-Goog-Resource-State', 'unknown'),
        resource_id=headers.get('X-Goog-Resource-ID', ''),
        resource_uri=headers.get('X-Goog-Resource-URI', ''),
        channel_id=headers.get('X-Goog-Channel-ID', ''),
        channel_expiration=headers.get('X-Goog-Channel-Expiration'),
        channel_token=headers.get('X-Goog-Channel-Token')
    )

    # Process the notification
    if notification.resource_state == 'exists':
        print(f"Resource {notification.resource_id} was created or updated")
        # Fetch the actual resource
        fetch_updated_resource(notification.resource_id)
    elif notification.resource_state == 'not_exists':
        print(f"Resource {notification.resource_id} was deleted")
        # Handle resource deletion
        handle_resource_deletion(notification.resource_id)

    return 'OK', 200
def fetch_updated_resource(resource_id):
    """Fetch the updated resource from the API and print its subject.

    Args:
        resource_id (str): Identifier passed as the message ``id`` of the
            ``users().messages().get`` request.

    Returns:
        None. The outcome is reported on stdout; fetch failures are printed
        rather than raised.
    """
    # NOTE(review): the webhook handler passes the X-Goog-Resource-ID header
    # value here - confirm it is usable as a Gmail message id.
    service = discovery.build('gmail', 'v1', credentials=credentials)

    # Only the remote call sits in the try block, so a parsing problem below
    # is not misreported as a fetch error.
    try:
        message = service.users().messages().get(
            userId='me',
            id=resource_id
        ).execute()
    except Exception as e:  # best-effort boundary: report and carry on
        print(f"Error fetching resource {resource_id}: {e}")
        return

    # Process the updated message
    # A message without a Subject header (or without a payload) is valid, so
    # look the header up with a default instead of letting next() raise
    # StopIteration.
    message_headers = message.get('payload', {}).get('headers', [])
    subject = next(
        (h.get('value') for h in message_headers if h.get('name') == 'Subject'),
        None
    )
    if subject is None:
        print(f"Updated message {resource_id} has no Subject header")
    else:
        print(f"Updated message subject: {subject}")
def handle_resource_deletion(resource_id):
    """Handle resource deletion.

    Args:
        resource_id (str): Identifier of the resource that was deleted.

    Returns:
        None. Only prints a clean-up message.
    """
    print(f"Cleaning up references to deleted resource: {resource_id}")
# Start the Flask server only when this file is run as a script.
if __name__ == '__main__':
    # host='0.0.0.0' binds to all network interfaces.
    app.run(host='0.0.0.0', port=8080)

from googleapiclient.channel import Channel
import time
import uuid
class ChannelManager:
    """Manage multiple notification channels.

    Channels created through this manager are tracked in
    ``active_channels``, a dict mapping channel ID to channel object.
    """

    def __init__(self, service):
        """
        Args:
            service: API service object used to start and stop watches.
        """
        self.service = service
        self.active_channels = {}

    def create_channel(self, resource_path, webhook_url, duration_hours=1):
        """
        Create a new notification channel.

        Args:
            resource_path (str): API resource path to watch
            webhook_url (str): URL for receiving notifications
            duration_hours (int): How long the channel should remain active

        Returns:
            Channel: Created channel object

        Raises:
            ValueError: If ``resource_path`` is not a supported resource.
        """
        import secrets  # local import: unpredictable verification tokens

        # This would vary based on the specific API and resource; only the
        # Gmail messages example is wired up. Fail early and clearly instead
        # of reaching channel.update() with no server response.
        if 'messages' not in resource_path:
            raise ValueError(f"Unsupported resource path: {resource_path!r}")

        channel = Channel(
            type_='web_hook',
            id_=str(uuid.uuid4()),
            address=webhook_url,
            # The token authenticates incoming notifications, so it must not
            # be guessable (a timestamp-based value is).
            token=secrets.token_urlsafe(32),
            expiration=int((time.time() + duration_hours * 3600) * 1000)
        )

        # Start watching the resource
        watch_body = {
            'id': channel.id,
            'type': channel.type,
            'address': channel.address,
            'token': channel.token,
            'expiration': channel.expiration
        }
        result = self.service.users().messages().watch(
            userId='me',
            body=watch_body
        ).execute()

        # Update channel with server response
        channel.update(result)
        self.active_channels[channel.id] = channel
        return channel

    def stop_channel(self, channel_id):
        """Stop a notification channel.

        Does nothing when ``channel_id`` is not tracked by this manager.

        Args:
            channel_id (str): ID of the channel to stop.
        """
        if channel_id not in self.active_channels:
            return

        channel = self.active_channels[channel_id]
        # Stop the channel
        # NOTE(review): the id/resourceId body looks like a channel-stop
        # request - confirm users().stop() is the right method for the
        # target API.
        self.service.users().stop(body={
            'id': channel.id,
            'resourceId': channel.resource_id
        }).execute()
        del self.active_channels[channel_id]
        print(f"Stopped channel: {channel_id}")

    def list_active_channels(self):
        """List all active channels.

        Returns:
            list: The tracked channel objects.
        """
        return list(self.active_channels.values())

    def cleanup_expired_channels(self):
        """Remove expired channels from tracking.

        Channels without an expiration are kept. No stop request is sent;
        expired entries are only dropped from ``active_channels``.
        """
        current_time = int(time.time() * 1000)
        # Collect the IDs first so the dict is not mutated while iterating.
        expired = [
            cid for cid, channel in self.active_channels.items()
            if channel.expiration and channel.expiration < current_time
        ]
        for channel_id in expired:
            del self.active_channels[channel_id]
            print(f"Removed expired channel: {channel_id}")
# Usage
from googleapiclient import discovery  # required by discovery.build below

# NOTE: `credentials` is not defined in this example; the caller must supply it.
service = discovery.build('gmail', 'v1', credentials=credentials)
manager = ChannelManager(service)

# Create a channel
channel = manager.create_channel(
    'users/me/messages',
    'https://myapp.example.com/webhook/gmail',
    duration_hours=24
)
print(f"Created channel: {channel.id}")

# Later, stop the channel
manager.stop_channel(channel.id)

from googleapiclient.channel import Notification
import hmac
import hashlib
class NotificationValidator:
    """Validate incoming webhook notifications."""

    def __init__(self, expected_tokens):
        """
        Initialize validator with expected tokens.

        Args:
            expected_tokens (dict): Mapping of channel IDs to tokens
        """
        self.expected_tokens = expected_tokens

    @staticmethod
    def _expiration_to_ms(raw_expiration):
        """Convert an expiration header value to epoch milliseconds.

        Accepts either an integer millisecond timestamp or an HTTP-style
        date string. Returns None when the value cannot be parsed.
        """
        from datetime import timezone
        from email.utils import parsedate_to_datetime

        try:
            return int(raw_expiration)
        except (TypeError, ValueError):
            pass  # not a plain integer; try the date format next
        try:
            parsed = parsedate_to_datetime(str(raw_expiration))
        except (TypeError, ValueError):
            return None
        if parsed.tzinfo is None:
            # No usable zone in the header value: interpret it as UTC rather
            # than as the server's local time.
            parsed = parsed.replace(tzinfo=timezone.utc)
        return int(parsed.timestamp() * 1000)

    def validate_notification(self, headers, body=None):
        """
        Validate an incoming notification.

        A notification is valid when its message number is an integer, its
        channel token matches the token registered for its channel ID, and
        its channel expiration (if present) is parseable and in the future.

        Args:
            headers (dict): HTTP headers from the webhook request
            body (bytes, optional): Request body content (currently unused)

        Returns:
            tuple: (is_valid, notification) - validation result and notification object
        """
        # A malformed message number makes the notification invalid instead
        # of raising out of the validator.
        well_formed = True
        try:
            message_number = int(headers.get('X-Goog-Message-Number', 0))
        except (TypeError, ValueError):
            message_number = 0
            well_formed = False

        # Extract notification data
        notification = Notification(
            message_number=message_number,
            state=headers.get('X-Goog-Resource-State', 'unknown'),
            resource_state=headers.get('X-Goog-Resource-State', 'unknown'),
            resource_id=headers.get('X-Goog-Resource-ID', ''),
            resource_uri=headers.get('X-Goog-Resource-URI', ''),
            channel_id=headers.get('X-Goog-Channel-ID', ''),
            channel_expiration=headers.get('X-Goog-Channel-Expiration'),
            channel_token=headers.get('X-Goog-Channel-Token')
        )
        if not well_formed:
            return False, notification

        # Validate token
        expected_token = self.expected_tokens.get(notification.channel_id)
        received_token = notification.channel_token
        if not expected_token or not received_token:
            return False, notification
        # Constant-time comparison so the token cannot be recovered from
        # response timing; encoded because compare_digest needs ASCII/bytes.
        if not hmac.compare_digest(str(received_token).encode('utf-8'),
                                   str(expected_token).encode('utf-8')):
            return False, notification

        # Check channel expiration
        if notification.channel_expiration:
            expires_ms = self._expiration_to_ms(notification.channel_expiration)
            current_time = int(time.time() * 1000)
            # An unparseable expiration is treated as invalid.
            if expires_ms is None or expires_ms < current_time:
                return False, notification

        return True, notification
# Usage in Flask webhook handler
# One validator is shared by all requests; it maps each channel ID to the
# token expected for that channel.
validator = NotificationValidator(
    expected_tokens={
        'channel-id-1': 'secret-token-1',
        'channel-id-2': 'secret-token-2',
    }
)
@app.route('/webhook', methods=['POST'])
def webhook_handler():
    """Validate an incoming webhook request and acknowledge it.

    Returns:
        tuple: ``('OK', 200)`` for a valid notification, otherwise
        ``('Invalid notification', 401)``.
    """
    is_valid, notification = validator.validate_notification(request.headers)
    if not is_valid:
        return 'Invalid notification', 401

    # Process valid notification
    print(f"Valid notification from channel: {notification.channel_id}")
    return 'OK', 200

from googleapiclient import http
from googleapiclient.channel import Channel
import uuid
def create_multiple_channels(service, resources, webhook_base_url):
    """Create multiple channels in a batch operation.

    One watch request is queued per entry in ``resources`` and all of them
    are sent in a single batch call. Per-request outcomes are printed by the
    batch callback.

    Args:
        service: API service object used to build the watch requests.
        resources (list): Resource names; each is appended to
            ``webhook_base_url`` to form that channel's callback address.
        webhook_base_url (str): Base URL for the callback addresses.

    Returns:
        list: The channel objects that were submitted, in input order
        (including any whose request failed).
    """
    import secrets  # local import: unpredictable verification tokens

    def channel_callback(request_id, response, exception):
        if exception:
            print(f"Failed to create channel {request_id}: {exception}")
        else:
            print(f"Created channel {request_id}: {response['id']}")

    # Build the batch from the service so it targets that API's own batch
    # endpoint; a bare http.BatchHttpRequest() falls back to the legacy
    # global batch endpoint.
    batch = service.new_batch_http_request(callback=channel_callback)
    channels = []

    # Create channels for multiple resources
    for i, resource in enumerate(resources):
        channel = Channel(
            type_='web_hook',
            id_=str(uuid.uuid4()),
            address=f'{webhook_base_url}/{resource}',
            # The token authenticates notifications, so it must not be
            # guessable (a resource-name + timestamp value is).
            token=secrets.token_urlsafe(32),
            expiration=int((time.time() + 3600) * 1000)
        )
        channels.append(channel)

        # Add to batch
        watch_request = service.users().messages().watch(
            userId='me',
            body={
                'id': channel.id,
                'type': channel.type,
                'address': channel.address,
                'token': channel.token,
                'expiration': channel.expiration
            }
        )
        batch.add(watch_request, request_id=f'channel-{i}')

    # Execute batch
    batch.execute()
    return channels
# Usage
resources = ['messages', 'drafts', 'labels']
channels = create_multiple_channels(
    service,
    resources,
    'https://myapp.example.com/webhook'
)

# Install with Tessl CLI
npx tessl i tessl/pypi-google-api-python-client@2.181.1