# Notification Channels and Webhooks

The channel module provides functionality for managing Google API push notifications and webhooks. It enables real-time notifications when resources change, allowing applications to respond immediately to updates without polling.

## Capabilities

### Channel Management

Create and manage notification channels for receiving push notifications.

```python { .api }
class Channel:
    """Represents a notification channel for push notifications."""

    def __init__(self, type_, id_, token=None, address=None, expiration=None,
                 params=None, resource_id=None, resource_uri=None):
        """
        Initialize a notification channel.

        Args:
            type_ (str): Channel type ('web_hook', 'webhook')
            id_ (str): Unique channel identifier
            token (str, optional): Verification token for security
            address (str, optional): Callback URL for notifications
            expiration (int, optional): Channel expiration timestamp
                (milliseconds since epoch)
            params (dict, optional): Additional channel parameters
            resource_id (str, optional): Resource identifier being watched
            resource_uri (str, optional): Resource URI being watched
        """

    @property
    def type(self):
        """
        Get the channel type.

        Returns:
            str: Channel type ('web_hook', 'webhook')
        """

    @property
    def id(self):
        """
        Get the channel ID.

        Returns:
            str: Unique channel identifier
        """

    @property
    def token(self):
        """
        Get the verification token.

        Returns:
            str: Verification token for webhook security
        """

    @property
    def address(self):
        """
        Get the callback address.

        Returns:
            str: URL where notifications will be sent
        """

    @property
    def expiration(self):
        """
        Get the channel expiration time.

        Returns:
            int: Expiration timestamp in milliseconds since epoch
        """

    @property
    def params(self):
        """
        Get additional channel parameters.

        Returns:
            dict: Channel-specific parameters
        """

    @property
    def resource_id(self):
        """
        Get the resource ID being watched.

        Returns:
            str: Identifier of the watched resource
        """

    @property
    def resource_uri(self):
        """
        Get the resource URI being watched.

        Returns:
            str: URI of the watched resource
        """

    def update(self, channel):
        """
        Update channel properties from another channel object.

        Args:
            channel (Channel or dict): Channel object or dictionary with updates
        """

    def __str__(self):
        """
        Get string representation of the channel.

        Returns:
            str: String representation showing channel details
        """
```

### Notification Processing

Process incoming push notifications from Google APIs.

```python { .api }
class Notification:
    """Represents a push notification received from Google APIs."""

    def __init__(self, message_number, state, resource_state, resource_id,
                 resource_uri, channel_id, channel_expiration=None,
                 channel_token=None, changed_attributes=None):
        """
        Initialize a notification object.

        Args:
            message_number (int): Sequential message number
            state (str): Notification state ('sync', 'exists', 'not_exists')
            resource_state (str): State of the resource ('exists', 'not_exists', 'sync')
            resource_id (str): Identifier of the changed resource
            resource_uri (str): URI of the changed resource
            channel_id (str): ID of the channel that received the notification
            channel_expiration (int, optional): Channel expiration timestamp
            channel_token (str, optional): Channel verification token
            changed_attributes (list, optional): List of changed resource attributes
        """

    @property
    def message_number(self):
        """
        Get the message sequence number.

        Returns:
            int: Sequential message number for ordering
        """

    @property
    def state(self):
        """
        Get the notification state.

        Returns:
            str: Notification state ('sync', 'exists', 'not_exists')
        """

    @property
    def resource_state(self):
        """
        Get the resource state.

        Returns:
            str: Current state of the resource
        """

    @property
    def resource_id(self):
        """
        Get the resource identifier.

        Returns:
            str: ID of the resource that changed
        """

    @property
    def resource_uri(self):
        """
        Get the resource URI.

        Returns:
            str: URI of the resource that changed
        """

    @property
    def channel_id(self):
        """
        Get the channel ID.

        Returns:
            str: ID of the channel that received this notification
        """

    @property
    def channel_expiration(self):
        """
        Get the channel expiration time.

        Returns:
            int: Channel expiration timestamp in milliseconds
        """

    @property
    def channel_token(self):
        """
        Get the channel verification token.

        Returns:
            str: Token for verifying notification authenticity
        """

    @property
    def changed_attributes(self):
        """
        Get the list of changed attributes.

        Returns:
            list: Names of resource attributes that changed
        """
```

