CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/npm-web3-core

Core tools and utilities for the web3.js ecosystem, providing foundational layer functionality for blockchain interactions.

Pending
Overview
Eval results
Files

docs/subscription-management.md

Subscription Management

The Web3 subscription management system provides real-time WebSocket connectivity for blockchain events with automatic reconnection, event handling, and comprehensive subscription lifecycle management. It enables applications to receive live updates for blockchain changes.

Capabilities

Web3SubscriptionManager Class

Central manager for handling multiple WebSocket subscriptions with lifecycle management and event coordination.

/**
 * Manages WebSocket subscriptions to blockchain events with lifecycle management.
 *
 * Holds a registry of subscription constructors (`registeredSubscriptions`) that
 * can be instantiated by name through `subscribe`, and a map of the subscription
 * instances it currently tracks (`subscriptions`).
 *
 * @template API - JSON-RPC API specification type
 * @template RegisteredSubs - Registered subscription constructor types, keyed by
 *   the name that is later passed to `subscribe`
 */
class Web3SubscriptionManager<
  API extends Web3APISpec = Web3APISpec, 
  RegisteredSubs extends {[key: string]: Web3SubscriptionConstructor} = {[key: string]: Web3SubscriptionConstructor}
> {
  /**
   * @param requestManager - Request manager this subscription manager is bound to
   * @param registeredSubscriptions - Map of subscription name to constructor
   * @param tolerateUnlinkedSubscription - Optional flag.
   *   NOTE(review): its semantics are not described in this document — confirm
   *   against the web3-core implementation before relying on it.
   */
  constructor(
    requestManager: Web3RequestManager<API>, 
    registeredSubscriptions: RegisteredSubs, 
    tolerateUnlinkedSubscription?: boolean
  );
  
  // Core properties
  /** Request manager passed to the constructor. */
  readonly requestManager: Web3RequestManager<API>;
  /** Subscription constructors passed to the constructor, keyed by name. */
  readonly registeredSubscriptions: RegisteredSubs;
  /**
   * Subscription instances tracked by this manager.
   * NOTE(review): keys are presumably subscription IDs — confirm.
   */
  readonly subscriptions: Map<string, InstanceType<RegisteredSubs[keyof RegisteredSubs]>>;
  
  // Subscription management
  /**
   * Create a subscription from one of the registered constructors.
   * @param name - Key of the constructor in `registeredSubscriptions`
   * @param args - First constructor argument of the selected subscription class
   * @param returnFormat - Optional data format handed to the subscription
   * @returns Promise resolving to the subscription instance
   */
  subscribe<T extends keyof RegisteredSubs>(
    name: T, 
    args?: ConstructorParameters<RegisteredSubs[T]>[0], 
    returnFormat?: DataFormat
  ): Promise<InstanceType<RegisteredSubs[T]>>;
  
  /**
   * Add an existing subscription instance to the manager.
   * @param sub - Subscription instance to add
   * @returns Promise resolving to the subscription ID
   */
  addSubscription(
    sub: InstanceType<RegisteredSubs[keyof RegisteredSubs]>
  ): Promise<string>;
  
  /**
   * Remove a subscription instance from the manager.
   * @param sub - Subscription instance to remove
   * @returns Promise resolving to the removed subscription ID
   */
  removeSubscription(
    sub: InstanceType<RegisteredSubs[keyof RegisteredSubs]>
  ): Promise<string>;
  
  /**
   * Unsubscribe multiple subscriptions, optionally filtered by a condition.
   * @param condition - Optional filter selecting which subscriptions to drop
   * @returns Promise resolving to the IDs of the unsubscribed subscriptions
   */
  unsubscribe(condition?: ShouldUnsubscribeCondition): Promise<string[]>;
  
  /** Clear all subscriptions without sending unsubscribe requests. */
  clear(): void;
  
  // Capability detection
  /**
   * Check whether the current provider supports subscriptions.
   * @returns `true` when subscriptions are supported
   */
  supportsSubscriptions(): boolean;
}

Usage Examples:

import { Web3SubscriptionManager, Web3RequestManager } from "web3-core";
import { EthExecutionAPI } from "web3-types";
// NOTE(review): NewBlockHeadersSubscription, PendingTransactionsSubscription and
// LogsSubscription are not imported anywhere in this snippet — bring them into
// scope from the package that defines them before running it.

