CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-web3-eth

Web3 module to interact with the Ethereum blockchain and smart contracts.

67

0.98x
Overview
Eval results
Files

event-monitoring.mddocs/

Event Monitoring & Subscriptions

The event monitoring functionality provides real-time blockchain event monitoring through WebSocket subscriptions and historical log querying capabilities.

Subscription Management

subscribe

Creates real-time subscriptions to blockchain events including new blocks, transactions, logs, and sync status.

subscribe(subscriptionName: 'logs' | 'newHeads' | 'newPendingTransactions' | 'pendingTransactions' | 'newBlockHeaders' | 'syncing', subscriptionOptions?: LogsInput | object): Promise<RegisteredSubscription>;

Parameters:

  • subscriptionName: Type of subscription to create
  • subscriptionOptions: Configuration options specific to subscription type

Usage Example:

// Subscribe to new block headers
const newHeadsSubscription = await eth.subscribe("newHeads");
// Or use alias: await eth.subscribe("newBlockHeaders");
newHeadsSubscription.on("data", (blockHeader) => {
  console.log("New block:", {
    number: blockHeader.number,
    hash: blockHeader.hash,
    timestamp: blockHeader.timestamp,
    gasUsed: blockHeader.gasUsed
  });
});

// Subscribe to new pending transactions
const pendingTxSubscription = await eth.subscribe("newPendingTransactions"); 
// Or use alias: await eth.subscribe("pendingTransactions");
pendingTxSubscription.on("data", (txHash) => {
  console.log("New pending transaction:", txHash);
});

// Subscribe to sync status changes
const syncSubscription = await eth.subscribe("syncing");
syncSubscription.on("data", (syncStatus) => {
  if (syncStatus !== false) {
    console.log(`Syncing: ${syncStatus.currentBlock}/${syncStatus.highestBlock}`);
  } else {
    console.log("Node is fully synced");
  }
});

Log Subscriptions

Contract Event Monitoring

Subscribe to specific contract events with filtering options.

// Logs subscription with filter options
interface LogsInput {
  address?: Address | Address[];
  topics?: (HexString32Bytes | HexString32Bytes[] | null)[];
  fromBlock?: BlockNumberOrTag;
  toBlock?: BlockNumberOrTag;
}

Usage Example:

// Subscribe to all events from a specific contract
const contractLogsSubscription = await eth.subscribe("logs", {
  address: "0x1234567890123456789012345678901234567890"
});

contractLogsSubscription.on("data", (log) => {
  console.log("Contract event:", {
    address: log.address,
    topics: log.topics,
    data: log.data,
    blockNumber: log.blockNumber,
    transactionHash: log.transactionHash
  });
});

// Subscribe to specific ERC20 Transfer events
const transferSubscription = await eth.subscribe("logs", {
  address: "0x1234567890123456789012345678901234567890", // token contract
  topics: [
    "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", // Transfer event signature
    null, // from address (any)
    null  // to address (any)
  ]
});

transferSubscription.on("data", (log) => {
  console.log("ERC20 Transfer:", {
    from: "0x" + log.topics[1].slice(26), // extract address from topic
    to: "0x" + log.topics[2].slice(26),   // extract address from topic
    transactionHash: log.transactionHash
  });
});

// Monitor specific address interactions
const addressLogsSubscription = await eth.subscribe("logs", {
  topics: [
    null, // any event
    "0x000000000000000000000000742d35cc6634c0532925a3b8d7389fc3c1b6c5e" // specific address as topic
  ]
});

Historical Log Queries

getPastLogs

Retrieves historical logs matching filter criteria.

getPastLogs(filter?: Filter, returnFormat?: DataFormat): Promise<LogsOutput>;

Usage Example:

// Get all Transfer events from the last 1000 blocks
const recentTransfers = await eth.getPastLogs({
  address: "0x1234567890123456789012345678901234567890",
  topics: [
    "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef" // Transfer event
  ],
  fromBlock: "latest",
  toBlock: -1000 // 1000 blocks ago
});

console.log(`Found ${recentTransfers.length} transfers`);

// Get logs from specific block range
const historicalLogs = await eth.getPastLogs({
  address: ["0x1234567890123456789012345678901234567890", "0x9876543210987654321098765432109876543210"],
  fromBlock: 15000000,
  toBlock: 15001000,
  topics: [
    ["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", // Transfer
     "0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925"], // Approval
    null, // from/spender
    "0x000000000000000000000000742d35cc6634c0532925a3b8d7389fc3c1b6c5e" // specific to address
  ]
});

Subscription Lifecycle Management

clearSubscriptions

Clears all active subscriptions.

clearSubscriptions(): Promise<boolean>;

Usage Example:

// Clean up all subscriptions
await eth.clearSubscriptions();
console.log("All subscriptions cleared");

Individual Subscription Management

// Manage individual subscriptions
const subscription = await eth.subscribe("newHeads");

// Listen for events
subscription.on("data", (data) => {
  console.log("New data:", data);
});

subscription.on("error", (error) => {
  console.error("Subscription error:", error);
});

subscription.on("connected", (subscriptionId) => {
  console.log("Subscription connected:", subscriptionId);  
});

