# Event Subscriptions

Subscribe to database events, including object changes, query result changes, and Advanced Queuing (AQ) messages, to receive real-time notifications. Event subscriptions let an application react when specific database changes occur, enabling reactive programming patterns and real-time data synchronization.

## Capabilities

### Subscription Creation

Create subscriptions for various types of database events with flexible notification protocols.

```python { .api }
# Subscription creation through Connection.subscribe()
def subscribe(
    self,
    namespace=SUBSCR_NAMESPACE_DBCHANGE,
    protocol=SUBSCR_PROTO_CALLBACK,
    callback=None,
    timeout=0,
    operations=OPCODE_ALLOPS,
    port=0,
    qos=0,
    ip_address=None,
    grouping_class=SUBSCR_GROUPING_CLASS_NONE,
    grouping_value=0,
    grouping_type=SUBSCR_GROUPING_TYPE_SUMMARY,
    name=None,
    client_initiated=False,
    # NOTE(review): recipient_name does not appear in the documented
    # python-oracledb Connection.subscribe() signature - confirm before use.
    recipient_name=None
) -> Subscription:
    """
    Create a subscription for database event notifications.

    Parameters:
    - namespace (int): Subscription namespace (SUBSCR_NAMESPACE_DBCHANGE, SUBSCR_NAMESPACE_AQ)
    - protocol (int): Notification protocol (SUBSCR_PROTO_CALLBACK, SUBSCR_PROTO_HTTP, etc.)
    - callback (callable): Function called with a single Message argument for
      each notification
    - timeout (int): Seconds before the subscription is destroyed (0 keeps it
      until it is explicitly unsubscribed or the session ends)
    - operations (int): OPCODE_* flags selecting which operations are reported;
      only used with SUBSCR_NAMESPACE_DBCHANGE
    - port (int): Port on which the client listens for callback notifications
      from the database (0 lets the Oracle Client libraries choose one)
    - qos (int): Quality of service flags (SUBSCR_QOS_*), OR'ed together
    - ip_address (str): IP address to bind for callback notifications
    - grouping_class (int): Grouping class for notifications
      (SUBSCR_GROUPING_CLASS_TIME groups by time)
    - grouping_value (int): Grouping value (number of seconds when grouping by
      time)
    - grouping_type (int): Grouping type (SUBSCR_GROUPING_TYPE_SUMMARY or
      SUBSCR_GROUPING_TYPE_LAST)
    - name (str): Subscription name. For SUBSCR_NAMESPACE_AQ use
      "<QUEUE_NAME>" for a single-consumer queue or
      "<QUEUE_NAME>:<CONSUMER_NAME>" for a multi-consumer queue
    - client_initiated (bool): Have the client, rather than the database
      server, open the notification connection
    - recipient_name (str): Recipient name for notifications

    Returns:
    Subscription object
    """
```

### Message Classes

Classes representing the different types of database event messages.

```python { .api }
class Message:
    """Database event notification message passed to the subscription callback."""

    # Properties
    type: int  # Event type (EVENT_OBJCHANGE, EVENT_QUERYCHANGE, etc.)
    dbname: str  # Database name
    tables: list  # List of MessageTable objects
    queries: list  # List of MessageQuery objects
    consumer_name: str  # AQ consumer name
    queue_name: str  # AQ queue name
    msgid: bytes  # AQ message id that triggered the notification
    txid: bytes  # Id of the transaction that generated the notification
    registered: bool  # Whether the subscription is still registered
    subscription: object  # Subscription object that received the message


class MessageTable:
    """Table change notification details."""

    # Properties
    name: str  # Table name
    operation: int  # Operation type (OPCODE_INSERT, OPCODE_UPDATE, etc.)
    rows: list  # List of MessageRow objects


class MessageRow:
    """Row change notification details."""

    # Properties
    operation: int  # Operation type
    rowid: str  # Row identifier


class MessageQuery:
    """Query change notification details."""

    # Properties
    id: int  # Query ID
    operation: int  # Operation type
    # NOTE(review): queryctx is not listed among the documented MessageQuery
    # attributes (id, operation, tables) - confirm it exists.
    queryctx: object  # Query context
    tables: list  # List of affected MessageTable objects
```

### Subscription Constants