// Define subscription types.
// This must be a type alias rather than an interface: the manager constrains its
// registry to `{[key: string]: Web3SubscriptionConstructor}`, and only object
// type aliases receive the implicit index signature needed to satisfy that.
type MySubscriptions = {
  newBlockHeaders: typeof NewBlockHeadersSubscription;
  pendingTransactions: typeof PendingTransactionsSubscription;
  logs: typeof LogsSubscription;
};

// Create WebSocket request manager
const wsRequestManager = new Web3RequestManager<EthExecutionAPI>(
  "wss://eth-mainnet.ws.alchemyapi.io/v2/your-api-key"
);

// Registry of subscription constructors. Annotating the constant (instead of
// casting with `as`) makes a missing or mistyped entry a compile-time error.
const registeredSubscriptions: MySubscriptions = {
  newBlockHeaders: NewBlockHeadersSubscription,
  pendingTransactions: PendingTransactionsSubscription,
  logs: LogsSubscription
};

// Create subscription manager
const subscriptionManager = new Web3SubscriptionManager(
  wsRequestManager,
  registeredSubscriptions
);

// Check subscription support
if (subscriptionManager.supportsSubscriptions()) {
  console.log("WebSocket provider supports subscriptions");
  
  // Subscribe to new block headers
  const blockSubscription = await subscriptionManager.subscribe("newBlockHeaders");
  
  blockSubscription.on("data", (blockHeader) => {
    console.log("New block:", blockHeader.number, blockHeader.hash);
  });
  
  blockSubscription.on("error", (error) => {
    console.error("Block subscription error:", error);
  });
} else {
  console.log("Current provider does not support subscriptions");
}

Web3Subscription Base Class

Abstract base class for implementing specific subscription types with event handling and lifecycle management.

/**
 * Abstract base class for WebSocket subscriptions with event emission.
 *
 * Besides the events declared in `EventMap`, every subscription also exposes the
 * `data`, `changed`, `error`, `connected` and `disconnected` events listed in
 * the `extends` clause below.
 *
 * @template EventMap - Event type mapping for this subscription
 * @template ArgsType - Constructor arguments type
 * @template API - JSON-RPC API specification type
 */
abstract class Web3Subscription<
  EventMap extends {[key: string]: unknown} = {[key: string]: unknown}, 
  ArgsType = unknown, 
  API extends Web3APISpec = Web3APISpec
> extends Web3EventEmitter<EventMap & {
  data: any;
  changed: any;
  error: Error;
  connected: string;
  disconnected: string | Error;
}> {
  /**
   * @param args - Subscription arguments, exposed afterwards as `args`
   * @param options - Either a subscription manager or a request manager to
   *   attach to, plus an optional return format
   */
  constructor(
    args: ArgsType, 
    options: {
      subscriptionManager: Web3SubscriptionManager<API, any>;
      returnFormat?: DataFormat;
    } | {
      requestManager: Web3RequestManager<API>;
      returnFormat?: DataFormat;
    }
  );
  
  // Core properties
  /**
   * Subscription ID. Optional, so it may be undefined.
   * NOTE(review): presumably unset until the subscription is established — confirm.
   */
  readonly id?: HexString;
  /** Arguments the subscription was constructed with. */
  readonly args: ArgsType;
  /** NOTE(review): meaning not described in this document — confirm against the implementation. */
  lastBlock?: BlockOutput;
  /** Data format associated with this subscription. */
  readonly returnFormat: DataFormat;
  
  // Subscription lifecycle
  /**
   * Subscribe to blockchain events.
   * @returns Promise resolving to the subscription ID
   */
  subscribe(): Promise<string>;
  /**
   * Unsubscribe from blockchain events.
   * @returns Promise resolving when unsubscribed
   */
  unsubscribe(): Promise<void>;
  /**
   * Resubscribe to blockchain events (useful after connection loss).
   * @returns Promise resolving when resubscribed
   */
  resubscribe(): Promise<void>;
  
  // Internal methods (implement in subclasses)
  /** Send the subscribe request; resolves to a string (presumably the subscription ID — confirm). */
  protected abstract sendSubscriptionRequest(): Promise<string>;
  /** Send the unsubscribe request for this subscription. */
  protected abstract sendUnsubscribeRequest(): Promise<void>;
  /** Handle one incoming subscription message. */
  protected abstract processSubscriptionData(
    data: JsonRpcSubscriptionResult | JsonRpcSubscriptionResultOld<Log> | JsonRpcNotification<Log>
  ): void;
  /** Convert a raw subscription result into the value handed to listeners. */
  protected abstract formatSubscriptionResult(data: any): any;
}

