or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced-queuing.md connection-pooling.md connectivity.md data-types.md database-objects.md index.md lobs.md pipeline.md soda.md sql-execution.md subscriptions.md

subscriptions.mddocs/

0

# Event Subscriptions

1

2

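Subscribe to database events including object changes, query result changes, and Advanced Queuing (AQ) messages for real-time notifications. Database event subscriptions enable applications to receive notifications when specific database changes occur, allowing for reactive programming patterns and real-time data synchronization. In python-oracledb, subscriptions are only available in Thick mode (call `oracledb.init_oracle_client()` first), the connection must be created with `events=True`, and queries are registered for notification with `Subscription.registerquery()` rather than by executing them on an ordinary cursor.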

3

4

## Capabilities

5

6

### Subscription Creation

7

8

Create subscriptions for various types of database events with flexible notification protocols.

9

10

```python { .api }

11

# Subscription creation through Connection.subscribe()

12

def subscribe(
    self,
    namespace=SUBSCR_NAMESPACE_DBCHANGE,
    protocol=SUBSCR_PROTO_CALLBACK,
    callback=None,
    timeout=0,
    operations=OPCODE_ALLOPS,
    port=0,
    qos=0,
    ip_address=None,
    grouping_class=SUBSCR_GROUPING_CLASS_NONE,
    grouping_value=0,
    grouping_type=SUBSCR_GROUPING_TYPE_SUMMARY,
    name=None,
    client_initiated=False,
    recipient_name=None
) -> Subscription:
    """
    Create a subscription for database event notifications.

    Subscriptions are only supported in python-oracledb Thick mode, and the
    connection must have been created with ``events=True``.  Creating the
    subscription does not register anything to watch: call
    ``Subscription.registerquery()`` on the returned object for each query
    of interest.

    Parameters:
    - namespace (int): Subscription namespace (SUBSCR_NAMESPACE_DBCHANGE, SUBSCR_NAMESPACE_AQ)
    - protocol (int): Notification protocol; SUBSCR_PROTO_CALLBACK is the only
      protocol currently supported
    - callback (callable): Callable accepting a single Message argument; invoked
      for every notification received
    - timeout (int): Subscription lifetime in seconds (0 means it never expires)
    - operations (int): Bit mask of OPCODE_* values selecting the operations to
      monitor; only used with SUBSCR_NAMESPACE_DBCHANGE
    - port (int): Local port on which the client listens for callback
      notifications from the database (0 lets the Oracle Client libraries
      choose an unused port)
    - qos (int): Quality of service flags (bitwise OR of SUBSCR_QOS_* values)
    - ip_address (str): Local IP address to bind for callback notifications
      (None lets the Oracle Client libraries determine it)
    - grouping_class (int): Grouping class for notifications
    - grouping_value (int): Grouping value
    - grouping_type (int): Grouping type
    - name (str): Subscription name; for SUBSCR_NAMESPACE_AQ this identifies the
      queue, as '<QUEUE_NAME>' or '<QUEUE_NAME>:<CONSUMER_NAME>'
    - client_initiated (bool): Use a client-initiated connection for
      notifications instead of a server-initiated one
    - recipient_name (str): Recipient name for notifications

    Returns:
    Subscription object

    Note:
    Closing the connection does not deregister the subscription in the
    database; call ``Connection.unsubscribe()`` (or rely on ``timeout``) to
    remove it.
    """

51

```

52

53

### Message Classes

54

55

Classes representing different types of database event messages.

56

57

```python { .api }

58

class Message:
    """Database event notification message."""

    # Event type (EVENT_OBJCHANGE, EVENT_QUERYCHANGE, etc.)
    type: int
    # Database name
    dbname: str
    # List of MessageTable objects
    tables: list
    # List of MessageQuery objects
    queries: list
    # AQ consumer name
    consumer_name: str
    # AQ queue name
    queue_name: str
    # Subscription object that received the message
    subscription: object

69

70

class MessageTable:
    """Table change notification details."""

    # Table name
    name: str
    # Operation type (OPCODE_INSERT, OPCODE_UPDATE, etc.)
    operation: int
    # List of MessageRow objects
    rows: list

77

78

class MessageRow:
    """Row change notification details."""

    # Operation type
    operation: int
    # Row identifier
    rowid: str

84

85

class MessageQuery:
    """Query change notification details."""

    # Query ID
    id: int
    # Operation type
    operation: int
    # Query context
    queryctx: object
    # List of affected MessageTable objects
    tables: list

93

```

94

95

### Subscription Constants

96

97

Constants for configuring subscription behavior and identifying event types.

98

99

```python { .api }

100

# Subscription Namespaces
SUBSCR_NAMESPACE_DBCHANGE: int  # Database change / continuous query notifications
SUBSCR_NAMESPACE_AQ: int  # Advanced Queuing notifications

# Subscription Protocols
SUBSCR_PROTO_CALLBACK: int  # Invoke a Python callback (the only supported protocol)
SUBSCR_PROTO_HTTP: int  # HTTP notifications (currently unsupported)
SUBSCR_PROTO_MAIL: int  # Email notifications (currently unsupported)
SUBSCR_PROTO_SERVER: int  # Server procedure notifications (currently unsupported)

# Quality of Service (combine with bitwise OR)
SUBSCR_QOS_DEFAULT: int  # Default QoS
SUBSCR_QOS_RELIABLE: int  # Notifications are not lost if the database fails
SUBSCR_QOS_BEST_EFFORT: int  # Best-effort query filtering; false positives are possible
SUBSCR_QOS_DEREG_NFY: int  # Unregister automatically after the first notification
SUBSCR_QOS_ROWIDS: int  # Include rowids of changed rows in messages
SUBSCR_QOS_QUERY: int  # Query-level (continuous query) notification

# Grouping Classes
SUBSCR_GROUPING_CLASS_NONE: int  # No grouping
SUBSCR_GROUPING_CLASS_TIME: int  # Time-based grouping

# Grouping Types
SUBSCR_GROUPING_TYPE_SUMMARY: int  # Send a summary of the grouped notifications
SUBSCR_GROUPING_TYPE_LAST: int  # Send only the last notification in the group

# Event Types
EVENT_NONE: int  # No event
EVENT_STARTUP: int  # Database startup
EVENT_SHUTDOWN: int  # Database shutdown
EVENT_SHUTDOWN_ANY: int  # Any shutdown
EVENT_DEREG: int  # Subscription was deregistered
EVENT_OBJCHANGE: int  # Object change
EVENT_QUERYCHANGE: int  # Query result change
EVENT_AQ: int  # Advanced Queuing

# Operation Codes (bit flags)
OPCODE_ALLOPS: int  # All operations
OPCODE_ALLROWS: int  # Whole table/query invalidated; individual rows not reported
OPCODE_INSERT: int  # Insert operations
OPCODE_UPDATE: int  # Update operations
OPCODE_DELETE: int  # Delete operations
OPCODE_ALTER: int  # Alter operations
OPCODE_DROP: int  # Drop operations

144

```

145

146

## Usage Examples

147

148

### Basic Database Change Notifications

149

150

```python

151

import oracledb

152

import time

153

154

def notification_callback(message):
    """Print the type and database of *message*, then each changed table and row."""

    def _report_lines():
        # Lines are produced lazily so output appears in the same order, and
        # stops at the same point, as a plain nested loop of print calls.
        yield f"Received notification: Type={message.type}, DB={message.dbname}"
        for changed_table in message.tables:
            yield f" Table: {changed_table.name}, Operation: {changed_table.operation}"
            for changed_row in changed_table.rows:
                yield f" Row: {changed_row.rowid}, Operation: {changed_row.operation}"

    for line in _report_lines():
        print(line)

162

163

# Change notification is only available in python-oracledb Thick mode, so the
# Oracle Client libraries must be loaded before connecting (pass lib_dir=...
# if they are not on the default search path).
oracledb.init_oracle_client()

# Connect to database; events=True is required for subscriptions to work
connection = oracledb.connect(
    user="hr", password="password", dsn="localhost/xepdb1", events=True
)

try:
    # Create subscription for database changes
    subscription = connection.subscribe(
        namespace=oracledb.SUBSCR_NAMESPACE_DBCHANGE,
        protocol=oracledb.SUBSCR_PROTO_CALLBACK,
        callback=notification_callback,
        timeout=300,  # 5 minutes
        operations=oracledb.OPCODE_ALLOPS,
        qos=oracledb.SUBSCR_QOS_ROWIDS,
    )

    print(f"Created subscription with ID: {subscription.id}")

    # Register queries for change notification.  Registration must go through
    # the subscription: executing the query on an ordinary cursor does not
    # associate it with the subscription, so no notification would be sent.

    # Register interest in employees table changes
    subscription.registerquery(
        "SELECT employee_id, first_name, last_name FROM employees WHERE department_id = 10"
    )

    # Register interest in departments table
    subscription.registerquery("SELECT department_id, department_name FROM departments")

    print("Subscriptions registered. Making changes to trigger notifications...")

    # Make changes to trigger notifications
    with connection.cursor() as cursor:
        cursor.execute("""
            UPDATE employees SET salary = salary * 1.1 WHERE department_id = 10
        """)
        connection.commit()

        cursor.execute("""
            INSERT INTO employees (employee_id, first_name, last_name, department_id)
            VALUES (9999, 'Test', 'Employee', 10)
        """)
        connection.commit()

    # Wait for notifications
    print("Waiting for notifications...")
    time.sleep(10)

    # Closing the connection does not deregister the subscription in the
    # database, so remove it explicitly.
    connection.unsubscribe(subscription)
finally:
    # Clean up
    connection.close()

209

```

210

211

### Query Change Notifications

212

213

```python

214

import oracledb

215

import threading

216

import time

217

218

# Global flag to control notification processing

219

processing_notifications = True

220

221

def query_change_callback(message):
    """Print the details of a query change notification.

    Nothing is printed once the module-level ``processing_notifications``
    flag has been cleared.
    """
    # The flag is only read here, so no ``global`` declaration is needed.
    if not processing_notifications:
        return

    print("Query change notification received:")
    print(f" Event type: {message.type}")
    print(f" Database: {message.dbname}")

    for changed_query in message.queries:
        print(f" Query ID: {changed_query.id}")
        print(f" Operation: {changed_query.operation}")

        for affected in changed_query.tables:
            print(f" Affected table: {affected.name}")
            print(f" Table operation: {affected.operation}")
            print(f" Affected rows: {len(affected.rows)}")

240

241

def notification_thread():
    """Run in separate thread to handle notifications.

    Opens its own connection, creates a query-change subscription, registers
    two queries on it, then polls ``processing_notifications`` once a second
    until the flag is cleared.  The subscription is removed and the
    connection closed on the way out.

    Thick mode must already be enabled (``oracledb.init_oracle_client()``)
    before this function connects.
    """
    # events=True is required for the connection to receive notifications.
    connection = oracledb.connect(
        user="hr", password="password", dsn="localhost/xepdb1", events=True
    )

    try:
        # Create subscription for query changes
        subscription = connection.subscribe(
            namespace=oracledb.SUBSCR_NAMESPACE_DBCHANGE,
            protocol=oracledb.SUBSCR_PROTO_CALLBACK,
            callback=query_change_callback,
            timeout=0,  # No timeout
            qos=oracledb.SUBSCR_QOS_QUERY | oracledb.SUBSCR_QOS_RELIABLE,
        )

        print(f"Query subscription created: {subscription.id}")

        # Register queries to monitor.  Queries must be registered through the
        # subscription itself; running them on a cursor registers nothing.

        # Monitor high-salary employees
        subscription.registerquery("""
            SELECT employee_id, first_name, last_name, salary
            FROM employees
            WHERE salary > 50000
        """)

        # Monitor recent hires
        # NOTE(review): query-level registration restricts time-dependent
        # expressions such as SYSDATE — confirm this query is accepted by the
        # target database, or add SUBSCR_QOS_BEST_EFFORT.
        subscription.registerquery("""
            SELECT employee_id, first_name, hire_date
            FROM employees
            WHERE hire_date >= SYSDATE - 30
        """)

        print("Query monitoring active. Waiting for changes...")

        # Keep connection alive for notifications
        while processing_notifications:
            time.sleep(1)

        # Closing the connection does not deregister the subscription, so
        # remove it explicitly once monitoring stops.
        connection.unsubscribe(subscription)

    finally:
        connection.close()

283

284

# Subscriptions need python-oracledb Thick mode; load the Oracle Client
# libraries once, before any connection is created.
oracledb.init_oracle_client()

# Start notification thread.  The Thread object gets its own name so the
# notification_thread function is not shadowed.
worker = threading.Thread(target=notification_thread, daemon=True)
worker.start()

# Give subscription time to initialize
time.sleep(2)

# Main thread: make changes to trigger notifications
main_connection = oracledb.connect(user="hr", password="password", dsn="localhost/xepdb1")

try:
    with main_connection.cursor() as cursor:
        print("Making changes to trigger query notifications...")

        # Change that affects high-salary query
        cursor.execute("""
            UPDATE employees SET salary = 55000
            WHERE employee_id = (SELECT MIN(employee_id) FROM employees WHERE salary < 50000)
        """)
        main_connection.commit()

        time.sleep(2)

        # Insert new employee (affects recent hires query)
        cursor.execute("""
            INSERT INTO employees (employee_id, first_name, last_name, hire_date, salary, department_id)
            VALUES (8888, 'Recent', 'Hire', SYSDATE, 45000, 10)
        """)
        main_connection.commit()

    # Wait for notifications
    print("Waiting for notifications...")
    time.sleep(5)

finally:
    # Clean up: stop the worker and give it a chance to unsubscribe and close
    # its connection before the daemon thread is discarded at exit.
    processing_notifications = False
    worker.join(timeout=5)
    main_connection.close()

321

```

322

323

### Advanced Queuing (AQ) Notifications

324

325

```python

326

import oracledb

327

import time

328

329

def aq_notification_callback(message):
    """Print the event type, queue, consumer and database of an AQ notification."""
    print("AQ Notification received:")
    details = (
        ("Event type", "type"),
        ("Queue", "queue_name"),
        ("Consumer", "consumer_name"),
        ("Database", "dbname"),
    )
    for label, attribute in details:
        print(f" {label}: {getattr(message, attribute)}")

336

337

# Subscriptions need python-oracledb Thick mode; load the Oracle Client
# libraries before connecting.
oracledb.init_oracle_client()

# Connect to database; events=True is required to receive notifications
connection = oracledb.connect(
    user="hr", password="password", dsn="localhost/xepdb1", events=True
)

# Error codes meaning "this object is already there":
# ORA-00955 (name already used), ORA-24001 (queue table already exists),
# ORA-24006 (queue already exists).
ALREADY_EXISTS_CODES = {955, 24001, 24006}

# Each setup step runs on its own so that one pre-existing object does not
# cause the remaining steps to be skipped.
aq_setup_steps = [
    # Create queue table
    """
    BEGIN
        DBMS_AQADM.CREATE_QUEUE_TABLE(
            queue_table => 'my_queue_table',
            queue_payload_type => 'SYS.AQ$_JMS_TEXT_MESSAGE'
        );
    END;
    """,
    # Create queue
    """
    BEGIN
        DBMS_AQADM.CREATE_QUEUE(
            queue_name => 'my_notification_queue',
            queue_table => 'my_queue_table'
        );
    END;
    """,
    # Start queue
    """
    BEGIN
        DBMS_AQADM.START_QUEUE('my_notification_queue');
    END;
    """,
]

try:
    # Set up Advanced Queuing infrastructure
    with connection.cursor() as cursor:
        for plsql in aq_setup_steps:
            try:
                cursor.execute(plsql)
            except oracledb.DatabaseError as e:
                (error,) = e.args
                if error.code in ALREADY_EXISTS_CODES:
                    print("Queue infrastructure already exists")
                else:
                    raise

        connection.commit()

    # Create AQ subscription; for the AQ namespace, name identifies the queue
    subscription = connection.subscribe(
        namespace=oracledb.SUBSCR_NAMESPACE_AQ,
        protocol=oracledb.SUBSCR_PROTO_CALLBACK,
        callback=aq_notification_callback,
        name="my_notification_queue",
        timeout=300,
    )

    print(f"AQ subscription created: {subscription.id}")

    # Enqueue messages to trigger notifications
    with connection.cursor() as cursor:
        # Enqueue a message
        cursor.execute("""
            DECLARE
                enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
                message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
                message_handle RAW(16);
                message SYS.AQ$_JMS_TEXT_MESSAGE;
            BEGIN
                message := SYS.AQ$_JMS_TEXT_MESSAGE.construct;
                message.set_text('Hello from AQ notification test!');

                DBMS_AQ.ENQUEUE(
                    queue_name => 'my_notification_queue',
                    enqueue_options => enqueue_options,
                    message_properties => message_properties,
                    payload => message,
                    msgid => message_handle
                );
            END;
        """)

        connection.commit()

    print("Message enqueued. Waiting for notification...")
    time.sleep(5)

    # Closing the connection does not deregister the subscription.
    connection.unsubscribe(subscription)
finally:
    connection.close()

418

```

419

420

### Subscription Management

421

422

```python

423

import oracledb

424

import time

425

426

class SubscriptionManager:
    """Manage multiple database subscriptions.

    The connection passed in must support subscriptions (python-oracledb
    Thick mode, created with ``events=True``).  Every subscription created
    through the manager is remembered in ``self.subscriptions`` so that
    ``cleanup()`` can deregister it.
    """

    def __init__(self, connection):
        self.connection = connection
        # Maps a table name or "query_<id>" key to its Subscription object.
        self.subscriptions = {}
        self.active = True

    def _register(self, subscription, statement):
        """Register *statement* on *subscription*, unsubscribing if that fails."""
        try:
            subscription.registerquery(statement)
        except Exception:
            # Do not leave a subscription behind that watches nothing.
            self.connection.unsubscribe(subscription)
            raise

    def create_table_subscription(self, table_name, callback):
        """Create subscription for specific table changes.

        Parameters:
        - table_name (str): Table to watch.  It is interpolated into SQL, so
          it must come from a trusted source.
        - callback (callable): Called as ``callback(message, table)`` for each
          table entry in a notification whose name matches ``table_name``
          (case-insensitively).

        Returns:
        The ID of the new subscription.
        """

        def table_callback(message):
            # Filter messages for specific table
            for table in message.tables:
                if table.name.upper() == table_name.upper():
                    callback(message, table)

        subscription = self.connection.subscribe(
            namespace=oracledb.SUBSCR_NAMESPACE_DBCHANGE,
            protocol=oracledb.SUBSCR_PROTO_CALLBACK,
            callback=table_callback,
            timeout=0,
            operations=oracledb.OPCODE_ALLOPS,
            qos=oracledb.SUBSCR_QOS_ROWIDS | oracledb.SUBSCR_QOS_RELIABLE,
        )

        # Register interest in table.  This has to go through the
        # subscription; executing the query on a cursor registers nothing.
        self._register(subscription, f"SELECT * FROM {table_name}")

        self.subscriptions[table_name] = subscription
        return subscription.id

    def create_query_subscription(self, query, callback):
        """Create subscription for query result changes.

        Parameters:
        - query (str): SELECT statement whose result set should be watched.
        - callback (callable): Called with the Message for each notification.

        Returns:
        The ID of the new subscription.
        """
        subscription = self.connection.subscribe(
            namespace=oracledb.SUBSCR_NAMESPACE_DBCHANGE,
            protocol=oracledb.SUBSCR_PROTO_CALLBACK,
            callback=callback,
            timeout=0,
            qos=oracledb.SUBSCR_QOS_QUERY | oracledb.SUBSCR_QOS_RELIABLE,
        )

        # Register query on the subscription itself
        self._register(subscription, query)

        query_id = f"query_{subscription.id}"
        self.subscriptions[query_id] = subscription
        return subscription.id

    def cleanup(self):
        """Clean up all subscriptions.

        Closing the connection does not deregister subscriptions in the
        database, so each one is unsubscribed explicitly.  Calling this
        method again is a no-op.
        """
        self.active = False
        while self.subscriptions:
            # Pop in insertion order; an entry is forgotten before the
            # unsubscribe call so a failure is not retried endlessly.
            key = next(iter(self.subscriptions))
            subscription = self.subscriptions.pop(key)
            self.connection.unsubscribe(subscription)

484

485

# Usage example

486

def employee_change_handler(message, table):
    """Report the operation and affected-row count for a changed employee table."""
    row_count = len(table.rows)
    print(f"Employee table changed: {table.operation}")
    print(f" Affected rows: {row_count}")

490

491

def high_salary_query_handler(message):
    """Report, for each changed query in *message*, how many tables it affected."""
    print("High salary employees query results changed")
    for changed_query in message.queries:
        table_count = len(changed_query.tables)
        print(f" Query {changed_query.id} affected {table_count} tables")

496

497

# Subscriptions need python-oracledb Thick mode; load the Oracle Client
# libraries before connecting.
oracledb.init_oracle_client()

# Set up subscription manager; events=True is required for notifications
connection = oracledb.connect(
    user="hr", password="password", dsn="localhost/xepdb1", events=True
)
manager = SubscriptionManager(connection)

try:
    # Create subscriptions
    emp_sub_id = manager.create_table_subscription("employees", employee_change_handler)
    query_sub_id = manager.create_query_subscription(
        "SELECT * FROM employees WHERE salary > 75000",
        high_salary_query_handler,
    )

    print(f"Created subscriptions: Employee table={emp_sub_id}, High salary query={query_sub_id}")

    # Make changes to trigger notifications
    with connection.cursor() as cursor:
        cursor.execute("UPDATE employees SET salary = 80000 WHERE employee_id = 100")
        connection.commit()

        cursor.execute("INSERT INTO employees (employee_id, first_name, last_name, salary, department_id) VALUES (7777, 'High', 'Earner', 85000, 10)")
        connection.commit()

    print("Changes made. Waiting for notifications...")
    time.sleep(5)

finally:
    # Cleanup runs even if a statement above fails, so the connection is
    # never left open.
    try:
        manager.cleanup()
    finally:
        connection.close()

524

```

525

526

### Subscription Error Handling

527

528

```python

529

import oracledb

530

import time

531

532

def robust_notification_callback(message):
    """Notification callback with error handling.

    Prints the message type, each changed table (with up to ten rowids when
    the table operation includes an insert, update or delete) and each
    changed query ID.  Any exception is reported and swallowed so that a
    faulty notification cannot break the notification system.
    """
    try:
        print(f"Processing notification: {message.type}")

        # Operation codes are bit flags, so one notification can combine
        # several of them (e.g. INSERT | UPDATE); test with a mask rather
        # than comparing against each single value.
        dml_mask = (
            oracledb.OPCODE_INSERT
            | oracledb.OPCODE_UPDATE
            | oracledb.OPCODE_DELETE
        )

        # Process tables
        for table in message.tables:
            print(f" Table: {table.name}")

            # Validate table operations
            if table.operation & dml_mask:
                print(f" Valid operation: {table.operation}")

                # Process rows safely
                for row in table.rows[:10]:  # Limit processing to avoid overload
                    rowid = getattr(row, "rowid", None)
                    if rowid:
                        print(f" Row: {rowid}")
            else:
                print(f" Unexpected operation: {table.operation}")

        # Process queries
        for query in message.queries:
            print(f" Query ID: {query.id}")

    except Exception as e:
        # Log error but don't raise to avoid breaking notification system
        print(f"Error in notification callback: {e}")

559

560

def create_resilient_subscription(connection, max_retries=3, retry_delay=2):
    """Create subscription with retry logic.

    Parameters:
    - connection: Connection on which ``subscribe()`` is called.
    - max_retries (int): Total number of attempts; must be at least 1.
    - retry_delay (float): Seconds to wait between failed attempts.

    Returns:
    The Subscription object from the first successful attempt.

    Raises:
    - ValueError: if ``max_retries`` is less than 1 (previously this case
      silently returned None).
    - oracledb.DatabaseError: re-raised from the final failed attempt.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    for attempt in range(1, max_retries + 1):
        try:
            subscription = connection.subscribe(
                namespace=oracledb.SUBSCR_NAMESPACE_DBCHANGE,
                protocol=oracledb.SUBSCR_PROTO_CALLBACK,
                callback=robust_notification_callback,
                timeout=300,
                operations=oracledb.OPCODE_ALLOPS,
                qos=oracledb.SUBSCR_QOS_RELIABLE,
            )
        except oracledb.DatabaseError as e:
            print(f"Subscription attempt {attempt} failed: {e}")
            if attempt == max_retries:
                raise
            time.sleep(retry_delay)  # Wait before retry
        else:
            print(f"Subscription created successfully on attempt {attempt}")
            return subscription

582

583

# Subscriptions need python-oracledb Thick mode; load the Oracle Client
# libraries before connecting.
oracledb.init_oracle_client()

# Create resilient subscription; events=True is required for notifications
connection = oracledb.connect(
    user="hr", password="password", dsn="localhost/xepdb1", events=True
)

try:
    subscription = create_resilient_subscription(connection)

    # Register queries with error handling.  Registration must go through
    # the subscription; executing the query on a cursor registers nothing.
    try:
        subscription.registerquery("SELECT * FROM employees WHERE department_id <= 50")
        print("Query registered successfully")
    except oracledb.DatabaseError as e:
        print(f"Query registration failed: {e}")

    # Test notifications
    with connection.cursor() as cursor:
        cursor.execute("UPDATE employees SET salary = salary + 1 WHERE ROWNUM <= 1")
        connection.commit()

    print("Waiting for notifications...")
    time.sleep(5)

    # Closing the connection does not deregister the subscription.
    connection.unsubscribe(subscription)

except oracledb.DatabaseError as e:
    print(f"Subscription setup failed: {e}")

finally:
    connection.close()

611

```