# Database Change Notifications

Continuous Query Notification (CQN) and Database Change Notification for real-time monitoring of database changes with callback-based event handling.

## Capabilities

### Subscription Management

Create and manage subscriptions for database change notifications.

```python { .api }
class Connection:
    def subscribe(self, namespace=SUBSCR_NAMESPACE_DBCHANGE,
                  protocol=SUBSCR_PROTO_OCI, callback=None, timeout=0,
                  operations=OPCODE_ALLOPS, port=0, qos=0, ipAddress=None,
                  groupingClass=0, groupingValue=0,
                  groupingType=SUBSCR_GROUPING_TYPE_SUMMARY,
                  name=None) -> Subscription:
        """
        Create subscription for database change notifications.

        Queries to monitor are registered afterwards with
        Subscription.registerquery(); subscribe() itself takes no SQL.

        Parameters:
        - namespace (int): Notification namespace
        - protocol (int): Notification protocol
        - callback: Function to call when changes occur
        - timeout (int): Subscription timeout in seconds (0 = no timeout)
        - operations (int): Operations to monitor (OPCODE_* constants)
        - port (int): Port for notifications (0 = auto-assign)
        - qos (int): Quality of service flags
        - ipAddress (str): IP address for notifications
        - groupingClass (int): Grouping class for batching
        - groupingValue (int): Grouping value
        - groupingType (int): Grouping type
        - name (str): Subscription name

        Returns:
        Subscription object
        """

    def unsubscribe(self, subscription: Subscription) -> None:
        """
        Remove database change notification subscription.

        Parameters:
        - subscription: Subscription object to remove
        """
```

Usage examples:

```python
import time

import cx_Oracle

# `connection` is assumed to be an open cx_Oracle connection that was
# created with events=True, which subscriptions require.

def change_callback(message):
    """Callback function for handling database changes"""
    print("Database change notification received!")
    print(f"Event type: {message.type}")
    print(f"Database: {message.dbname}")

    if message.tables:
        for table in message.tables:
            print(f"Table changed: {table.name}")
            print(f"Operation: {table.operation}")

            if table.rows:
                for row in table.rows:
                    print(f"  Row ID: {row.rowid}")
                    print(f"  Operation: {row.operation}")

# Create basic subscription
subscription = connection.subscribe(
    callback=change_callback,
    operations=cx_Oracle.OPCODE_INSERT | cx_Oracle.OPCODE_UPDATE | cx_Oracle.OPCODE_DELETE,
    timeout=3600  # 1 hour timeout
)

print(f"Subscription ID: {subscription.id}")
print("Monitoring database changes...")

# Register the query whose tables should be monitored.
# Executing a query on a cursor does not register it.
subscription.registerquery("SELECT * FROM employees")

# Keep subscription active
time.sleep(60)  # Monitor for 1 minute

# Clean up
connection.unsubscribe(subscription)
```

### Subscription Properties and Configuration

Access subscription properties and configuration options.

```python { .api }
class Subscription:
    @property
    def callback(self):
        """Callback function for change notifications"""

    @property
    def connection(self) -> Connection:
        """Associated connection object"""

    @property
    def name(self) -> str:
        """Subscription name"""

    @property
    def namespace(self) -> int:
        """Notification namespace"""

    @property
    def operations(self) -> int:
        """Operations being monitored"""

    @property
    def port(self) -> int:
        """Notification port"""

    @property
    def protocol(self) -> int:
        """Notification protocol"""

    @property
    def qos(self) -> int:
        """Quality of service flags"""

    @property
    def timeout(self) -> int:
        """Subscription timeout in seconds"""

    @property
    def id(self) -> int:
        """Subscription ID"""

    def registerquery(self, statement: str, args=None):
        """
        Register a query so that changes to the tables it references
        (or, with SUBSCR_QOS_QUERY, to its result set) raise notifications.
        Returns the query ID when SUBSCR_QOS_QUERY is set, otherwise None.
        """
```

### Namespace Constants

Define notification namespaces for different types of events.

```python { .api }
SUBSCR_NAMESPACE_DBCHANGE: int  # Database change notifications
SUBSCR_NAMESPACE_AQ: int        # Advanced Queueing notifications
```

### Protocol Constants

Control how notifications are delivered.

```python { .api }
SUBSCR_PROTO_OCI: int     # OCI callback protocol (default)
SUBSCR_PROTO_MAIL: int    # Email notifications
SUBSCR_PROTO_HTTP: int    # HTTP notifications
SUBSCR_PROTO_SERVER: int  # PL/SQL server-side notifications
```

### Quality of Service Flags

Configure notification behavior and reliability.

```python { .api }
SUBSCR_QOS_RELIABLE: int     # Reliable notification delivery
SUBSCR_QOS_DEREG_NFY: int    # Notify when subscription is deregistered
SUBSCR_QOS_ROWIDS: int       # Include row IDs in notifications
SUBSCR_QOS_QUERY: int        # Enable continuous query notification
SUBSCR_QOS_BEST_EFFORT: int  # Best effort delivery (may lose notifications)
```

Usage examples:

```python
# Create subscription with quality of service options
reliable_subscription = connection.subscribe(
    callback=change_callback,
    qos=cx_Oracle.SUBSCR_QOS_RELIABLE | cx_Oracle.SUBSCR_QOS_ROWIDS,
    timeout=7200  # 2 hours
)

# Create CQN subscription for specific query
cqn_subscription = connection.subscribe(
    callback=change_callback,
    qos=cx_Oracle.SUBSCR_QOS_QUERY | cx_Oracle.SUBSCR_QOS_ROWIDS
)

# Register the query to monitor; with SUBSCR_QOS_QUERY the query ID is returned
query_id = cqn_subscription.registerquery(
    "SELECT employee_id, name FROM employees WHERE department = 'IT'"
)
```
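
The QoS constants are bit flags, so they combine with `|` and can be tested with `&`. The sketch below illustrates this without a database. The `Qos` enum, its numeric values, and the `describe_qos` helper are all assumptions made for illustration; real code should use the `cx_Oracle.SUBSCR_QOS_*` constants.

```python
from enum import IntFlag


class Qos(IntFlag):
    """Stand-in flags; the values are illustrative, not read from cx_Oracle."""
    RELIABLE = 0x01
    DEREG_NFY = 0x02
    ROWIDS = 0x04
    QUERY = 0x08
    BEST_EFFORT = 0x10


def describe_qos(qos):
    """Return the names of the flags that are set in a qos bit mask."""
    return [flag.name for flag in Qos if qos & flag]


# Combine two flags, as the CQN subscription above does
cqn_qos = Qos.QUERY | Qos.ROWIDS
print(describe_qos(cqn_qos))
```

The same pattern applies to `Subscription.qos`, for example to check whether a subscription was created with row IDs enabled.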

## Change Notification Events

### Event Types

Different types of database events that can trigger notifications.

```python { .api }
EVENT_NONE: int          # No event
EVENT_STARTUP: int       # Database startup
EVENT_SHUTDOWN: int      # Database shutdown
EVENT_SHUTDOWN_ANY: int  # Any shutdown event
EVENT_DEREG: int         # Subscription deregistration
EVENT_OBJCHANGE: int     # Object change (table/view)
EVENT_QUERYCHANGE: int   # Query result change (CQN)
EVENT_AQ: int            # Advanced Queueing event
```
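
A callback usually branches on `message.type` before touching `tables` or `queries`. The following is a minimal sketch of that dispatch, using stand-in event codes and a fake message so it runs without a database. `classify_event` and the string codes are hypothetical; real code would compare against `cx_Oracle.EVENT_*`.

```python
from types import SimpleNamespace

# Stand-in event codes (assumptions for illustration only);
# real code should compare against cx_Oracle.EVENT_* instead.
EVENT_DEREG = "dereg"
EVENT_OBJCHANGE = "objchange"
EVENT_QUERYCHANGE = "querychange"


def classify_event(message):
    """Map a notification message to a coarse action label."""
    if message.type == EVENT_DEREG:
        return "resubscribe"
    if message.type == EVENT_OBJCHANGE:
        return "table-change"
    if message.type == EVENT_QUERYCHANGE:
        return "query-change"
    return "ignore"


# A fake message object standing in for what the driver would pass
fake_message = SimpleNamespace(type=EVENT_QUERYCHANGE)
print(classify_event(fake_message))
```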

### Operation Codes

Specific database operations that triggered the change.

```python { .api }
OPCODE_ALLOPS: int   # All operations
OPCODE_ALLROWS: int  # All rows affected
OPCODE_INSERT: int   # Insert operation
OPCODE_UPDATE: int   # Update operation
OPCODE_DELETE: int   # Delete operation
OPCODE_ALTER: int    # DDL alter operation
OPCODE_DROP: int     # DDL drop operation
```
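
An operation value reported in a notification may combine several of these codes in one bit mask, so testing individual bits is safer than comparing for equality. Here is a small sketch of such a decoder. The `OPCODES` table uses assumed stand-in bit values and `decode_operation` is a hypothetical helper; real code should build the table from `cx_Oracle.OPCODE_*`.

```python
# Stand-in bit values, assumed for illustration only;
# use the cx_Oracle.OPCODE_* constants in real code.
OPCODES = {
    "ALLROWS": 0x01,
    "INSERT": 0x02,
    "UPDATE": 0x04,
    "DELETE": 0x08,
    "ALTER": 0x10,
    "DROP": 0x20,
}


def decode_operation(operation):
    """Return the names of all operation flags set in the bit mask."""
    names = [name for name, bit in OPCODES.items() if operation & bit]
    return names or ["NONE"]


# An insert reported together with the "all rows" flag
print(decode_operation(OPCODES["INSERT"] | OPCODES["ALLROWS"]))
```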

### Message Structure

Objects passed to callback functions containing event details.

```python { .api }
class Message:
    @property
    def type(self) -> int:
        """Event type (EVENT_* constants)"""

    @property
    def dbname(self) -> str:
        """Database name"""

    @property
    def tables(self) -> list:
        """List of affected tables (MessageTable objects)"""

    @property
    def queries(self) -> list:
        """List of affected queries (MessageQuery objects)"""

    @property
    def queueName(self) -> str:
        """Queue name (for AQ events)"""

    @property
    def consumerName(self) -> str:
        """Consumer name (for AQ events)"""

    @property
    def registered(self) -> bool:
        """Whether subscription is still registered"""

class MessageTable:
    @property
    def name(self) -> str:
        """Table name"""

    @property
    def operation(self) -> int:
        """Operation type (OPCODE_* constants)"""

    @property
    def rows(self) -> list:
        """List of affected rows (MessageRow objects)"""

class MessageRow:
    @property
    def rowid(self) -> str:
        """Row ID of affected row"""

    @property
    def operation(self) -> int:
        """Operation type for this row"""

class MessageQuery:
    @property
    def id(self) -> int:
        """Query ID"""

    @property
    def operation(self) -> int:
        """Operation type"""

    @property
    def tables(self) -> list:
        """List of tables involved in query"""
```
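
To make the nesting concrete, the sketch below walks a message from tables down to rows and returns a per-table row count. It uses `SimpleNamespace` objects as stand-ins for `Message`, `MessageTable`, and `MessageRow`, so it runs without a database. `summarize_message` and the sample names are hypothetical, and the guards assume that `tables` or `rows` can be empty or `None`.

```python
from types import SimpleNamespace


def summarize_message(message):
    """Return {table name: number of reported rows} for a notification."""
    summary = {}
    for table in message.tables or []:
        # rows can be missing when row IDs were not requested
        summary[table.name] = len(table.rows or [])
    return summary


# Fake message mirroring the Message -> MessageTable -> MessageRow nesting
fake_message = SimpleNamespace(
    tables=[
        SimpleNamespace(
            name="HR.EMPLOYEES",
            rows=[SimpleNamespace(rowid="AAA1"), SimpleNamespace(rowid="AAA2")],
        ),
        SimpleNamespace(name="HR.DEPARTMENTS", rows=None),
    ]
)
print(summarize_message(fake_message))
```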

## Advanced Notification Patterns

### Continuous Query Notification (CQN)

Monitor specific query results for changes:

```python
def setup_cqn_monitoring():
    """Setup continuous query notification for specific data"""

    def query_change_callback(message):
        """Handle query result changes"""
        print("Query results changed!")

        if message.queries:
            for query in message.queries:
                print(f"Query {query.id} changed")
                print(f"Operation: {query.operation}")

        # Re-execute query to get updated results
        cursor = connection.cursor()
        cursor.execute("SELECT * FROM employees WHERE salary > 50000")
        updated_results = cursor.fetchall()
        print(f"Updated query returned {len(updated_results)} rows")
        cursor.close()

    # Create CQN subscription for high-salary employees
    cqn_sub = connection.subscribe(
        callback=query_change_callback,
        qos=cx_Oracle.SUBSCR_QOS_QUERY | cx_Oracle.SUBSCR_QOS_ROWIDS
    )

    # Register the monitored query with the subscription
    cqn_sub.registerquery(
        "SELECT employee_id, name, salary FROM employees WHERE salary > 50000"
    )

    return cqn_sub

# Setup monitoring
cqn_subscription = setup_cqn_monitoring()

# Execute the monitored query to load the initial results
cursor = connection.cursor()
cursor.execute("SELECT employee_id, name, salary FROM employees WHERE salary > 50000")
initial_results = cursor.fetchall()
print(f"Initially found {len(initial_results)} high-salary employees")
cursor.close()
```

### Table-Level Change Monitoring

Monitor all changes to specific tables:

```python
def setup_table_monitoring(table_names):
    """Monitor changes to specific tables"""

    def table_change_callback(message):
        """Handle table change notifications"""
        print(f"Table changes detected in database: {message.dbname}")

        for table in message.tables:
            print(f"\nTable: {table.name}")

            # table.operation may combine several OPCODE_* flags,
            # so test each bit instead of comparing for equality
            operation_flags = {
                cx_Oracle.OPCODE_INSERT: "INSERT",
                cx_Oracle.OPCODE_UPDATE: "UPDATE",
                cx_Oracle.OPCODE_DELETE: "DELETE",
                cx_Oracle.OPCODE_ALTER: "ALTER",
                cx_Oracle.OPCODE_DROP: "DROP"
            }
            operation_name = ", ".join(
                name for flag, name in operation_flags.items()
                if table.operation & flag
            ) or f"Unknown({table.operation})"

            print(f"Operation: {operation_name}")

            if table.rows:
                print(f"Affected rows: {len(table.rows)}")
                for row in table.rows[:5]:  # Show first 5 rows
                    print(f"  Row ID: {row.rowid}")

                if len(table.rows) > 5:
                    print(f"  ... and {len(table.rows) - 5} more rows")

    # Create subscription for table changes
    table_sub = connection.subscribe(
        callback=table_change_callback,
        operations=cx_Oracle.OPCODE_INSERT | cx_Oracle.OPCODE_UPDATE | cx_Oracle.OPCODE_DELETE,
        qos=cx_Oracle.SUBSCR_QOS_ROWIDS,
        timeout=3600
    )

    # Register each table by registering a query against it
    for table_name in table_names:
        try:
            table_sub.registerquery(f"SELECT * FROM {table_name}")
            print(f"Registered table: {table_name}")
        except cx_Oracle.DatabaseError as e:
            print(f"Could not register table {table_name}: {e}")

    return table_sub

# Monitor specific tables
monitored_tables = ["employees", "departments", "projects"]
table_subscription = setup_table_monitoring(monitored_tables)
```
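
The registration loop above builds SQL text from table names, which is only safe when the names are trusted. One possible safeguard, sketched here, is to validate each name before it reaches the SQL string. `registration_query` is a hypothetical helper, and the pattern assumes plain unquoted Oracle identifiers with an optional schema prefix.

```python
import re

# Unquoted identifier: a letter followed by letters, digits, _, $ or #
_IDENTIFIER = re.compile(r"^[A-Za-z][A-Za-z0-9_$#]*$")


def registration_query(table_name):
    """Build the registration query, rejecting anything but [schema.]table."""
    parts = table_name.split(".")
    if not 1 <= len(parts) <= 2 or not all(_IDENTIFIER.match(p) for p in parts):
        raise ValueError(f"Unsafe table name: {table_name!r}")
    return f"SELECT * FROM {table_name}"


print(registration_query("hr.employees"))
```

Quoted identifiers and database links are deliberately rejected by this check; a stricter or looser rule may suit other schemas.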

### Subscription Grouping and Batching

Group notifications to reduce callback frequency:

```python
def setup_grouped_notifications():
    """Setup grouped notifications for better performance"""

    def batch_change_callback(message):
        """Handle batched change notifications"""
        print("Received batch of database changes")

        # Process all tables in batch
        all_changes = []
        for table in message.tables:
            all_changes.append({
                'table': table.name,
                'operation': table.operation,
                'row_count': len(table.rows) if table.rows else 0
            })

        # Process changes in batch
        print(f"Processing {len(all_changes)} table changes:")
        for change in all_changes:
            print(f"  {change['table']}: {change['row_count']} rows")

    # Create subscription with grouping
    grouped_sub = connection.subscribe(
        callback=batch_change_callback,
        operations=cx_Oracle.OPCODE_ALLOPS,
        qos=cx_Oracle.SUBSCR_QOS_RELIABLE,
        groupingClass=cx_Oracle.SUBSCR_GROUPING_CLASS_TIME,
        groupingValue=30,  # Batch changes for 30 seconds
        groupingType=cx_Oracle.SUBSCR_GROUPING_TYPE_SUMMARY
    )

    return grouped_sub

# Grouping constants provided by cx_Oracle:
#   SUBSCR_GROUPING_CLASS_TIME    Time-based grouping
#   SUBSCR_GROUPING_TYPE_SUMMARY  Summary grouping
#   SUBSCR_GROUPING_TYPE_LAST     Last event only

grouped_subscription = setup_grouped_notifications()
```

## Notification Application Patterns

### Real-time Cache Invalidation

Use notifications to maintain cache consistency:

```python
class DatabaseCache:
    def __init__(self, connection):
        self.connection = connection
        self.cache = {}
        self.setup_invalidation()

    def setup_invalidation(self):
        """Setup cache invalidation on data changes"""
        def invalidate_callback(message):
            for table in message.tables:
                # Table names may arrive schema-qualified and in upper case
                # (e.g. "HR.EMPLOYEES"), so normalise before building the key
                table_name = table.name.split(".")[-1].lower()
                cache_key = f"table_{table_name}"
                if cache_key in self.cache:
                    print(f"Invalidating cache for table: {table.name}")
                    del self.cache[cache_key]

        self.subscription = self.connection.subscribe(
            callback=invalidate_callback,
            operations=cx_Oracle.OPCODE_ALLOPS,
            qos=cx_Oracle.SUBSCR_QOS_RELIABLE
        )

        # Register tables
        self.subscription.registerquery("SELECT * FROM employees")
        self.subscription.registerquery("SELECT * FROM departments")

    def get_employees(self):
        """Get employees with caching"""
        cache_key = "table_employees"

        if cache_key not in self.cache:
            cursor = self.connection.cursor()
            cursor.execute("SELECT employee_id, name FROM employees")
            self.cache[cache_key] = cursor.fetchall()
            cursor.close()
            print("Loaded employees into cache")

        return self.cache[cache_key]

    def cleanup(self):
        """Clean up subscription"""
        self.connection.unsubscribe(self.subscription)

# Usage
cache = DatabaseCache(connection)
employees = cache.get_employees()  # Loads from database
employees = cache.get_employees()  # Returns from cache

# Simulate data change (in another session)
# UPDATE employees SET name = 'Updated Name' WHERE employee_id = 1;
# COMMIT;

employees = cache.get_employees()  # Reloads from database after invalidation
cache.cleanup()
```

### Event-Driven Processing

Use notifications to trigger application workflows:

```python
def setup_workflow_triggers():
    """Setup event-driven workflow processing"""

    def workflow_callback(message):
        """Process database changes as workflow events"""
        for table in message.tables:
            # Names may be schema-qualified (e.g. "HR.ORDERS"),
            # so compare only the table part
            table_name = table.name.upper().split(".")[-1]

            if table_name == "ORDERS":
                if table.operation & cx_Oracle.OPCODE_INSERT:
                    print("New order created - triggering fulfillment workflow")
                    # trigger_order_fulfillment()

                elif table.operation & cx_Oracle.OPCODE_UPDATE:
                    print("Order updated - checking status changes")
                    # check_order_status_changes()

            elif table_name == "EMPLOYEES":
                if table.operation & cx_Oracle.OPCODE_INSERT:
                    print("New employee added - triggering onboarding workflow")
                    # trigger_employee_onboarding()

    workflow_sub = connection.subscribe(
        callback=workflow_callback,
        operations=cx_Oracle.OPCODE_INSERT | cx_Oracle.OPCODE_UPDATE,
        qos=cx_Oracle.SUBSCR_QOS_RELIABLE
    )

    # Register workflow tables
    workflow_sub.registerquery("SELECT * FROM orders")
    workflow_sub.registerquery("SELECT * FROM employees")

    return workflow_sub

workflow_subscription = setup_workflow_triggers()
```

## Error Handling and Troubleshooting

Handle notification-related errors:

```python
try:
    subscription = connection.subscribe(
        callback=change_callback,
        operations=cx_Oracle.OPCODE_ALLOPS
    )

except cx_Oracle.DatabaseError as e:
    error_obj, = e.args

    if error_obj.code == 29972:  # Insufficient privileges
        print("Insufficient privileges for change notification")
    elif error_obj.code == 29966:  # Subscription limit reached
        print("Maximum number of subscriptions reached")
    elif error_obj.code == 29970:  # Invalid callback
        print("Invalid callback function")
    else:
        print(f"Subscription error: {error_obj.message}")

def robust_callback(message):
    """Callback with error handling"""
    try:
        # Process notification
        for table in message.tables:
            print(f"Processing changes to {table.name}")

    except Exception as e:
        print(f"Error processing notification: {e}")
        # Log error but don't raise to avoid breaking subscription

# Use robust callback
subscription = connection.subscribe(
    callback=robust_callback,
    operations=cx_Oracle.OPCODE_ALLOPS
)
```
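
Cleanup can also fail to run when an exception interrupts the code between `subscribe()` and `unsubscribe()`. One way to guard against that, sketched below, is a context manager that always unsubscribes on exit. `subscription_scope` is a hypothetical helper, and `FakeConnection` is a stand-in that exists only so the sketch runs without a database.

```python
from contextlib import contextmanager


@contextmanager
def subscription_scope(connection, **subscribe_kwargs):
    """Subscribe on entry and always unsubscribe on exit."""
    subscription = connection.subscribe(**subscribe_kwargs)
    try:
        yield subscription
    finally:
        connection.unsubscribe(subscription)


class FakeConnection:
    """Minimal stand-in for a connection; not part of cx_Oracle."""

    def __init__(self):
        self.active = []

    def subscribe(self, **kwargs):
        subscription = object()
        self.active.append(subscription)
        return subscription

    def unsubscribe(self, subscription):
        self.active.remove(subscription)


fake = FakeConnection()
with subscription_scope(fake, callback=print) as sub:
    print(f"Active inside the block: {len(fake.active)}")
print(f"Active after the block: {len(fake.active)}")
```

With a real connection, the same helper would be called as `subscription_scope(connection, callback=robust_callback, operations=cx_Oracle.OPCODE_ALLOPS)`.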

## Notification Best Practices

1. **Handle callback errors gracefully**: Don't let exceptions in callbacks break subscriptions
2. **Use appropriate QoS settings**: Choose between reliability and performance based on requirements
3. **Monitor subscription health**: Check `message.registered` in callbacks to detect when a subscription has been deregistered
4. **Clean up subscriptions**: Always unsubscribe when done to free resources
5. **Batch related operations**: Use grouping to reduce callback frequency for high-volume changes
6. **Test with realistic data volumes**: Ensure callbacks can handle expected notification rates
7. **Use connection pooling carefully**: Subscriptions are tied to specific connections
8. **Consider security implications**: Notifications can reveal data access patterns
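
Several of these practices (graceful error handling, coping with high notification rates) can be combined by keeping the callback itself trivial and doing the real work elsewhere. The sketch below assumes callbacks are invoked on a thread other than the application's main thread, and hands each message to a worker through a `queue.Queue`. The names `enqueue_callback` and `drain` are hypothetical, and the string message is a stand-in for a real notification.

```python
import queue
import threading

events = queue.Queue()


def enqueue_callback(message):
    """Do the minimum in the notification thread: hand the message off."""
    events.put(message)


def drain(handler, stop_event):
    """Process queued messages until asked to stop and the queue is empty."""
    while not stop_event.is_set() or not events.empty():
        try:
            message = events.get(timeout=0.1)
        except queue.Empty:
            continue
        try:
            handler(message)
        except Exception as exc:
            # Log and carry on so one bad message does not stop the worker
            print(f"Error processing notification: {exc}")
        finally:
            events.task_done()


processed = []
stop = threading.Event()
worker = threading.Thread(target=drain, args=(processed.append, stop), daemon=True)
worker.start()

enqueue_callback("fake message")  # stand-in for a real notification
events.join()                     # wait until the worker has handled it
stop.set()
worker.join()
print(processed)
```

In real use, `enqueue_callback` would be passed as the `callback` argument to `connection.subscribe()`.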