Usage Examples:

// Example custom subscription implementation

/** Filter arguments accepted by CustomEventSubscription. */
interface CustomEventArgs {
  address: string;
  topics: string[];
}

/**
 * Payload emitted on the "data" event.
 * Deliberately not named `CustomEvent`, which would resolve to the DOM global
 * of the same name when the DOM lib is enabled.
 */
interface CustomEventPayload {
  address: string;
  topics: string[];
  data: string;
  blockNumber: string;
  transactionHash: string;
}

/**
 * Subscription that listens for logs matching an address/topics filter and
 * re-emits each one as a CustomEventPayload.
 *
 * The argument type is passed as the second type parameter so that `this.args`
 * is typed as CustomEventArgs instead of the default `unknown`.
 */
class CustomEventSubscription extends Web3Subscription<
  {
    data: CustomEventPayload;
    error: Error;
  },
  CustomEventArgs
> {
  // Kept locally: the Web3Subscription declaration exposes no request manager,
  // so requests go through the manager this subscription was created with.
  private readonly manager: Web3SubscriptionManager;

  /**
   * @param args - Address and topics to filter on
   * @param options - Must carry the subscription manager to attach to
   */
  constructor(
    args: CustomEventArgs,
    options: { subscriptionManager: Web3SubscriptionManager }
  ) {
    super(args, options);
    this.manager = options.subscriptionManager;
  }
  
  /** Send `eth_subscribe` for logs matching the configured filter. */
  protected async sendSubscriptionRequest(): Promise<string> {
    return this.manager.requestManager.send({
      method: "eth_subscribe",
      params: ["logs", {
        address: this.args.address,
        topics: this.args.topics
      }]
    });
  }
  
  /** Send `eth_unsubscribe`; a no-op when no subscription ID is set. */
  protected async sendUnsubscribeRequest(): Promise<void> {
    if (this.id) {
      await this.manager.requestManager.send({
        method: "eth_unsubscribe",
        params: [this.id]
      });
    }
  }
  
  /** Format the incoming result and emit it on the "data" event. */
  protected processSubscriptionData(data: JsonRpcSubscriptionResult): void {
    const formattedData = this.formatSubscriptionResult(data.params.result);
    this.emit("data", formattedData);
  }
  
  /** Pick the fields of the raw result that make up a CustomEventPayload. */
  protected formatSubscriptionResult(data: any): CustomEventPayload {
    return {
      address: data.address,
      topics: data.topics,
      data: data.data,
      blockNumber: data.blockNumber,
      transactionHash: data.transactionHash
    };
  }
}

// Use custom subscription
const customSub = new CustomEventSubscription(
  { 
    // A full 20-byte (40 hex digit) address: the DAI token contract on mainnet
    address: "0x6B175474E89094C44Da98b954EedeAC495271d0F",
    topics: ["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"]
  },
  { subscriptionManager }
);

// Attach the listener before subscribing so no early event can be missed
customSub.on("data", (event) => {
  console.log("Custom event received:", event);
});

await customSub.subscribe();

Subscription Lifecycle Management

Methods for managing subscription states and handling connection events.

/**
 * Subscribe to blockchain events.
 * Instance method of Web3Subscription.
 * @returns Promise resolving to subscription ID
 */
subscribe(): Promise<string>;

/**
 * Unsubscribe this subscription from blockchain events.
 * Instance method of Web3Subscription; the manager-level variant that takes a
 * condition is documented further below.
 * @returns Promise resolving when unsubscribed
 */
unsubscribe(): Promise<void>;

/**
 * Resubscribe to blockchain events (useful after connection loss).
 * Instance method of Web3Subscription.
 * @returns Promise resolving when resubscribed
 */
resubscribe(): Promise<void>;

/**
 * Add existing subscription instance to manager.
 * Method of Web3SubscriptionManager.
 * @param sub - Subscription instance to add
 * @returns Promise resolving to subscription ID
 */
addSubscription(sub: InstanceType<RegisteredSubs[keyof RegisteredSubs]>): Promise<string>;