Constants for configuring subscription behavior and identifying event types.

```python { .api }
# Subscription Namespaces
SUBSCR_NAMESPACE_DBCHANGE: int  # Database change notifications
SUBSCR_NAMESPACE_AQ: int  # Advanced Queuing notifications

# Subscription Protocols
SUBSCR_PROTO_CALLBACK: int  # Python callback function
SUBSCR_PROTO_HTTP: int  # HTTP notifications
SUBSCR_PROTO_MAIL: int  # Email notifications
SUBSCR_PROTO_SERVER: int  # Server-to-server notifications

# Quality of Service (flags; combine with |)
SUBSCR_QOS_DEFAULT: int  # Default QoS
SUBSCR_QOS_RELIABLE: int  # Notifications are not lost if the database fails
SUBSCR_QOS_BEST_EFFORT: int  # With SUBSCR_QOS_QUERY: best-effort mode (false positives possible)
SUBSCR_QOS_DEREG_NFY: int  # Unregister automatically after the first notification
SUBSCR_QOS_ROWIDS: int  # Include row IDs
SUBSCR_QOS_QUERY: int  # Query (result set) change notification

# Grouping Classes
SUBSCR_GROUPING_CLASS_NONE: int  # No grouping
SUBSCR_GROUPING_CLASS_TIME: int  # Time-based grouping

# Grouping Types
SUBSCR_GROUPING_TYPE_SUMMARY: int  # Summary notifications
SUBSCR_GROUPING_TYPE_LAST: int  # Last notification only

# Event Types
EVENT_NONE: int  # No event
EVENT_STARTUP: int  # Database startup
EVENT_SHUTDOWN: int  # Database shutdown
EVENT_SHUTDOWN_ANY: int  # Any shutdown
EVENT_DEREG: int  # Deregistration
EVENT_OBJCHANGE: int  # Object change
EVENT_QUERYCHANGE: int  # Query result change
EVENT_AQ: int  # Advanced Queuing

# Operation Codes (flags; test with &)
OPCODE_ALLOPS: int  # All operations
OPCODE_ALLROWS: int  # Row information is not available (all rows may be affected)
OPCODE_INSERT: int  # Insert operations
OPCODE_UPDATE: int  # Update operations
OPCODE_DELETE: int  # Delete operations
OPCODE_ALTER: int  # Alter operations
OPCODE_DROP: int  # Drop operations
```

## Usage Examples

### Basic Database Change Notifications

```python
import oracledb
import time

# Subscriptions are only supported in python-oracledb Thick mode, so the
# Oracle Client libraries must be loaded before connecting.
oracledb.init_oracle_client()


def notification_callback(message):
    """Callback function for database change notifications.

    Prints the event type and database name, then every changed table and,
    when row IDs were requested, every changed row.
    """
    print(f"Received notification: Type={message.type}, DB={message.dbname}")

    for table in message.tables:
        print(f" Table: {table.name}, Operation: {table.operation}")
        for row in table.rows:
            print(f" Row: {row.rowid}, Operation: {row.operation}")


# Connect to database; events=True enables notification delivery in Thick mode
connection = oracledb.connect(
    user="hr",
    password="password",
    dsn="localhost/xepdb1",
    events=True,
)

# Create subscription for database changes
subscription = connection.subscribe(
    namespace=oracledb.SUBSCR_NAMESPACE_DBCHANGE,
    protocol=oracledb.SUBSCR_PROTO_CALLBACK,
    callback=notification_callback,
    timeout=300,  # 5 minutes
    operations=oracledb.OPCODE_ALLOPS,
    qos=oracledb.SUBSCR_QOS_ROWIDS,
)

print(f"Created subscription with ID: {subscription.id}")

# Register queries for change notification. Executing a query on a cursor does
# NOT register it; registration goes through Subscription.registerquery().

# Register interest in employees table changes
subscription.registerquery(
    "SELECT employee_id, first_name, last_name FROM employees WHERE department_id = 10"
)

# Register interest in departments table
subscription.registerquery("SELECT department_id, department_name FROM departments")

print("Subscriptions registered. Making changes to trigger notifications...")

# Make changes to trigger notifications (sent when the transaction commits)
with connection.cursor() as cursor:
    cursor.execute("""
        UPDATE employees SET salary = salary * 1.1 WHERE department_id = 10
    """)
    connection.commit()

    cursor.execute("""
        INSERT INTO employees (employee_id, first_name, last_name, department_id)
        VALUES (9999, 'Test', 'Employee', 10)
    """)
    connection.commit()

# Wait for notifications (the callback runs on a separate thread)
print("Waiting for notifications...")
time.sleep(10)

# Clean up
connection.unsubscribe(subscription)
connection.close()
```