// Unsubscribe when done
await subscription.unsubscribe();
console.log("Subscription ended");

Event Processing Patterns

Real-time Transaction Monitoring

// Monitor specific address transactions in real-time
async function monitorAddress(address: Address) {
  // Subscribe to new blocks to check for address activity
  const subscription = await eth.subscribe("newHeads");
  
  subscription.on("data", async (blockHeader) => {
    // Get full block with transactions
    const block = await eth.getBlock(blockHeader.number, true);
    
    // Filter transactions involving the address
    const relevantTxs = block.transactions.filter(tx => 
      tx.from === address || tx.to === address
    );
    
    if (relevantTxs.length > 0) {
      console.log(`Found ${relevantTxs.length} transactions for ${address} in block ${block.number}`);
      relevantTxs.forEach(tx => {
        console.log(`  ${tx.hash}: ${tx.from} -> ${tx.to} (${tx.value} wei)`);
      });
    }
  });
  
  return subscription;
}

Contract Event Processing

// Process and decode contract events
import { decodeFunctionCall } from "web3-eth-abi";

class ContractEventProcessor {
  constructor(private eth: Web3Eth, private contractAddress: Address) {}

  async subscribeToAllEvents() {
    const subscription = await this.eth.subscribe("logs", {
      address: this.contractAddress
    });

    subscription.on("data", (log) => {
      this.processEvent(log);
    });

    return subscription;
  }

  private processEvent(log: any) {
    // Decode common events
    switch (log.topics[0]) {
      case "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef":
        this.processTransferEvent(log);
        break;
      case "0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925":
        this.processApprovalEvent(log);
        break;
      default:
        console.log("Unknown event:", log.topics[0]);
    }
  }

  private processTransferEvent(log: any) {
    const from = "0x" + log.topics[1].slice(26);
    const to = "0x" + log.topics[2].slice(26);
    // Value would need to be decoded from log.data
    console.log(`Transfer: ${from} -> ${to}`);
  }

  private processApprovalEvent(log: any) {
    const owner = "0x" + log.topics[1].slice(26);
    const spender = "0x" + log.topics[2].slice(26);
    console.log(`Approval: ${owner} approved ${spender}`);
  }
}

Error Handling and Reconnection

// Robust subscription with error handling
async function createRobustSubscription(subscriptionType: string, options?: any) {
  let subscription: any;
  let reconnectAttempts = 0;
  const maxReconnects = 5;

  async function connect() {
    try {
      subscription = await eth.subscribe(subscriptionType, options);
      reconnectAttempts = 0;

      subscription.on("data", (data) => {
        console.log("Received data:", data);
      });

      subscription.on("error", async (error) => {
        console.error("Subscription error:", error);
        
        if (reconnectAttempts < maxReconnects) {
          console.log(`Attempting reconnection ${reconnectAttempts + 1}/${maxReconnects}`);
          reconnectAttempts++;
          
          // Wait before reconnecting
          await new Promise(resolve => setTimeout(resolve, 2000 * reconnectAttempts));
          await connect();
        } else {
          console.error("Max reconnection attempts reached");
        }
      });

      console.log("Subscription connected successfully");
    } catch (error) {
      console.error("Failed to create subscription:", error);
      throw error;
    }
  }

  await connect();
  return subscription;
}

Core Types

interface LogsInput {
  address?: Address | Address[];
  topics?: (HexString32Bytes | HexString32Bytes[] | null)[];
  fromBlock?: BlockNumberOrTag;
  toBlock?: BlockNumberOrTag;
}

interface Filter {
  address?: Address | Address[];
  topics?: (HexString32Bytes | HexString32Bytes[] | null)[];
  fromBlock?: BlockNumberOrTag;
  toBlock?: BlockNumberOrTag;
}

interface Log {
  address: Address;
  topics: HexString32Bytes[];
  data: HexString;
  blockNumber: Numbers;
  transactionHash: HexString32Bytes;
  transactionIndex: Numbers;
  blockHash: HexString32Bytes;
  logIndex: Numbers;
  removed: boolean;
}

type LogsOutput = Log[];

interface RegisteredSubscription {
  logs: typeof LogsSubscription;
  newPendingTransactions: typeof NewPendingTransactionsSubscription;
  pendingTransactions: typeof NewPendingTransactionsSubscription;
  newHeads: typeof NewHeadsSubscription;
  newBlockHeaders: typeof NewHeadsSubscription;
  syncing: typeof SyncingSubscription;
}

// Subscription classes
class LogsSubscription extends Web3Subscription<LogsInput, Log> {}
class NewPendingTransactionsSubscription extends Web3Subscription<object, HexString32Bytes> {}
class NewHeadsSubscription extends Web3Subscription<object, BlockHeader> {}
class SyncingSubscription extends Web3Subscription<object, SyncingStatusAPI | boolean> {}

Install with Tessl CLI

npx tessl i tessl/npm-web3-eth

docs

account-operations.md

blockchain-state.md

cryptographic-operations.md

event-monitoring.md

gas-fee-management.md

index.md

network-information.md

smart-contract-interaction.md

transaction-management.md

transaction-utilities.md

tile.json