/**
 * Remove subscription instance from manager.
 * Method of Web3SubscriptionManager.
 * @param sub - Subscription instance to remove
 * @returns Promise resolving to removed subscription ID
 */
removeSubscription(sub: InstanceType<RegisteredSubs[keyof RegisteredSubs]>): Promise<string>;

/**
 * Condition function type for selective unsubscription.
 * Receives a subscription and returns whether it should be unsubscribed.
 */
type ShouldUnsubscribeCondition = (subscription: Web3Subscription) => boolean;

/**
 * Unsubscribe multiple subscriptions based on condition.
 * Method of Web3SubscriptionManager (distinct from the per-subscription
 * `unsubscribe()` above, which takes no arguments).
 * @param condition - Optional filter function for selective unsubscription
 * @returns Promise resolving to array of unsubscribed subscription IDs
 */
unsubscribe(condition?: ShouldUnsubscribeCondition): Promise<string[]>;

/**
 * Clear all subscriptions without sending unsubscribe requests.
 * Method of Web3SubscriptionManager.
 */
clear(): void;

Usage Examples:

// Basic subscription lifecycle
const blockSub = await subscriptionManager.subscribe("newBlockHeaders");
console.log("Subscribed with ID:", blockSub.id);

// Web3Subscription declares no creation timestamp, so record one per
// subscription at the moment it is created. A WeakMap keeps the bookkeeping
// from holding on to subscriptions that are otherwise discarded.
const subscribedAt = new WeakMap<object, number>();
subscribedAt.set(blockSub, Date.now());

// Later unsubscribe
await blockSub.unsubscribe();
console.log("Unsubscribed");

// Resubscribe after connection issues
await blockSub.resubscribe();
console.log("Resubscribed with new ID:", blockSub.id);

// Selective unsubscription: drop every tracked subscription older than 1 hour.
// The cutoff is computed once rather than once per subscription.
const oneHourAgo = Date.now() - 60 * 60 * 1000;
await subscriptionManager.unsubscribe((sub) => {
  const createdAt = subscribedAt.get(sub);
  // Subscriptions with no recorded creation time are left untouched
  return createdAt !== undefined && createdAt < oneHourAgo;
});

// Clear all subscriptions (emergency cleanup)
subscriptionManager.clear();

Subscription Events

Event system for handling subscription data, errors, and connection states.

/**
 * Standard subscription events.
 * Each key is an event name; its type is the payload handed to listeners
 * registered with `on(eventName, listener)`.
 */
interface SubscriptionEventMap {
  /**
   * Emitted when new data is received.
   * The payload depends on the subscription (the usage example reads block
   * header fields from it for "newBlockHeaders").
   */
  data: any;
  
  /**
   * Emitted when subscription data changes.
   */
  changed: any;
  
  /**
   * Emitted when subscription encounters an error.
   */
  error: Error;
  
  /**
   * Emitted when subscription successfully connects.
   * The usage example treats the string payload as the subscription ID.
   */
  connected: string;
  
  /**
   * Emitted when subscription disconnects.
   * The payload is either an Error or a string; the usage example treats the
   * string as the subscription ID.
   */
  disconnected: string | Error;
}

Usage Examples:

// Handle all subscription events
const subscription = await subscriptionManager.subscribe("newBlockHeaders");

// One resubscribe attempt, reporting the outcome either way
const retrySubscription = async (): Promise<void> => {
  try {
    await subscription.resubscribe();
    console.log("Successfully resubscribed after error");
  } catch (resubError) {
    console.error("Failed to resubscribe:", resubError);
  }
};

// Log the interesting fields of every incoming block header
subscription.on("data", ({ number, hash, timestamp, parentHash }) => {
  console.log("New block header:", { number, hash, timestamp, parentHash });
});

subscription.on("changed", (update) => {
  console.log("Subscription data changed:", update);
});

subscription.on("error", (err) => {
  console.error("Subscription error:", err.message);

  // Give the connection five seconds before trying to resubscribe
  setTimeout(retrySubscription, 5000);
});

subscription.on("connected", (id) => {
  console.log("Subscription connected with ID:", id);
});

subscription.on("disconnected", (cause) => {
  // A plain string payload is the ID of the subscription that went away
  if (!(cause instanceof Error)) {
    console.log("Subscription disconnected with ID:", cause);
    return;
  }
  console.error("Subscription disconnected due to error:", cause.message);
});