### Query Change Notifications

```python
import oracledb
import threading
import time

# Continuous Query Notification is only supported in python-oracledb Thick
# mode, so the Oracle Client libraries must be loaded before connecting.
oracledb.init_oracle_client()

# Global flag to control notification processing
processing_notifications = True

# Set by the worker once its queries are registered (or it has failed), so the
# main thread does not have to guess with a fixed sleep.
subscription_ready = threading.Event()


def query_change_callback(message):
    """Handle query change notifications.

    Ignores notifications once processing has been switched off; otherwise
    prints the event, each affected query and each table within that query.
    """
    # The flag is only read here, so no `global` declaration is required.
    if not processing_notifications:
        return

    print("Query change notification received:")
    print(f" Event type: {message.type}")
    print(f" Database: {message.dbname}")

    for query in message.queries:
        print(f" Query ID: {query.id}")
        print(f" Operation: {query.operation}")

        for table in query.tables:
            print(f" Affected table: {table.name}")
            print(f" Table operation: {table.operation}")
            print(f" Affected rows: {len(table.rows)}")


def notification_thread():
    """Run in separate thread to handle notifications.

    Opens its own events-enabled connection, subscribes for query result
    changes, registers the monitored queries and then keeps the connection
    open until `processing_notifications` is cleared.
    """
    connection = oracledb.connect(
        user="hr",
        password="password",
        dsn="localhost/xepdb1",
        events=True,
    )

    try:
        # Create subscription for query changes
        subscription = connection.subscribe(
            namespace=oracledb.SUBSCR_NAMESPACE_DBCHANGE,
            protocol=oracledb.SUBSCR_PROTO_CALLBACK,
            callback=query_change_callback,
            timeout=0,  # No timeout
            qos=oracledb.SUBSCR_QOS_QUERY | oracledb.SUBSCR_QOS_RELIABLE,
        )

        print(f"Query subscription created: {subscription.id}")

        # Register queries to monitor. cursor.execute() does not register a
        # query; Subscription.registerquery() does, and with SUBSCR_QOS_QUERY
        # it returns the query id reported later in MessageQuery.id.

        # Monitor high-salary employees
        high_salary_query_id = subscription.registerquery("""
            SELECT employee_id, first_name, last_name, salary
            FROM employees
            WHERE salary > 50000
        """)

        # Monitor recent hires
        # NOTE(review): Oracle restricts which queries can be registered for
        # query-result notification; confirm that a SYSDATE predicate is
        # accepted in this mode.
        recent_hires_query_id = subscription.registerquery("""
            SELECT employee_id, first_name, hire_date
            FROM employees
            WHERE hire_date >= SYSDATE - 30
        """)

        print(f"Registered query IDs: {high_salary_query_id}, {recent_hires_query_id}")
        print("Query monitoring active. Waiting for changes...")
        subscription_ready.set()

        # Keep connection alive for notifications
        while processing_notifications:
            time.sleep(1)

    finally:
        # Unblock the main thread even if setup failed part-way.
        subscription_ready.set()
        connection.close()


# Start notification thread. The Thread object gets its own name so that it
# does not shadow the notification_thread() function.
worker = threading.Thread(target=notification_thread)
worker.daemon = True
worker.start()

# Wait until the subscription is initialized
subscription_ready.wait(timeout=10)

# Main thread: make changes to trigger notifications
main_connection = oracledb.connect(user="hr", password="password", dsn="localhost/xepdb1")

with main_connection.cursor() as cursor:
    print("Making changes to trigger query notifications...")

    # Change that affects high-salary query
    cursor.execute("""
        UPDATE employees SET salary = 55000
        WHERE employee_id = (SELECT MIN(employee_id) FROM employees WHERE salary < 50000)
    """)
    main_connection.commit()

    time.sleep(2)

    # Insert new employee (affects recent hires query)
    cursor.execute("""
        INSERT INTO employees (employee_id, first_name, last_name, hire_date, salary, department_id)
        VALUES (8888, 'Recent', 'Hire', SYSDATE, 45000, 10)
    """)
    main_connection.commit()

# Wait for notifications
print("Waiting for notifications...")
time.sleep(5)

# Clean up: stop the worker and give it time to close its connection
processing_notifications = False
worker.join(timeout=5)
main_connection.close()
```

