0
# Event Monitoring & Subscriptions
1
2
The event monitoring functionality provides real-time blockchain event monitoring through WebSocket subscriptions and historical log querying capabilities.
3
4
## Subscription Management
5
6
### subscribe
7
8
Creates real-time subscriptions to blockchain events including new blocks, transactions, logs, and sync status.
9
10
```typescript { .api }
11
// Resolves to an instance of the subscription class registered under `subscriptionName`
// (e.g. "logs" -> LogsSubscription), which is the object that exposes `.on(...)` and `.unsubscribe()`.
subscribe<T extends keyof RegisteredSubscription>(subscriptionName: T, subscriptionOptions?: LogsInput | object): Promise<InstanceType<RegisteredSubscription[T]>>;
12
```
13
14
**Parameters:**
15
- `subscriptionName`: Type of subscription to create
16
- `subscriptionOptions`: Configuration options specific to subscription type
17
18
**Usage Example:**
19
```typescript
20
// Subscribe to new block headers
21
const newHeadsSubscription = await eth.subscribe("newHeads");
22
// Or use alias: await eth.subscribe("newBlockHeaders");
23
newHeadsSubscription.on("data", (blockHeader) => {
24
console.log("New block:", {
25
number: blockHeader.number,
26
hash: blockHeader.hash,
27
timestamp: blockHeader.timestamp,
28
gasUsed: blockHeader.gasUsed
29
});
30
});
31
32
// Subscribe to new pending transactions
33
const pendingTxSubscription = await eth.subscribe("newPendingTransactions");
34
// Or use alias: await eth.subscribe("pendingTransactions");
35
pendingTxSubscription.on("data", (txHash) => {
36
console.log("New pending transaction:", txHash);
37
});
38
39
// Subscribe to sync status changes
40
const syncSubscription = await eth.subscribe("syncing");
41
syncSubscription.on("data", (syncStatus) => {
42
if (syncStatus !== false) {
43
console.log(`Syncing: ${syncStatus.currentBlock}/${syncStatus.highestBlock}`);
44
} else {
45
console.log("Node is fully synced");
46
}
47
});
48
```
49
50
## Log Subscriptions
51
52
### Contract Event Monitoring
53
54
Subscribe to specific contract events with filtering options.
55
56
```typescript { .api }
57
// Logs subscription with filter options
58
interface LogsInput {
59
address?: Address | Address[];
60
topics?: (HexString32Bytes | HexString32Bytes[] | null)[];
61
fromBlock?: BlockNumberOrTag;
62
toBlock?: BlockNumberOrTag;
63
}
64
```
65
66
**Usage Example:**
67
```typescript
68
// Subscribe to all events from a specific contract
69
const contractLogsSubscription = await eth.subscribe("logs", {
70
address: "0x1234567890123456789012345678901234567890"
71
});
72
73
contractLogsSubscription.on("data", (log) => {
74
console.log("Contract event:", {
75
address: log.address,
76
topics: log.topics,
77
data: log.data,
78
blockNumber: log.blockNumber,
79
transactionHash: log.transactionHash
80
});
81
});
82
83
// Subscribe to specific ERC20 Transfer events
84
const transferSubscription = await eth.subscribe("logs", {
85
address: "0x1234567890123456789012345678901234567890", // token contract
86
topics: [
87
"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", // Transfer event signature
88
null, // from address (any)
89
null // to address (any)
90
]
91
});
92
93
transferSubscription.on("data", (log) => {
94
console.log("ERC20 Transfer:", {
95
from: "0x" + log.topics[1].slice(26), // extract address from topic
96
to: "0x" + log.topics[2].slice(26), // extract address from topic
97
transactionHash: log.transactionHash
98
});
99
});
100
101
// Monitor specific address interactions
// An indexed address is matched as a full 32-byte topic: the 20-byte address
// left-padded with zeros to 64 hex digits. Deriving the topic from the address
// avoids hand-typed values of the wrong length, which can never match.
const watchedAddress = "0xabcdefabcdefabcdefabcdefabcdefabcdefabcd"; // placeholder address
const watchedAddressTopic = "0x" + watchedAddress.slice(2).toLowerCase().padStart(64, "0");

const addressLogsSubscription = await eth.subscribe("logs", {
  topics: [
    null, // any event
    watchedAddressTopic // specific address as the first indexed parameter
  ]
});
108
```
109
110
## Historical Log Queries
111
112
### getPastLogs
113
114
Retrieves historical logs matching filter criteria.
115
116
```typescript { .api }
117
getPastLogs(filter?: Filter, returnFormat?: DataFormat): Promise<LogsOutput>;
118
```
119
120
**Usage Example:**
121
```typescript
122
// Get all Transfer events from the last 1000 blocks
// Block numbers cannot be negative or relative, so the start of the window is
// computed from the current head. `fromBlock` must not be later than `toBlock`.
const latestBlock = BigInt(await eth.getBlockNumber());
const firstBlock = latestBlock >= 1000n ? latestBlock - 999n : 0n; // inclusive range of 1000 blocks
const recentTransfers = await eth.getPastLogs({
  address: "0x1234567890123456789012345678901234567890",
  topics: [
    "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef" // Transfer event
  ],
  fromBlock: firstBlock,
  toBlock: "latest"
});
131
132
console.log(`Found ${recentTransfers.length} transfers`);
133
134
// Get logs from specific block range
// An indexed address is matched as a 32-byte topic (address left-padded with
// zeros to 64 hex digits); derive it instead of typing it by hand.
const counterparty = "0xabcdefabcdefabcdefabcdefabcdefabcdefabcd"; // placeholder address
const counterpartyTopic = "0x" + counterparty.slice(2).toLowerCase().padStart(64, "0");

const historicalLogs = await eth.getPastLogs({
  address: ["0x1234567890123456789012345678901234567890", "0x9876543210987654321098765432109876543210"],
  fromBlock: 15000000,
  toBlock: 15001000,
  topics: [
    [
      "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", // Transfer
      "0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925" // Approval
    ],
    null, // any Transfer `from` / Approval `owner`
    counterpartyTopic // specific Transfer `to` / Approval `spender`
  ]
});
146
```
147
148
## Subscription Lifecycle Management
149
150
### clearSubscriptions
151
152
Clears all active subscriptions.
153
154
```typescript { .api }
155
clearSubscriptions(): Promise<boolean>;
156
```
157
158
**Usage Example:**
159
```typescript
160
// Clean up all subscriptions
161
await eth.clearSubscriptions();
162
console.log("All subscriptions cleared");
163
```
164
165
### Individual Subscription Management
166
167
```typescript
168
// Manage individual subscriptions
169
const subscription = await eth.subscribe("newHeads");
170
171
// Listen for events
172
subscription.on("data", (data) => {
173
console.log("New data:", data);
174
});
175
176
subscription.on("error", (error) => {
177
console.error("Subscription error:", error);
178
});
179
180
subscription.on("connected", (subscriptionId) => {
181
console.log("Subscription connected:", subscriptionId);
182
});
183
184
// Unsubscribe when done
185
await subscription.unsubscribe();
186
console.log("Subscription ended");
187
```
188
189
## Event Processing Patterns
190
191
### Real-time Transaction Monitoring
192
193
```typescript
194
/**
 * Monitors transactions that involve a specific address in real time.
 *
 * Subscribes to new block headers, fetches each block with its full
 * transaction objects, and logs every transaction whose sender or recipient
 * is `address`.
 *
 * @param address - Account or contract address to watch.
 * @returns The underlying "newHeads" subscription, so the caller can unsubscribe.
 */
async function monitorAddress(address: Address) {
  // The same address can appear lowercase or mixed-case (checksummed), so a
  // plain `===` would miss matches. Compare a normalized form instead.
  const target = address.toLowerCase();

  // Subscribe to new blocks to check for address activity
  const subscription = await eth.subscribe("newHeads");

  subscription.on("data", async (blockHeader) => {
    try {
      // Get full block with transactions
      const block = await eth.getBlock(blockHeader.number, true);

      // Filter transactions involving the address. `to` may be missing
      // (it is null for contract creations), hence the optional chaining.
      const relevantTxs = (block.transactions ?? []).filter(
        (tx) => tx.from?.toLowerCase() === target || tx.to?.toLowerCase() === target
      );

      if (relevantTxs.length > 0) {
        console.log(`Found ${relevantTxs.length} transactions for ${address} in block ${block.number}`);
        relevantTxs.forEach((tx) => {
          console.log(` ${tx.hash}: ${tx.from} -> ${tx.to} (${tx.value} wei)`);
        });
      }
    } catch (error) {
      // Nothing awaits this listener, so a thrown error would otherwise
      // surface as an unhandled promise rejection.
      console.error(`Failed to inspect block ${blockHeader.number}:`, error);
    }
  });

  return subscription;
}
218
```
219
220
### Contract Event Processing
221
222
```typescript
223
// Process and decode contract events
224
import { decodeFunctionCall } from "web3-eth-abi";
225
226
/**
 * Subscribes to every log emitted by one contract and routes ERC-20 style
 * `Transfer` and `Approval` events to dedicated handlers.
 */
class ContractEventProcessor {
  /** Topic 0 (event signature hash) of the Transfer event. */
  private static readonly TRANSFER_TOPIC =
    "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef";

  /** Topic 0 (event signature hash) of the Approval event. */
  private static readonly APPROVAL_TOPIC =
    "0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925";

  constructor(private eth: Web3Eth, private contractAddress: Address) {}

  /**
   * Opens a "logs" subscription filtered to the contract address and feeds
   * every received log into the event dispatcher.
   *
   * @returns The subscription, so the caller can unsubscribe.
   */
  async subscribeToAllEvents() {
    const subscription = await this.eth.subscribe("logs", {
      address: this.contractAddress
    });

    subscription.on("data", (log) => {
      this.processEvent(log);
    });

    return subscription;
  }

  /** Dispatches a log to the matching handler based on its first topic. */
  private processEvent(log: { topics?: readonly string[] }) {
    // Logs without topics fall through to the default branch.
    const signature = log.topics?.[0];

    // Decode common events
    switch (signature) {
      case ContractEventProcessor.TRANSFER_TOPIC:
        this.processTransferEvent(log);
        break;
      case ContractEventProcessor.APPROVAL_TOPIC:
        this.processApprovalEvent(log);
        break;
      default:
        console.log("Unknown event:", signature);
    }
  }

  /** Logs the sender and recipient of a Transfer event. */
  private processTransferEvent(log: { topics?: readonly string[] }) {
    const parties = ContractEventProcessor.indexedAddresses(log);
    if (parties === undefined) {
      console.warn("Skipping Transfer log without two indexed address topics");
      return;
    }
    const [from, to] = parties;
    // Value would need to be decoded from log.data
    console.log(`Transfer: ${from} -> ${to}`);
  }

  /** Logs the owner and spender of an Approval event. */
  private processApprovalEvent(log: { topics?: readonly string[] }) {
    const parties = ContractEventProcessor.indexedAddresses(log);
    if (parties === undefined) {
      console.warn("Skipping Approval log without two indexed address topics");
      return;
    }
    const [owner, spender] = parties;
    console.log(`Approval: ${owner} approved ${spender}`);
  }

  /**
   * Extracts the addresses carried in topics[1] and topics[2].
   *
   * A log can share the event signature while carrying fewer topics (its
   * address parameters are not indexed). Indexing blindly would throw inside
   * the subscription listener, so such logs yield `undefined` instead.
   */
  private static indexedAddresses(
    log: { topics?: readonly string[] }
  ): [string, string] | undefined {
    const [, first, second] = log.topics ?? [];
    if (typeof first !== "string" || typeof second !== "string") {
      return undefined;
    }
    // A topic is "0x" plus 64 hex digits; the address is the last 40 digits,
    // which start at character index 26.
    return ["0x" + first.slice(26), "0x" + second.slice(26)];
  }
}
268
```
269
270
## Error Handling and Reconnection
271
272
```typescript
273
/**
 * Creates a subscription that re-subscribes itself after an "error" event.
 *
 * After an error, it retries up to `maxReconnects` times, waiting
 * `baseDelayMs * attempt` before each attempt. A failed attempt is logged and
 * followed by the next one. Only one reconnection loop runs at a time.
 *
 * @param subscriptionType - Subscription name passed to `eth.subscribe`.
 * @param options - Subscription options passed to `eth.subscribe`.
 * @param retry - Optional tuning; defaults to 5 attempts and a 2000 ms base delay.
 * @returns The subscription created by the initial connection. Replacement
 *   subscriptions created during reconnection are not returned.
 * @throws Whatever `eth.subscribe` throws when the initial connection fails.
 */
async function createRobustSubscription(
  subscriptionType: string,
  options?: any,
  retry: { maxReconnects?: number; baseDelayMs?: number } = {}
) {
  const maxReconnects = retry.maxReconnects ?? 5;
  const baseDelayMs = retry.baseDelayMs ?? 2000;

  let subscription: any;
  let reconnecting = false;

  // Opens one subscription and wires its listeners.
  async function connect() {
    const next = await eth.subscribe(subscriptionType, options);

    next.on("data", (data: unknown) => {
      console.log("Received data:", data);
    });

    next.on("error", (error: unknown) => {
      console.error("Subscription error:", error);
      // Not awaited by the emitter; reconnect() handles its own failures so
      // nothing escapes as an unhandled rejection.
      void reconnect(next);
    });

    subscription = next;
    console.log("Subscription connected successfully");
  }

  // Replaces a failed subscription, retrying with a linearly growing delay.
  async function reconnect(failed: any) {
    // Several error events can arrive for one outage; run a single loop.
    if (reconnecting) {
      return;
    }
    reconnecting = true;

    try {
      // Best effort: release the failed subscription so a half-alive one does
      // not keep delivering duplicate events. It may already be gone.
      try {
        await failed.unsubscribe();
      } catch {
        // Ignored on purpose: the subscription is being replaced anyway.
      }

      for (let attempt = 1; attempt <= maxReconnects; attempt++) {
        console.log(`Attempting reconnection ${attempt}/${maxReconnects}`);

        // Wait before reconnecting
        await new Promise((resolve) => setTimeout(resolve, baseDelayMs * attempt));

        try {
          await connect();
          return;
        } catch (error) {
          console.error("Failed to create subscription:", error);
        }
      }

      console.error("Max reconnection attempts reached");
    } finally {
      reconnecting = false;
    }
  }

  try {
    await connect();
  } catch (error) {
    console.error("Failed to create subscription:", error);
    throw error;
  }

  return subscription;
}
313
```
314
315
## Core Types
316
317
```typescript { .api }
318
interface LogsInput {
319
address?: Address | Address[];
320
topics?: (HexString32Bytes | HexString32Bytes[] | null)[];
321
fromBlock?: BlockNumberOrTag;
322
toBlock?: BlockNumberOrTag;
323
}
324
325
interface Filter {
326
address?: Address | Address[];
327
topics?: (HexString32Bytes | HexString32Bytes[] | null)[];
328
fromBlock?: BlockNumberOrTag;
329
toBlock?: BlockNumberOrTag;
330
}
331
332
interface Log {
333
address: Address;
334
topics: HexString32Bytes[];
335
data: HexString;
336
blockNumber: Numbers;
337
transactionHash: HexString32Bytes;
338
transactionIndex: Numbers;
339
blockHash: HexString32Bytes;
340
logIndex: Numbers;
341
removed: boolean;
342
}
343
344
type LogsOutput = Log[];
345
346
interface RegisteredSubscription {
347
logs: typeof LogsSubscription;
348
newPendingTransactions: typeof NewPendingTransactionsSubscription;
349
pendingTransactions: typeof NewPendingTransactionsSubscription;
350
newHeads: typeof NewHeadsSubscription;
351
newBlockHeaders: typeof NewHeadsSubscription;
352
syncing: typeof SyncingSubscription;
353
}
354
355
// Subscription classes
356
class LogsSubscription extends Web3Subscription<LogsInput, Log> {}
357
class NewPendingTransactionsSubscription extends Web3Subscription<object, HexString32Bytes> {}
358
class NewHeadsSubscription extends Web3Subscription<object, BlockHeader> {}
359
class SyncingSubscription extends Web3Subscription<object, SyncingStatusAPI | boolean> {}
360
```