### Constants

```python { .api }
EPOCH = datetime.datetime(1970, 1, 1)  # Unix epoch reference (naive datetime, midnight)
```

## Usage Examples

### Setting Up a Webhook Channel

```python
import time
import uuid

from googleapiclient import discovery
from googleapiclient.channel import Channel

# Build Gmail service.
# `credentials` is not defined in this example; create it beforehand.
service = discovery.build('gmail', 'v1', credentials=credentials)

# Create a channel for Gmail message notifications
channel = Channel(
    type_='web_hook',
    id_=str(uuid.uuid4()),  # Unique channel ID
    address='https://myapp.example.com/webhook/gmail',
    token='my-secret-verification-token',
    expiration=int((time.time() + 3600) * 1000),  # 1 hour from now, in milliseconds
)

# Start watching for changes.
# NOTE(review): confirm that this resource exposes a channel-based watch().
# Gmail's documented push mechanism is users().watch() with a Cloud Pub/Sub
# topicName; web_hook channels are used by APIs such as Drive and Calendar.
watch_request = service.users().messages().watch(
    userId='me',
    body={
        'id': channel.id,
        'type': channel.type,
        'address': channel.address,
        'token': channel.token,
        'expiration': channel.expiration,
    },
)

result = watch_request.execute()
print(f"Watching channel: {result['id']}")
print(f"Resource ID: {result['resourceId']}")
269
```
270
271
### Processing Webhook Notifications
272
273
```python
274
import hmac
import json

from flask import Flask, request
from googleapiclient import discovery
from googleapiclient.channel import Notification

# Must be the same value that was passed as `token` when the channel was created.
EXPECTED_TOKEN = 'my-secret-verification-token'

app = Flask(__name__)


@app.route('/webhook/gmail', methods=['POST'])
def handle_gmail_notification():
    """Handle incoming Gmail push notifications.

    Returns:
        tuple: (body, status) - 401 when the channel token does not match,
        200 otherwise.
    """

    # Extract notification data from headers
    headers = request.headers

    # Create notification object
    notification = Notification(
        message_number=int(headers.get('X-Goog-Message-Number', 0)),
        state=headers.get('X-Goog-Resource-State', 'unknown'),
        resource_state=headers.get('X-Goog-Resource-State', 'unknown'),
        resource_id=headers.get('X-Goog-Resource-ID', ''),
        resource_uri=headers.get('X-Goog-Resource-URI', ''),
        channel_id=headers.get('X-Goog-Channel-ID', ''),
        channel_expiration=headers.get('X-Goog-Channel-Expiration'),
        channel_token=headers.get('X-Goog-Channel-Token'),
    )

    # Verify the token. compare_digest avoids leaking, through response timing,
    # how much of a guessed token was correct. Both sides are encoded to bytes
    # so that a missing header or non-ASCII input cannot raise TypeError.
    received_token = notification.channel_token or ''
    if not hmac.compare_digest(received_token.encode(), EXPECTED_TOKEN.encode()):
        return 'Unauthorized', 401

    # Process the notification
    if notification.resource_state == 'exists':
        print(f"Resource {notification.resource_id} was created or updated")
        # Fetch the actual resource
        fetch_updated_resource(notification.resource_id)
    elif notification.resource_state == 'not_exists':
        print(f"Resource {notification.resource_id} was deleted")
        # Handle resource deletion
        handle_resource_deletion(notification.resource_id)

    return 'OK', 200


def fetch_updated_resource(resource_id):
    """Fetch the updated resource from the API.

    Args:
        resource_id (str): Identifier taken from the notification headers.
    """
    # `credentials` is not defined in this example; create it beforehand.
    service = discovery.build('gmail', 'v1', credentials=credentials)

    try:
        message = service.users().messages().get(
            userId='me',
            id=resource_id,
        ).execute()

        # Process the updated message. A default keeps a message without a
        # Subject header from being reported as a fetch error.
        subject = next(
            (h['value'] for h in message['payload']['headers']
             if h['name'] == 'Subject'),
            '(no subject)',
        )
        print(f"Updated message subject: {subject}")

    except Exception as e:
        # Best effort: a failed fetch is reported but must not fail the webhook.
        print(f"Error fetching resource {resource_id}: {e}")


def handle_resource_deletion(resource_id):
    """Handle resource deletion."""
    print(f"Cleaning up references to deleted resource: {resource_id}")


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)
```