### Advanced Queuing (AQ) Notifications

```python
import oracledb
import time

# Subscriptions are only supported in python-oracledb Thick mode, so the
# Oracle Client libraries must be loaded before connecting.
oracledb.init_oracle_client()

# Error codes treated as "object already exists" during setup. ORA-00955 is
# the generic code; DBMS_AQADM is expected to report ORA-24001 for an existing
# queue table and ORA-24006 for an existing queue.
# NOTE(review): confirm the AQ-specific codes against your database version.
ALREADY_EXISTS_ERRORS = ("ORA-00955", "ORA-24001", "ORA-24006")


def aq_notification_callback(message):
    """Handle Advanced Queuing notifications."""
    print("AQ Notification received:")
    print(f" Event type: {message.type}")
    print(f" Queue: {message.queue_name}")
    print(f" Consumer: {message.consumer_name}")
    print(f" Database: {message.dbname}")


def run_setup_step(cursor, plsql):
    """Execute one setup block, tolerating 'already exists' errors.

    Each step is guarded separately so that an existing queue table does not
    prevent the queue from being created or started.
    """
    try:
        cursor.execute(plsql)
    except oracledb.DatabaseError as e:
        if any(code in str(e) for code in ALREADY_EXISTS_ERRORS):
            print("Queue infrastructure already exists")
        else:
            raise


# Connect to database; events=True enables notification delivery in Thick mode
connection = oracledb.connect(
    user="hr",
    password="password",
    dsn="localhost/xepdb1",
    events=True,
)

# Set up Advanced Queuing infrastructure
with connection.cursor() as cursor:
    # Create queue table
    run_setup_step(cursor, """
        BEGIN
            DBMS_AQADM.CREATE_QUEUE_TABLE(
                queue_table => 'my_queue_table',
                queue_payload_type => 'SYS.AQ$_JMS_TEXT_MESSAGE'
            );
        END;
    """)

    # Create queue
    run_setup_step(cursor, """
        BEGIN
            DBMS_AQADM.CREATE_QUEUE(
                queue_name => 'my_notification_queue',
                queue_table => 'my_queue_table'
            );
        END;
    """)

    # Start queue
    run_setup_step(cursor, """
        BEGIN
            DBMS_AQADM.START_QUEUE('my_notification_queue');
        END;
    """)

    connection.commit()

# Create AQ subscription
subscription = connection.subscribe(
    namespace=oracledb.SUBSCR_NAMESPACE_AQ,
    protocol=oracledb.SUBSCR_PROTO_CALLBACK,
    callback=aq_notification_callback,
    name="my_notification_queue",
    timeout=300,
)

print(f"AQ subscription created: {subscription.id}")

# Enqueue messages to trigger notifications
with connection.cursor() as cursor:
    # Enqueue a message
    cursor.execute("""
        DECLARE
            enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
            message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
            message_handle RAW(16);
            message SYS.AQ$_JMS_TEXT_MESSAGE;
        BEGIN
            message := SYS.AQ$_JMS_TEXT_MESSAGE.construct;
            message.set_text('Hello from AQ notification test!');

            DBMS_AQ.ENQUEUE(
                queue_name => 'my_notification_queue',
                enqueue_options => enqueue_options,
                message_properties => message_properties,
                payload => message,
                msgid => message_handle
            );
        END;
    """)

    # The message becomes visible, and the notification is sent, on commit
    connection.commit()

print("Message enqueued. Waiting for notification...")
time.sleep(5)

connection.unsubscribe(subscription)
connection.close()
```

### Subscription Management