Provider Capability Detection

Methods for detecting WebSocket subscription support in providers.

/**
 * Check if current provider supports subscriptions.
 * Method of Web3SubscriptionManager; call it before `subscribe` so that code
 * can fall back to another mechanism (e.g. polling) when it returns false.
 * @returns Boolean indicating subscription support
 */
supportsSubscriptions(): boolean;

Usage Examples:

// Check provider capabilities before subscribing
if (subscriptionManager.supportsSubscriptions()) {
  // Provider supports subscriptions - proceed with WebSocket subscriptions
  const logsSubscription = await subscriptionManager.subscribe("logs", {
    // A full 20-byte (40 hex digit) address: the DAI token contract on mainnet
    address: "0x6B175474E89094C44Da98b954EedeAC495271d0F",
    topics: ["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"]
  });
  
  logsSubscription.on("data", (log) => {
    console.log("Token transfer:", log);
  });
} else {
  // Fall back to polling mechanism
  console.log("Provider doesn't support subscriptions, using polling...");
  
  // Fetch and log the latest block; errors are logged so that one failed
  // request does not stop later polls
  const pollForBlocks = async () => {
    try {
      const latestBlock = await subscriptionManager.requestManager.send({
        method: "eth_getBlockByNumber",
        params: ["latest", false]
      });
      console.log("Latest block (polling):", latestBlock.number);
    } catch (error) {
      console.error("Polling error:", error);
    }
  };
  
  // Poll every 12 seconds (average block time). Keep the handle so polling
  // can be stopped later with clearInterval(pollTimer).
  const pollTimer = setInterval(pollForBlocks, 12000);
}

Subscription Constructor Types

Type definitions for subscription constructors and registration.

/**
 * Constructor type for Web3 subscription classes.
 *
 * Describes a class that can be instantiated with subscription arguments and an
 * options object, and that produces a Web3Subscription.
 *
 * @template EventMap - Event type mapping of the constructed subscription.
 *   Carries the same constraint as Web3Subscription's first type parameter;
 *   without it, passing EventMap through to Web3Subscription does not type-check.
 * @template ArgsType - Type of the first constructor argument
 */
type Web3SubscriptionConstructor<
  EventMap extends {[key: string]: unknown} = {[key: string]: unknown},
  ArgsType = unknown
> = new (
  args: ArgsType,
  options: {
    subscriptionManager?: Web3SubscriptionManager;
    requestManager?: Web3RequestManager;
    returnFormat?: DataFormat;
  }
) => Web3Subscription<EventMap, ArgsType>;

Usage Examples:

// Define subscription registry with proper typing.
// This must be a type alias rather than an interface: the manager constrains its
// registry to `{[key: string]: Web3SubscriptionConstructor}`, and interfaces do
// not receive the implicit index signature needed to satisfy that constraint.
type EthereumSubscriptions = {
  newBlockHeaders: Web3SubscriptionConstructor<
    { data: BlockHeader }, 
    undefined
  >;
  logs: Web3SubscriptionConstructor<
    { data: Log }, 
    { address?: string; topics?: string[] }
  >;
  pendingTransactions: Web3SubscriptionConstructor<
    { data: string }, 
    undefined
  >;
  syncing: Web3SubscriptionConstructor<
    { data: SyncingStatus }, 
    undefined
  >;
};

// Create typed subscription manager
// NOTE(review): the subscription classes and wsRequestManager used below are
// not defined in this snippet — bring them into scope before running it.
const typedSubscriptionManager = new Web3SubscriptionManager<
  EthExecutionAPI, 
  EthereumSubscriptions
>(
  wsRequestManager,
  {
    newBlockHeaders: NewBlockHeadersSubscription,
    logs: LogsSubscription,
    pendingTransactions: PendingTransactionsSubscription,
    syncing: SyncingSubscription
  }
);

// TypeScript will provide proper type checking and autocomplete
const logsub = await typedSubscriptionManager.subscribe("logs", {
  // A full 20-byte (40 hex digit) address: the DAI token contract on mainnet
  address: "0x6B175474E89094C44Da98b954EedeAC495271d0F",
  topics: ["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"]
});

Install with Tessl CLI

npx tessl i tessl/npm-web3-core

docs

batch-processing.md

configuration-management.md

context-management.md

event-system.md

index.md

provider-integration.md

request-management.md

subscription-management.md

tile.json