### Channel Management

```python
import secrets
import time
import uuid

from googleapiclient import discovery
from googleapiclient.channel import Channel


class ChannelManager:
    """Manage multiple notification channels."""

    def __init__(self, service):
        self.service = service
        # Maps channel ID -> Channel for every channel created through this manager.
        self.active_channels = {}

    def create_channel(self, resource_path, webhook_url, duration_hours=1):
        """
        Create a new notification channel.

        Args:
            resource_path (str): API resource path to watch
            webhook_url (str): URL for receiving notifications
            duration_hours (int): How long the channel should remain active

        Returns:
            Channel: Created channel object

        Raises:
            ValueError: If resource_path is not one this example knows how to watch
        """
        # Reject unsupported paths up front; otherwise no watch request is made
        # and there is no server response to update the channel from.
        if 'messages' not in resource_path:
            raise ValueError(f"Unsupported resource path: {resource_path!r}")

        channel = Channel(
            type_='web_hook',
            id_=str(uuid.uuid4()),
            address=webhook_url,
            # The token authenticates incoming notifications, so it must be
            # unguessable; a timestamp-derived value is not.
            token=secrets.token_urlsafe(32),
            expiration=int((time.time() + duration_hours * 3600) * 1000),
        )

        # Start watching the resource
        watch_body = {
            'id': channel.id,
            'type': channel.type,
            'address': channel.address,
            'token': channel.token,
            'expiration': channel.expiration,
        }

        # This would vary based on the specific API and resource
        # Example for Gmail:
        result = self.service.users().messages().watch(
            userId='me',
            body=watch_body,
        ).execute()

        # Update channel with server response
        channel.update(result)
        self.active_channels[channel.id] = channel

        return channel

    def stop_channel(self, channel_id):
        """Stop a notification channel; unknown channel IDs are ignored."""
        channel = self.active_channels.get(channel_id)
        if channel is None:
            return

        # Stop the channel.
        # NOTE(review): confirm the stop endpoint for the target API - channel
        # based APIs such as Drive expose it as service.channels().stop(body=...).
        self.service.users().stop(body={
            'id': channel.id,
            'resourceId': channel.resource_id,
        }).execute()

        del self.active_channels[channel_id]
        print(f"Stopped channel: {channel_id}")

    def list_active_channels(self):
        """List all active channels."""
        return list(self.active_channels.values())

    def cleanup_expired_channels(self):
        """Remove expired channels from tracking."""
        current_time = int(time.time() * 1000)
        # Collect IDs first so the dict is not mutated while being iterated.
        expired = [
            cid for cid, channel in self.active_channels.items()
            if channel.expiration and channel.expiration < current_time
        ]

        for channel_id in expired:
            del self.active_channels[channel_id]
            print(f"Removed expired channel: {channel_id}")


# Usage
# `credentials` is not defined in this example; create it beforehand.
service = discovery.build('gmail', 'v1', credentials=credentials)
manager = ChannelManager(service)

# Create a channel
channel = manager.create_channel(
    'users/me/messages',
    'https://myapp.example.com/webhook/gmail',
    duration_hours=24,
)

print(f"Created channel: {channel.id}")

# Later, stop the channel
manager.stop_channel(channel.id)
```

### Notification Validation

