# Database Change Notifications

Continuous Query Notification (CQN) and Database Change Notification for real-time monitoring of database changes with callback-based event handling.

## Capabilities

### Subscription Management

Create and manage subscriptions for database change notifications.

```python { .api }
class Connection:
    def subscribe(self, callback, operations=None, qos=None,
                  timeout=0, namespace=SUBSCR_NAMESPACE_DBCHANGE,
                  protocol=SUBSCR_PROTO_OCI, port=0, ipAddress=None,
                  groupingClass=0, groupingValue=0, groupingType=0,
                  name=None) -> Subscription:
        """
        Create subscription for database change notifications.

        Queries are not passed to subscribe(); register each query to
        monitor afterwards with Subscription.registerquery().

        Parameters:
        - callback: Function to call when changes occur
        - operations (int): Operations to monitor (OPCODE_* constants)
        - qos (int): Quality of service flags
        - timeout (int): Subscription timeout in seconds (0 = no timeout)
        - namespace (int): Notification namespace
        - protocol (int): Notification protocol
        - port (int): Port for notifications (0 = auto-assign)
        - ipAddress (str): IP address for notifications
        - groupingClass (int): Grouping class for batching
        - groupingValue (int): Grouping value
        - groupingType (int): Grouping type
        - name (str): Subscription name

        Returns:
        Subscription object
        """

    def unsubscribe(self, subscription: Subscription) -> None:
        """
        Remove database change notification subscription.

        Parameters:
        - subscription: Subscription object to remove
        """
```

Usage examples:

```python
import time

import cx_Oracle

# Assumes `connection` was created with events enabled, e.g.
# cx_Oracle.connect(user, password, dsn, events=True)

def change_callback(message):
    """Callback function for handling database changes"""
    print("Database change notification received!")
    print(f"Event type: {message.type}")
    print(f"Database: {message.dbname}")

    if message.tables:
        for table in message.tables:
            print(f"Table changed: {table.name}")
            print(f"Operation: {table.operation}")

            if table.rows:
                for row in table.rows:
                    print(f"  Row ID: {row.rowid}")
                    print(f"  Operation: {row.operation}")

# Create basic subscription
subscription = connection.subscribe(
    callback=change_callback,
    operations=cx_Oracle.OPCODE_INSERT | cx_Oracle.OPCODE_UPDATE | cx_Oracle.OPCODE_DELETE,
    timeout=3600  # 1 hour timeout
)

print(f"Subscription ID: {subscription.id}")
print("Monitoring database changes...")

# Register a query; changes to the tables it references trigger the callback
subscription.registerquery("SELECT * FROM employees")

# Keep subscription active
time.sleep(60)  # Monitor for 1 minute

# Clean up
connection.unsubscribe(subscription)
```

### Subscription Properties and Configuration

Access subscription properties and configuration options.

```python { .api }
class Subscription:
    @property
    def callback(self):
        """Callback function for change notifications"""

    @property
    def connection(self) -> Connection:
        """Associated connection object"""

    @property
    def name(self) -> str:
        """Subscription name"""

    @property
    def namespace(self) -> int:
        """Notification namespace"""

    @property
    def operations(self) -> int:
        """Operations being monitored"""

    @property
    def port(self) -> int:
        """Notification port"""

    @property
    def protocol(self) -> int:
        """Notification protocol"""

    @property
    def qos(self) -> int:
        """Quality of service flags"""

    @property
    def timeout(self) -> int:
        """Subscription timeout in seconds"""

    @property
    def id(self) -> int:
        """Subscription ID"""

    def registerquery(self, statement: str, args=None) -> Optional[int]:
        """Register a query to monitor; returns the query ID when SUBSCR_QOS_QUERY is set"""
```

### Namespace Constants

Define notification namespaces for different types of events.

```python { .api }
SUBSCR_NAMESPACE_DBCHANGE: int  # Database change notifications
SUBSCR_NAMESPACE_AQ: int        # Advanced Queueing notifications
```

### Protocol Constants

Control how notifications are delivered.

```python { .api }
SUBSCR_PROTO_OCI: int     # OCI callback protocol (default)
SUBSCR_PROTO_MAIL: int    # Email notifications
SUBSCR_PROTO_HTTP: int    # HTTP notifications
SUBSCR_PROTO_SERVER: int  # PL/SQL server-side notifications
```

### Quality of Service Flags

Configure notification behavior and reliability.

```python { .api }
SUBSCR_QOS_RELIABLE: int     # Reliable notification delivery
SUBSCR_QOS_DEREG_NFY: int    # Notify when subscription is deregistered
SUBSCR_QOS_ROWIDS: int       # Include row IDs in notifications
SUBSCR_QOS_QUERY: int        # Enable continuous query notification
SUBSCR_QOS_BEST_EFFORT: int  # Best effort delivery (may lose notifications)
```

Usage examples:

```python
# Create subscription with quality of service options
reliable_subscription = connection.subscribe(
    callback=change_callback,
    qos=cx_Oracle.SUBSCR_QOS_RELIABLE | cx_Oracle.SUBSCR_QOS_ROWIDS,
    timeout=7200  # 2 hours
)

# Create CQN subscription, then register the specific query to monitor
cqn_subscription = connection.subscribe(
    callback=change_callback,
    qos=cx_Oracle.SUBSCR_QOS_QUERY | cx_Oracle.SUBSCR_QOS_ROWIDS
)
cqn_subscription.registerquery(
    "SELECT employee_id, name FROM employees WHERE department = 'IT'"
)
```

## Change Notification Events

### Event Types

Different types of database events that can trigger notifications.

```python { .api }
EVENT_NONE: int          # No event
EVENT_STARTUP: int       # Database startup
EVENT_SHUTDOWN: int      # Database shutdown
EVENT_SHUTDOWN_ANY: int  # Any shutdown event
EVENT_DEREG: int         # Subscription deregistration
EVENT_OBJCHANGE: int     # Object change (table/view)
EVENT_QUERYCHANGE: int   # Query result change (CQN)
EVENT_AQ: int            # Advanced Queueing event
```
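
A callback usually needs to branch on the event type before touching `tables` or `queries`. The following is a minimal sketch of a dispatch table, not the library's own mechanism. The numeric values and the handler names (`on_dereg`, `on_objchange`, `on_querychange`, `dispatch`) are placeholders invented for illustration; real code would key the table on the `cx_Oracle.EVENT_*` constants.

```python
from types import SimpleNamespace

# Placeholder values standing in for cx_Oracle.EVENT_* (assumed; not the real numbers).
EVENT_DEREG = 1
EVENT_OBJCHANGE = 2
EVENT_QUERYCHANGE = 3


def on_dereg(message):
    return "subscription deregistered"


def on_objchange(message):
    return f"{len(message.tables or [])} table(s) changed"


def on_querychange(message):
    return f"{len(message.queries or [])} query result(s) changed"


# Map each event type to the function that handles it.
HANDLERS = {
    EVENT_DEREG: on_dereg,
    EVENT_OBJCHANGE: on_objchange,
    EVENT_QUERYCHANGE: on_querychange,
}


def dispatch(message):
    """Route a message to the handler registered for its event type."""
    handler = HANDLERS.get(message.type)
    if handler is None:
        return f"ignored event type {message.type}"
    return handler(message)


# Stand-in message object for demonstration only.
demo = SimpleNamespace(type=EVENT_OBJCHANGE, tables=["EMPLOYEES"], queries=None)
print(dispatch(demo))
```

Unknown event types fall through to a harmless default, so a new event kind does not raise inside the callback.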

### Operation Codes

Specific database operations that triggered the change.

```python { .api }
OPCODE_ALLOPS: int   # All operations
OPCODE_ALLROWS: int  # All rows affected
OPCODE_INSERT: int   # Insert operation
OPCODE_UPDATE: int   # Update operation
OPCODE_DELETE: int   # Delete operation
OPCODE_ALTER: int    # DDL alter operation
OPCODE_DROP: int     # DDL drop operation
```
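
Operation codes are combined with `|` when subscribing, and an operation value reported in a message can likewise carry more than one flag, so a bitwise test tends to be safer than an equality check. The sketch below decodes a combined value into names. The bit values and the `operation_names` helper are assumptions made for illustration; real code would use the `cx_Oracle.OPCODE_*` constants instead.

```python
# Placeholder bit values standing in for cx_Oracle.OPCODE_* (assumed for this sketch).
OPCODES = {
    "ALLROWS": 0x01,
    "INSERT": 0x02,
    "UPDATE": 0x04,
    "DELETE": 0x08,
    "ALTER": 0x10,
    "DROP": 0x20,
}


def operation_names(operation):
    """Return the name of every flag set in a combined operation value."""
    return [name for name, bit in OPCODES.items() if operation & bit]


combined = OPCODES["INSERT"] | OPCODES["ALLROWS"]
print(operation_names(combined))
```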

### Message Structure

Objects passed to callback functions containing event details.

```python { .api }
class Message:
    @property
    def type(self) -> int:
        """Event type (EVENT_* constants)"""

    @property
    def dbname(self) -> str:
        """Database name"""

    @property
    def tables(self) -> list:
        """List of affected tables (MessageTable objects)"""

    @property
    def queries(self) -> list:
        """List of affected queries (MessageQuery objects)"""

    @property
    def queueName(self) -> str:
        """Queue name (for AQ events)"""

    @property
    def consumerName(self) -> str:
        """Consumer name (for AQ events)"""

    @property
    def registered(self) -> bool:
        """Whether subscription is still registered"""

class MessageTable:
    @property
    def name(self) -> str:
        """Table name"""

    @property
    def operation(self) -> int:
        """Operation type (OPCODE_* constants)"""

    @property
    def rows(self) -> list:
        """List of affected rows (MessageRow objects)"""

class MessageRow:
    @property
    def rowid(self) -> str:
        """Row ID of affected row"""

    @property
    def operation(self) -> int:
        """Operation type for this row"""

class MessageQuery:
    @property
    def id(self) -> int:
        """Query ID"""

    @property
    def operation(self) -> int:
        """Operation type"""

    @property
    def tables(self) -> list:
        """List of tables involved in query"""
```
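
Because tables can appear directly on the message or nested under each query, a callback often flattens both paths before processing. The sketch below shows one way to do that. The `Fake*` dataclasses and the `summarize` helper are hypothetical stand-ins that only mirror the attribute names listed above, so the traversal can be exercised without a database.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FakeRow:
    rowid: str
    operation: int


@dataclass
class FakeTable:
    name: str
    operation: int
    rows: Optional[List[FakeRow]] = None


@dataclass
class FakeQuery:
    id: int
    operation: int
    tables: Optional[List[FakeTable]] = None


@dataclass
class FakeMessage:
    tables: Optional[List[FakeTable]] = None
    queries: Optional[List[FakeQuery]] = None


def summarize(message):
    """Flatten a message into (table name, rowid) pairs.

    The rowid is None when a table reports no row-level detail.
    """
    tables = list(message.tables or [])
    for query in message.queries or []:
        tables.extend(query.tables or [])

    pairs = []
    for table in tables:
        if table.rows:
            pairs.extend((table.name, row.rowid) for row in table.rows)
        else:
            pairs.append((table.name, None))
    return pairs


example = FakeMessage(tables=[
    FakeTable("HR.EMPLOYEES", 4, [FakeRow("AAA", 4), FakeRow("AAB", 4)]),
    FakeTable("HR.DEPARTMENTS", 1),
])
print(summarize(example))
```

Treating a missing `tables` or `queries` list as empty keeps the same function usable for both object-change and query-change events.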

## Advanced Notification Patterns

### Continuous Query Notification (CQN)

Monitor specific query results for changes:

```python
def setup_cqn_monitoring():
    """Setup continuous query notification for specific data"""

    def query_change_callback(message):
        """Handle query result changes"""
        print("Query results changed!")

        if message.queries:
            for query in message.queries:
                print(f"Query {query.id} changed")
                print(f"Operation: {query.operation}")

        # Re-execute query to get updated results
        cursor = connection.cursor()
        cursor.execute("SELECT * FROM employees WHERE salary > 50000")
        updated_results = cursor.fetchall()
        print(f"Updated query returned {len(updated_results)} rows")
        cursor.close()

    # Create CQN subscription for high-salary employees
    cqn_sub = connection.subscribe(
        callback=query_change_callback,
        qos=cx_Oracle.SUBSCR_QOS_QUERY | cx_Oracle.SUBSCR_QOS_ROWIDS
    )

    # Register the query whose result set should be monitored
    cqn_sub.registerquery(
        "SELECT employee_id, name, salary FROM employees WHERE salary > 50000"
    )

    return cqn_sub

# Setup monitoring
cqn_subscription = setup_cqn_monitoring()

# Fetch the initial results (registration already happened in registerquery)
cursor = connection.cursor()
cursor.execute("SELECT employee_id, name, salary FROM employees WHERE salary > 50000")
initial_results = cursor.fetchall()
print(f"Initially found {len(initial_results)} high-salary employees")
cursor.close()
```

### Table-Level Change Monitoring

Monitor all changes to specific tables:

```python
def setup_table_monitoring(table_names):
    """Monitor changes to specific tables"""

    # Operation values are bit flags and may arrive combined
    operation_labels = {
        cx_Oracle.OPCODE_INSERT: "INSERT",
        cx_Oracle.OPCODE_UPDATE: "UPDATE",
        cx_Oracle.OPCODE_DELETE: "DELETE",
        cx_Oracle.OPCODE_ALTER: "ALTER",
        cx_Oracle.OPCODE_DROP: "DROP"
    }

    def table_change_callback(message):
        """Handle table change notifications"""
        print(f"Table changes detected in database: {message.dbname}")

        for table in message.tables or []:
            print(f"\nTable: {table.name}")

            names = [label for flag, label in operation_labels.items()
                     if table.operation & flag]
            operation_name = "|".join(names) or f"Unknown({table.operation})"

            print(f"Operation: {operation_name}")

            if table.rows:
                print(f"Affected rows: {len(table.rows)}")
                for row in table.rows[:5]:  # Show first 5 rows
                    print(f"  Row ID: {row.rowid}")

                if len(table.rows) > 5:
                    print(f"  ... and {len(table.rows) - 5} more rows")

    # Create subscription for table changes
    table_sub = connection.subscribe(
        callback=table_change_callback,
        operations=cx_Oracle.OPCODE_INSERT | cx_Oracle.OPCODE_UPDATE | cx_Oracle.OPCODE_DELETE,
        qos=cx_Oracle.SUBSCR_QOS_ROWIDS,
        timeout=3600
    )

    # Register each table by registering a query against it.
    # table_names must come from trusted input: they are interpolated into SQL.
    for table_name in table_names:
        try:
            table_sub.registerquery(f"SELECT * FROM {table_name}")
            print(f"Registered table: {table_name}")
        except cx_Oracle.DatabaseError as e:
            print(f"Could not register table {table_name}: {e}")

    return table_sub

# Monitor specific tables
monitored_tables = ["employees", "departments", "projects"]
table_subscription = setup_table_monitoring(monitored_tables)
```

### Subscription Grouping and Batching

Group notifications to reduce callback frequency:

```python
def setup_grouped_notifications():
    """Setup grouped notifications for better performance"""

    def batch_change_callback(message):
        """Handle batched change notifications"""
        print("Received batch of database changes")

        # Collect all tables in the batch
        all_changes = []
        for table in message.tables or []:
            all_changes.append({
                'table': table.name,
                'operation': table.operation,
                'row_count': len(table.rows) if table.rows else 0
            })

        # Process changes in batch
        print(f"Processing {len(all_changes)} table changes:")
        for change in all_changes:
            print(f"  {change['table']}: {change['row_count']} rows")

    # Create subscription with grouping
    grouped_sub = connection.subscribe(
        callback=batch_change_callback,
        operations=cx_Oracle.OPCODE_ALLOPS,
        qos=cx_Oracle.SUBSCR_QOS_RELIABLE,
        groupingClass=cx_Oracle.SUBSCR_GROUPING_CLASS_TIME,
        groupingValue=30,  # Batch changes for 30 seconds
        groupingType=cx_Oracle.SUBSCR_GROUPING_TYPE_SUMMARY
    )

    return grouped_sub

# Grouping constants
SUBSCR_GROUPING_CLASS_TIME: int = 1    # Time-based grouping
SUBSCR_GROUPING_TYPE_SUMMARY: int = 1  # Summary grouping
SUBSCR_GROUPING_TYPE_LAST: int = 2     # Last event only

grouped_subscription = setup_grouped_notifications()
```

## Notification Application Patterns

### Real-time Cache Invalidation

Use notifications to maintain cache consistency:

```python
class DatabaseCache:
    def __init__(self, connection):
        self.connection = connection
        self.cache = {}
        self.setup_invalidation()

    def setup_invalidation(self):
        """Setup cache invalidation on data changes"""
        def invalidate_callback(message):
            for table in message.tables or []:
                # Names may arrive schema-qualified and upper-case (e.g. HR.EMPLOYEES)
                table_name = table.name.split(".")[-1].lower()
                cache_key = f"table_{table_name}"
                if cache_key in self.cache:
                    print(f"Invalidating cache for table: {table_name}")
                    del self.cache[cache_key]

        self.subscription = self.connection.subscribe(
            callback=invalidate_callback,
            operations=cx_Oracle.OPCODE_ALLOPS,
            qos=cx_Oracle.SUBSCR_QOS_RELIABLE
        )

        # Register tables
        self.subscription.registerquery("SELECT * FROM employees")
        self.subscription.registerquery("SELECT * FROM departments")

    def get_employees(self):
        """Get employees with caching"""
        cache_key = "table_employees"

        if cache_key not in self.cache:
            cursor = self.connection.cursor()
            cursor.execute("SELECT employee_id, name FROM employees")
            self.cache[cache_key] = cursor.fetchall()
            cursor.close()
            print("Loaded employees into cache")

        return self.cache[cache_key]

    def cleanup(self):
        """Clean up subscription"""
        self.connection.unsubscribe(self.subscription)

# Usage
cache = DatabaseCache(connection)
employees = cache.get_employees()  # Loads from database
employees = cache.get_employees()  # Returns from cache

# Simulate data change (in another session)
# UPDATE employees SET name = 'Updated Name' WHERE employee_id = 1;
# COMMIT;

employees = cache.get_employees()  # Reloads from database after invalidation
cache.cleanup()
```
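
A notification callback may run on a different thread from the code that reads the cache, so shared state is worth guarding. The sketch below is one possible lock-protected variant and is not part of the library. The `InvalidatingCache` class and its methods are hypothetical names, and the `loader` argument stands in for whatever function runs the real query.

```python
import threading


class InvalidatingCache:
    """Dictionary cache whose reads and invalidations share one lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self._data = {}

    @staticmethod
    def _key(table_name):
        # Accept both "employees" and a schema-qualified "HR.EMPLOYEES".
        return table_name.split(".")[-1].lower()

    def get(self, table_name, loader):
        """Return the cached value, calling loader() once on a miss."""
        key = self._key(table_name)
        with self._lock:
            if key not in self._data:
                self._data[key] = loader()
            return self._data[key]

    def invalidate(self, table_name):
        """Drop the entry for a table; return True if something was removed."""
        with self._lock:
            return self._data.pop(self._key(table_name), None) is not None


load_count = []


def load_employees():
    # Stand-in for a real database query.
    load_count.append(1)
    return [(1, "Ada"), (2, "Grace")]


cache = InvalidatingCache()
cache.get("employees", load_employees)     # miss: loader runs
cache.get("employees", load_employees)     # hit: loader is skipped
removed = cache.invalidate("HR.EMPLOYEES")  # what a callback would call
cache.get("employees", load_employees)     # miss again after invalidation
print(len(load_count), removed)
```

Holding the lock while the loader runs keeps an invalidation from being overwritten by a stale load, at the cost of blocking the callback until the load finishes.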

### Event-Driven Processing

Use notifications to trigger application workflows:

```python
def setup_workflow_triggers():
    """Setup event-driven workflow processing"""

    def workflow_callback(message):
        """Process database changes as workflow events"""
        for table in message.tables or []:
            # Names may arrive schema-qualified (e.g. HR.ORDERS)
            table_name = table.name.split(".")[-1].upper()

            # Operation values are bit flags, so test with & rather than ==
            if table_name == "ORDERS":
                if table.operation & cx_Oracle.OPCODE_INSERT:
                    print("New order created - triggering fulfillment workflow")
                    # trigger_order_fulfillment()

                elif table.operation & cx_Oracle.OPCODE_UPDATE:
                    print("Order updated - checking status changes")
                    # check_order_status_changes()

            elif table_name == "EMPLOYEES":
                if table.operation & cx_Oracle.OPCODE_INSERT:
                    print("New employee added - triggering onboarding workflow")
                    # trigger_employee_onboarding()

    workflow_sub = connection.subscribe(
        callback=workflow_callback,
        operations=cx_Oracle.OPCODE_INSERT | cx_Oracle.OPCODE_UPDATE,
        qos=cx_Oracle.SUBSCR_QOS_RELIABLE
    )

    # Register workflow tables
    workflow_sub.registerquery("SELECT * FROM orders")
    workflow_sub.registerquery("SELECT * FROM employees")

    return workflow_sub

workflow_subscription = setup_workflow_triggers()
```
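
When a workflow step is slow, one common approach is to keep the callback short and hand the work to another thread. The sketch below illustrates that hand-off with the standard library only. The names `work_queue`, `enqueue_callback`, and `worker` are invented for this example, and the message is a stand-in object rather than a real notification.

```python
import queue
import threading
from types import SimpleNamespace

work_queue = queue.Queue()
_STOP = object()  # sentinel that tells the worker to exit


def enqueue_callback(message):
    """Do the minimum inside the callback: copy out plain data and return."""
    for table in message.tables or []:
        work_queue.put((table.name, table.operation))


def worker(handled):
    """Consume queued changes until the stop sentinel arrives."""
    while True:
        item = work_queue.get()
        if item is _STOP:
            break
        handled.append(item)  # a real worker would start the workflow here


handled = []
thread = threading.Thread(target=worker, args=(handled,))
thread.start()

# Simulate one notification with a stand-in message object.
fake_table = SimpleNamespace(name="HR.ORDERS", operation=2)
enqueue_callback(SimpleNamespace(tables=[fake_table]))

work_queue.put(_STOP)
thread.join()
print(handled)
```

Copying plain tuples onto the queue means the worker never depends on message objects after the callback has returned.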

## Error Handling and Troubleshooting

Handle notification-related errors:

```python
try:
    subscription = connection.subscribe(
        callback=change_callback,
        operations=cx_Oracle.OPCODE_ALLOPS
    )

except cx_Oracle.DatabaseError as e:
    error_obj, = e.args

    if error_obj.code == 29972:  # Insufficient privileges
        print("Insufficient privileges for change notification")
    elif error_obj.code == 29966:  # Subscription limit reached
        print("Maximum number of subscriptions reached")
    elif error_obj.code == 29970:  # Invalid callback
        print("Invalid callback function")
    else:
        print(f"Subscription error: {error_obj.message}")

def robust_callback(message):
    """Callback with error handling"""
    try:
        # Process notification
        for table in message.tables:
            print(f"Processing changes to {table.name}")

    except Exception as e:
        print(f"Error processing notification: {e}")
        # Log error but don't raise to avoid breaking subscription

# Use robust callback
subscription = connection.subscribe(
    callback=robust_callback,
    operations=cx_Oracle.OPCODE_ALLOPS
)
```

## Notification Best Practices

1. **Handle callback errors gracefully**: Don't let exceptions in callbacks break subscriptions
2. **Use appropriate QoS settings**: Choose between reliability and performance based on requirements
3. **Monitor subscription health**: Check `message.registered` in callbacks; a false value means the subscription is no longer registered
4. **Clean up subscriptions**: Always unsubscribe when done to free resources
5. **Batch related operations**: Use grouping to reduce callback frequency for high-volume changes
6. **Test with realistic data volumes**: Ensure callbacks can handle expected notification rates
7. **Use connection pooling carefully**: Subscriptions are tied to specific connections
8. **Consider security implications**: Notifications can reveal data access patterns
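
The clean-up practice above can be enforced with a context manager, so the subscription is removed even if the monitored block raises. This is a minimal sketch rather than a library feature. The `subscribed` helper and the `FakeConnection` class are hypothetical, and the fake only imitates the `subscribe` and `unsubscribe` methods described earlier so the pattern can run without a database.

```python
from contextlib import contextmanager


@contextmanager
def subscribed(connection, **subscribe_kwargs):
    """Create a subscription and always unsubscribe on exit."""
    subscription = connection.subscribe(**subscribe_kwargs)
    try:
        yield subscription
    finally:
        connection.unsubscribe(subscription)


class FakeConnection:
    """Stand-in that records calls instead of talking to a database."""

    def __init__(self):
        self.active = []

    def subscribe(self, **kwargs):
        subscription = object()
        self.active.append(subscription)
        return subscription

    def unsubscribe(self, subscription):
        self.active.remove(subscription)


connection = FakeConnection()
try:
    with subscribed(connection, callback=print) as sub:
        inside = len(connection.active)  # 1 while the block runs
        raise RuntimeError("simulated failure while monitoring")
except RuntimeError:
    pass

print(inside, len(connection.active))
```

With a real connection the same helper would be called with the keyword arguments normally passed to `connection.subscribe()`.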