```python
import re
import time

import oracledb

# Subscriptions are only supported in python-oracledb Thick mode, so the
# Oracle Client libraries must be loaded before connecting.
oracledb.init_oracle_client()

# Table names cannot be bound as SQL parameters, so they are validated before
# being interpolated: a plain identifier, optionally schema-qualified.
_TABLE_NAME = re.compile(r"^[A-Za-z][A-Za-z0-9_$#]*(\.[A-Za-z][A-Za-z0-9_$#]*)?$")


class SubscriptionManager:
    """Manage multiple database subscriptions."""

    def __init__(self, connection):
        """Wrap an events-enabled connection; no subscriptions exist yet."""
        self.connection = connection
        self.subscriptions = {}
        self.active = True

    def create_table_subscription(self, table_name, callback):
        """Create subscription for specific table changes.

        Parameters:
        - table_name (str): Table to watch, optionally schema-qualified
        - callback (callable): Called as callback(message, table) for every
          notified table that matches table_name

        Returns:
        The subscription ID

        Raises:
        ValueError if table_name is not a plain (optionally qualified)
        identifier
        """
        if not _TABLE_NAME.match(table_name):
            raise ValueError(f"Invalid table name: {table_name!r}")

        wanted = table_name.upper()

        def table_callback(message):
            if not self.active:
                return
            # Filter messages for specific table. Notifications are expected
            # to carry schema-qualified names (e.g. HR.EMPLOYEES), so an
            # unqualified request is matched against the final component.
            for table in message.tables:
                notified = table.name.upper()
                if notified == wanted or notified.endswith("." + wanted):
                    callback(message, table)

        subscription = self.connection.subscribe(
            namespace=oracledb.SUBSCR_NAMESPACE_DBCHANGE,
            protocol=oracledb.SUBSCR_PROTO_CALLBACK,
            callback=table_callback,
            timeout=0,
            operations=oracledb.OPCODE_ALLOPS,
            qos=oracledb.SUBSCR_QOS_ROWIDS | oracledb.SUBSCR_QOS_RELIABLE,
        )

        # Register interest in table. Executing the query on a cursor would
        # not register it; Subscription.registerquery() does.
        subscription.registerquery(f"SELECT * FROM {table_name} WHERE ROWNUM <= 1")

        self.subscriptions[table_name] = subscription
        return subscription.id

    def create_query_subscription(self, query, callback):
        """Create subscription for query result changes.

        Parameters:
        - query (str): SELECT statement whose result set is monitored
        - callback (callable): Called as callback(message) for each
          notification

        Returns:
        The subscription ID
        """

        def query_callback(message):
            # Drop notifications that arrive after cleanup() was called.
            if self.active:
                callback(message)

        subscription = self.connection.subscribe(
            namespace=oracledb.SUBSCR_NAMESPACE_DBCHANGE,
            protocol=oracledb.SUBSCR_PROTO_CALLBACK,
            callback=query_callback,
            timeout=0,
            qos=oracledb.SUBSCR_QOS_QUERY | oracledb.SUBSCR_QOS_RELIABLE,
        )

        # Register query
        subscription.registerquery(query)

        query_id = f"query_{subscription.id}"
        self.subscriptions[query_id] = subscription
        return subscription.id

    def cleanup(self):
        """Clean up all subscriptions.

        Stops callback dispatch and explicitly unsubscribes every tracked
        subscription; a failure on one does not stop the others.
        """
        self.active = False
        for key, subscription in list(self.subscriptions.items()):
            try:
                self.connection.unsubscribe(subscription)
            except oracledb.Error as e:
                print(f"Failed to unsubscribe {key}: {e}")
        self.subscriptions.clear()


# Usage example
def employee_change_handler(message, table):
    """Handle employee table changes."""
    print(f"Employee table changed: {table.operation}")
    print(f" Affected rows: {len(table.rows)}")


def high_salary_query_handler(message):
    """Handle high salary query changes."""
    print("High salary employees query results changed")
    for query in message.queries:
        print(f" Query {query.id} affected {len(query.tables)} tables")


# Set up subscription manager; events=True enables notification delivery
connection = oracledb.connect(
    user="hr",
    password="password",
    dsn="localhost/xepdb1",
    events=True,
)
manager = SubscriptionManager(connection)

# Create subscriptions
emp_sub_id = manager.create_table_subscription("employees", employee_change_handler)
query_sub_id = manager.create_query_subscription(
    "SELECT * FROM employees WHERE salary > 75000",
    high_salary_query_handler,
)

print(f"Created subscriptions: Employee table={emp_sub_id}, High salary query={query_sub_id}")

# Make changes to trigger notifications
with connection.cursor() as cursor:
    cursor.execute("UPDATE employees SET salary = 80000 WHERE employee_id = 100")
    connection.commit()

    cursor.execute("INSERT INTO employees (employee_id, first_name, last_name, salary, department_id) VALUES (7777, 'High', 'Earner', 85000, 10)")
    connection.commit()

print("Changes made. Waiting for notifications...")
time.sleep(5)

# Cleanup
manager.cleanup()
connection.close()
```