```python
import hashlib
import hmac
import time
from email.utils import parsedate_to_datetime

from flask import Flask, request
from googleapiclient.channel import Notification


def _expiration_to_millis(value):
    """Convert a channel-expiration header value to epoch milliseconds.

    Accepts either an integer millisecond timestamp or an HTTP-style date
    string. Returns None when the value cannot be parsed.
    """
    try:
        return int(value)
    except (TypeError, ValueError):
        pass
    try:
        return int(parsedate_to_datetime(value).timestamp() * 1000)
    except (TypeError, ValueError):
        return None


class NotificationValidator:
    """Validate incoming webhook notifications."""

    def __init__(self, expected_tokens):
        """
        Initialize validator with expected tokens.

        Args:
            expected_tokens (dict): Mapping of channel IDs to tokens
        """
        self.expected_tokens = expected_tokens

    def validate_notification(self, headers, body=None):
        """
        Validate an incoming notification.

        Args:
            headers (dict): HTTP headers from the webhook request
            body (bytes, optional): Request body content (currently unused)

        Returns:
            tuple: (is_valid, notification) - validation result and notification object
        """
        # Extract notification data
        notification = Notification(
            message_number=int(headers.get('X-Goog-Message-Number', 0)),
            state=headers.get('X-Goog-Resource-State', 'unknown'),
            resource_state=headers.get('X-Goog-Resource-State', 'unknown'),
            resource_id=headers.get('X-Goog-Resource-ID', ''),
            resource_uri=headers.get('X-Goog-Resource-URI', ''),
            channel_id=headers.get('X-Goog-Channel-ID', ''),
            channel_expiration=headers.get('X-Goog-Channel-Expiration'),
            channel_token=headers.get('X-Goog-Channel-Token'),
        )

        # Validate token. Unknown channels are rejected outright; known ones
        # are compared in constant time so response timing does not reveal
        # how much of a guessed token matched.
        expected_token = self.expected_tokens.get(notification.channel_id)
        if not expected_token:
            return False, notification
        received_token = notification.channel_token or ''
        if not hmac.compare_digest(received_token.encode(), expected_token.encode()):
            return False, notification

        # Check channel expiration. The header may carry a date string rather
        # than a number, so int() alone is not safe here. An unparseable
        # value is treated as invalid.
        if notification.channel_expiration:
            expires_at = _expiration_to_millis(notification.channel_expiration)
            current_time = int(time.time() * 1000)
            if expires_at is None or expires_at < current_time:
                return False, notification

        return True, notification


# Usage in Flask webhook handler
app = Flask(__name__)

validator = NotificationValidator({
    'channel-id-1': 'secret-token-1',
    'channel-id-2': 'secret-token-2',
})


@app.route('/webhook', methods=['POST'])
def webhook_handler():
    is_valid, notification = validator.validate_notification(request.headers)

    if not is_valid:
        return 'Invalid notification', 401

    # Process valid notification
    print(f"Valid notification from channel: {notification.channel_id}")
    return 'OK', 200
```

### Batch Channel Operations

```python
import secrets
import time
import uuid

from googleapiclient import http
from googleapiclient.channel import Channel


def create_multiple_channels(service, resources, webhook_base_url):
    """Create multiple channels in a batch operation.

    Args:
        service: Service object built with discovery.build()
        resources (list): Resource names; each gets its own channel and
            callback URL under webhook_base_url
        webhook_base_url (str): Base URL for receiving notifications

    Returns:
        list: Channel objects that were submitted, in the order of `resources`.
        Per-request success or failure is reported by the batch callback.
    """

    def channel_callback(request_id, response, exception):
        if exception:
            print(f"Failed to create channel {request_id}: {exception}")
        else:
            print(f"Created channel {request_id}: {response['id']}")

    # Obtain the batch from the service so it targets that API's own batch
    # endpoint, instead of constructing http.BatchHttpRequest() directly.
    batch = service.new_batch_http_request()
    channels = []

    # Create channels for multiple resources
    for i, resource in enumerate(resources):
        channel = Channel(
            type_='web_hook',
            id_=str(uuid.uuid4()),
            address=f'{webhook_base_url}/{resource}',
            # Unguessable per-channel token; a timestamp-derived one is predictable.
            token=secrets.token_urlsafe(32),
            expiration=int((time.time() + 3600) * 1000),
        )

        channels.append(channel)

        # Add to batch.
        # NOTE(review): every iteration watches users().messages() regardless
        # of `resource`; confirm the intended per-resource watch call.
        watch_request = service.users().messages().watch(
            userId='me',
            body={
                'id': channel.id,
                'type': channel.type,
                'address': channel.address,
                'token': channel.token,
                'expiration': channel.expiration,
            },
        )

        batch.add(watch_request, callback=channel_callback, request_id=f'channel-{i}')

    # Execute batch
    batch.execute()
    return channels


# Usage (`service` is not defined in this example; build it beforehand)
resources = ['messages', 'drafts', 'labels']
channels = create_multiple_channels(
    service,
    resources,
    'https://myapp.example.com/webhook',
)
```