Web3 module to interact with the Ethereum blockchain and smart contracts.
67
The event monitoring functionality provides real-time blockchain event monitoring through WebSocket subscriptions and historical log querying capabilities.
Creates real-time subscriptions to blockchain events including new blocks, transactions, logs, and sync status.
subscribe(subscriptionName: 'logs' | 'newHeads' | 'newPendingTransactions' | 'pendingTransactions' | 'newBlockHeaders' | 'syncing', subscriptionOptions?: LogsInput | object): Promise<RegisteredSubscription>;Parameters:
subscriptionName: Type of subscription to createsubscriptionOptions: Configuration options specific to subscription typeUsage Example:
// Subscribe to new block headers
const newHeadsSubscription = await eth.subscribe("newHeads");
// Or use alias: await eth.subscribe("newBlockHeaders");
newHeadsSubscription.on("data", (blockHeader) => {
console.log("New block:", {
number: blockHeader.number,
hash: blockHeader.hash,
timestamp: blockHeader.timestamp,
gasUsed: blockHeader.gasUsed
});
});
// Subscribe to new pending transactions
const pendingTxSubscription = await eth.subscribe("newPendingTransactions");
// Or use alias: await eth.subscribe("pendingTransactions");
pendingTxSubscription.on("data", (txHash) => {
console.log("New pending transaction:", txHash);
});
// Subscribe to sync status changes
const syncSubscription = await eth.subscribe("syncing");
syncSubscription.on("data", (syncStatus) => {
if (syncStatus !== false) {
console.log(`Syncing: ${syncStatus.currentBlock}/${syncStatus.highestBlock}`);
} else {
console.log("Node is fully synced");
}
});Subscribe to specific contract events with filtering options.
// Logs subscription with filter options
interface LogsInput {
address?: Address | Address[];
topics?: (HexString32Bytes | HexString32Bytes[] | null)[];
fromBlock?: BlockNumberOrTag;
toBlock?: BlockNumberOrTag;
}Usage Example:
// Subscribe to all events from a specific contract
const contractLogsSubscription = await eth.subscribe("logs", {
address: "0x1234567890123456789012345678901234567890"
});
contractLogsSubscription.on("data", (log) => {
console.log("Contract event:", {
address: log.address,
topics: log.topics,
data: log.data,
blockNumber: log.blockNumber,
transactionHash: log.transactionHash
});
});
// Subscribe to specific ERC20 Transfer events
const transferSubscription = await eth.subscribe("logs", {
address: "0x1234567890123456789012345678901234567890", // token contract
topics: [
"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", // Transfer event signature
null, // from address (any)
null // to address (any)
]
});
transferSubscription.on("data", (log) => {
console.log("ERC20 Transfer:", {
from: "0x" + log.topics[1].slice(26), // extract address from topic
to: "0x" + log.topics[2].slice(26), // extract address from topic
transactionHash: log.transactionHash
});
});
// Monitor specific address interactions
const addressLogsSubscription = await eth.subscribe("logs", {
topics: [
null, // any event
"0x000000000000000000000000742d35cc6634c0532925a3b8d7389fc3c1b6c5e" // specific address as topic
]
});Retrieves historical logs matching filter criteria.
getPastLogs(filter?: Filter, returnFormat?: DataFormat): Promise<LogsOutput>;Usage Example:
// Get all Transfer events from the last 1000 blocks
const recentTransfers = await eth.getPastLogs({
address: "0x1234567890123456789012345678901234567890",
topics: [
"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef" // Transfer event
],
fromBlock: "latest",
toBlock: -1000 // 1000 blocks ago
});
console.log(`Found ${recentTransfers.length} transfers`);
// Get logs from specific block range
const historicalLogs = await eth.getPastLogs({
address: ["0x1234567890123456789012345678901234567890", "0x9876543210987654321098765432109876543210"],
fromBlock: 15000000,
toBlock: 15001000,
topics: [
["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", // Transfer
"0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925"], // Approval
null, // from/spender
"0x000000000000000000000000742d35cc6634c0532925a3b8d7389fc3c1b6c5e" // specific to address
]
});Clears all active subscriptions.
clearSubscriptions(): Promise<boolean>;Usage Example:
// Clean up all subscriptions
await eth.clearSubscriptions();
console.log("All subscriptions cleared");// Manage individual subscriptions
const subscription = await eth.subscribe("newHeads");
// Listen for events
subscription.on("data", (data) => {
console.log("New data:", data);
});
subscription.on("error", (error) => {
console.error("Subscription error:", error);
});
subscription.on("connected", (subscriptionId) => {
console.log("Subscription connected:", subscriptionId);
});
// Unsubscribe when done
await subscription.unsubscribe();
console.log("Subscription ended");// Monitor specific address transactions in real-time
async function monitorAddress(address: Address) {
// Subscribe to new blocks to check for address activity
const subscription = await eth.subscribe("newHeads");
subscription.on("data", async (blockHeader) => {
// Get full block with transactions
const block = await eth.getBlock(blockHeader.number, true);
// Filter transactions involving the address
const relevantTxs = block.transactions.filter(tx =>
tx.from === address || tx.to === address
);
if (relevantTxs.length > 0) {
console.log(`Found ${relevantTxs.length} transactions for ${address} in block ${block.number}`);
relevantTxs.forEach(tx => {
console.log(` ${tx.hash}: ${tx.from} -> ${tx.to} (${tx.value} wei)`);
});
}
});
return subscription;
}// Process and decode contract events
import { decodeFunctionCall } from "web3-eth-abi";
class ContractEventProcessor {
constructor(private eth: Web3Eth, private contractAddress: Address) {}
async subscribeToAllEvents() {
const subscription = await this.eth.subscribe("logs", {
address: this.contractAddress
});
subscription.on("data", (log) => {
this.processEvent(log);
});
return subscription;
}
private processEvent(log: any) {
// Decode common events
switch (log.topics[0]) {
case "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef":
this.processTransferEvent(log);
break;
case "0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925":
this.processApprovalEvent(log);
break;
default:
console.log("Unknown event:", log.topics[0]);
}
}
private processTransferEvent(log: any) {
const from = "0x" + log.topics[1].slice(26);
const to = "0x" + log.topics[2].slice(26);
// Value would need to be decoded from log.data
console.log(`Transfer: ${from} -> ${to}`);
}
private processApprovalEvent(log: any) {
const owner = "0x" + log.topics[1].slice(26);
const spender = "0x" + log.topics[2].slice(26);
console.log(`Approval: ${owner} approved ${spender}`);
}
}// Robust subscription with error handling
async function createRobustSubscription(subscriptionType: string, options?: any) {
let subscription: any;
let reconnectAttempts = 0;
const maxReconnects = 5;
async function connect() {
try {
subscription = await eth.subscribe(subscriptionType, options);
reconnectAttempts = 0;
subscription.on("data", (data) => {
console.log("Received data:", data);
});
subscription.on("error", async (error) => {
console.error("Subscription error:", error);
if (reconnectAttempts < maxReconnects) {
console.log(`Attempting reconnection ${reconnectAttempts + 1}/${maxReconnects}`);
reconnectAttempts++;
// Wait before reconnecting
await new Promise(resolve => setTimeout(resolve, 2000 * reconnectAttempts));
await connect();
} else {
console.error("Max reconnection attempts reached");
}
});
console.log("Subscription connected successfully");
} catch (error) {
console.error("Failed to create subscription:", error);
throw error;
}
}
await connect();
return subscription;
}interface LogsInput {
address?: Address | Address[];
topics?: (HexString32Bytes | HexString32Bytes[] | null)[];
fromBlock?: BlockNumberOrTag;
toBlock?: BlockNumberOrTag;
}
interface Filter {
address?: Address | Address[];
topics?: (HexString32Bytes | HexString32Bytes[] | null)[];
fromBlock?: BlockNumberOrTag;
toBlock?: BlockNumberOrTag;
}
interface Log {
address: Address;
topics: HexString32Bytes[];
data: HexString;
blockNumber: Numbers;
transactionHash: HexString32Bytes;
transactionIndex: Numbers;
blockHash: HexString32Bytes;
logIndex: Numbers;
removed: boolean;
}
type LogsOutput = Log[];
interface RegisteredSubscription {
logs: typeof LogsSubscription;
newPendingTransactions: typeof NewPendingTransactionsSubscription;
pendingTransactions: typeof NewPendingTransactionsSubscription;
newHeads: typeof NewHeadsSubscription;
newBlockHeaders: typeof NewHeadsSubscription;
syncing: typeof SyncingSubscription;
}
// Subscription classes
class LogsSubscription extends Web3Subscription<LogsInput, Log> {}
class NewPendingTransactionsSubscription extends Web3Subscription<object, HexString32Bytes> {}
class NewHeadsSubscription extends Web3Subscription<object, BlockHeader> {}
class SyncingSubscription extends Web3Subscription<object, SyncingStatusAPI | boolean> {}Install with Tessl CLI
npx tessl i tessl/npm-web3-ethdocs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10