### Subscription Error Handling

```python
import oracledb
import time

# Subscriptions are only supported in python-oracledb Thick mode, so the
# Oracle Client libraries must be loaded before connecting.
oracledb.init_oracle_client()

# Operations this example knows how to process. Operation values are bit
# flags and may be combined, so they are tested with & rather than ==.
ROW_OPERATIONS = (
    oracledb.OPCODE_INSERT | oracledb.OPCODE_UPDATE | oracledb.OPCODE_DELETE
)


def robust_notification_callback(message):
    """Notification callback with error handling.

    Prints each notified table, up to ten of its rows, and each notified
    query. Any exception is reported and swallowed so that a failure here
    cannot propagate into the notification machinery.
    """
    try:
        print(f"Processing notification: {message.type}")

        # Process tables
        for table in message.tables or []:
            print(f" Table: {table.name}")

            # Validate table operations
            if table.operation & ROW_OPERATIONS:
                print(f" Valid operation: {table.operation}")

                # Process rows safely
                for row in table.rows[:10]:  # Limit processing to avoid overload
                    if hasattr(row, 'rowid') and row.rowid:
                        print(f" Row: {row.rowid}")
            else:
                print(f" Unexpected operation: {table.operation}")

        # Process queries
        for query in message.queries or []:
            print(f" Query ID: {query.id}")

    except Exception as e:
        print(f"Error in notification callback: {e}")
        # Log error but don't raise to avoid breaking notification system


def create_resilient_subscription(connection, max_retries=3):
    """Create subscription with retry logic.

    Parameters:
    - connection: Events-enabled connection to subscribe on
    - max_retries (int): Total number of attempts; must be at least 1

    Returns:
    Subscription object

    Raises:
    ValueError if max_retries is less than 1; oracledb.DatabaseError from the
    final attempt if every attempt fails
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        try:
            subscription = connection.subscribe(
                namespace=oracledb.SUBSCR_NAMESPACE_DBCHANGE,
                protocol=oracledb.SUBSCR_PROTO_CALLBACK,
                callback=robust_notification_callback,
                timeout=300,
                operations=oracledb.OPCODE_ALLOPS,
                # ROWIDS is required for table.rows to be populated, which the
                # callback above relies on.
                qos=oracledb.SUBSCR_QOS_RELIABLE | oracledb.SUBSCR_QOS_ROWIDS,
            )

            print(f"Subscription created successfully on attempt {attempt + 1}")
            return subscription

        except oracledb.DatabaseError as e:
            print(f"Subscription attempt {attempt + 1} failed: {e}")
            if attempt == max_retries - 1:
                raise
            time.sleep(2)  # Wait before retry


# Create resilient subscription; events=True enables notification delivery
connection = oracledb.connect(
    user="hr",
    password="password",
    dsn="localhost/xepdb1",
    events=True,
)

try:
    subscription = create_resilient_subscription(connection)

    # Register queries with error handling. Registration goes through
    # Subscription.registerquery(); cursor.execute() would not register it.
    try:
        subscription.registerquery("SELECT * FROM employees WHERE department_id <= 50")
        print("Query registered successfully")
    except oracledb.DatabaseError as e:
        print(f"Query registration failed: {e}")

    # Test notifications
    with connection.cursor() as cursor:
        cursor.execute("UPDATE employees SET salary = salary + 1 WHERE ROWNUM <= 1")
        connection.commit()

    print("Waiting for notifications...")
    time.sleep(5)

except oracledb.DatabaseError as e:
    print(f"Subscription setup failed: {e}")

finally:
    connection.close()
```