tessl/pypi-rqalpha

Comprehensive algorithmic trading framework for Python with backtesting, simulation, and live trading capabilities

Framework Core

The framework core comprises the strategy context, the execution framework, and the event system. Together they handle strategy execution phases, event processing, and global state management, and they provide the runtime environment in which trading strategies run.

Capabilities

Strategy Context

The central context object available to all strategy functions, providing access to portfolio, accounts, market data, and system state.

class StrategyContext:
    """
    Strategy context object (exposed as 'context' in strategies).
    
    Properties:
    - portfolio (Portfolio): Main portfolio object with all accounts and positions
    - stock_account (Account): Stock trading account
    - future_account (Account): Futures trading account
    - bond_account (Account): Bond trading account
    - now (datetime): Current strategy execution time
    - universe (list[str]): Current strategy universe of instruments
    - run_info (RunInfo): Strategy run information and metadata
    - config (dict): Strategy configuration
    
    Available in strategy functions as global 'context' variable.
    """

Global Variables

Global variable namespace for user data persistence across strategy execution.

# Global variables namespace (available as 'g' in strategies)
g = GlobalVars()  # User namespace for storing custom data

# Usage in strategies:
def init(context):
    g.my_variable = "some_value"
    g.counters = {"buy": 0, "sell": 0}

def handle_bar(context, bar_dict):
    g.counters["buy"] += 1
    logger.info(f"My variable: {g.my_variable}")
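
To make the persistence behavior concrete, here is a minimal standalone sketch. It uses `types.SimpleNamespace` as a stand-in for `g` (an assumption for illustration, not RQAlpha's `GlobalVars` class) and a hypothetical driver loop in place of the framework.

```python
from types import SimpleNamespace

# Stand-in for the framework-provided `g`; NOT rqalpha's GlobalVars class.
g = SimpleNamespace()

def init(context):
    g.counters = {"buy": 0, "sell": 0}

def handle_bar(context, bar_dict):
    # State written in init() is still visible here and accumulates
    g.counters["buy"] += 1

# Hypothetical driver: the real framework invokes these callbacks for you.
init(context=None)
for _ in range(3):
    handle_bar(context=None, bar_dict={})

print(g.counters)
```

Because `g` is a plain attribute namespace in this sketch, anything assigned in one callback is readable in every later callback.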

Event System

Event publishing and subscription system for strategy and system events.

def subscribe_event(event_type, handler):
    """
    Subscribe to system events.

    Parameters:
    - event_type (EVENT): Event type to subscribe to
    - handler (callable): Event handler function

    Returns:
    None

    The handler function receives (context, event_data) parameters.
    """

class Event:
    """
    Event object for system messaging.
    
    Properties:
    - event_type (EVENT): Type of event
    - timestamp (datetime): Event timestamp
    - data (dict): Event-specific data
    """

class EventBus:
    """
    Event publishing and subscription system.
    
    Methods:
    - publish_event(event): Publish event to subscribers
    - add_listener(event_type, handler): Add event listener
    - remove_listener(event_type, handler): Remove event listener
    """

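The publish/subscribe pattern behind these classes can be sketched in a few lines. The code below is an illustrative toy, not RQAlpha's implementation: the two `EVENT` members, the `Event` constructor, and the handler signature are assumptions made for the example.

```python
from collections import defaultdict
from enum import Enum

class EVENT(Enum):  # hypothetical event types for this sketch
    TRADE = "trade"
    SETTLEMENT = "settlement"

class Event:
    def __init__(self, event_type, **data):
        self.event_type = event_type
        self.data = data  # event-specific payload

class EventBus:
    def __init__(self):
        self._listeners = defaultdict(list)

    def add_listener(self, event_type, handler):
        self._listeners[event_type].append(handler)

    def remove_listener(self, event_type, handler):
        if handler in self._listeners[event_type]:
            self._listeners[event_type].remove(handler)

    def publish_event(self, event):
        # Iterate over a copy so handlers may unsubscribe while running
        for handler in list(self._listeners[event.event_type]):
            handler(event)

received = []

def on_trade(event):
    received.append(event)

bus = EventBus()
bus.add_listener(EVENT.TRADE, on_trade)
bus.publish_event(Event(EVENT.TRADE, quantity=100))  # delivered
bus.publish_event(Event(EVENT.SETTLEMENT))           # no listener registered
bus.remove_listener(EVENT.TRADE, on_trade)
bus.publish_event(Event(EVENT.TRADE, quantity=200))  # not delivered
```

Keying the listener table by event type keeps publishing cheap: only handlers registered for that type are visited.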
Strategy Execution Management

Core classes managing strategy lifecycle and execution phases.

class Strategy:
    """
    Strategy execution engine and lifecycle manager.
    
    Methods:
    - init(): Initialize strategy
    - before_trading(): Pre-market phase
    - open_auction(): Opening auction phase  
    - handle_bar(bar_dict): Process bar data
    - handle_tick(tick): Process tick data
    - after_trading(): Post-market phase
    - teardown(): Strategy cleanup
    """

class ExecutionContext:
    """
    Execution context and phase enforcement.
    
    Properties:
    - phase (EXECUTION_PHASE): Current execution phase
    - strategy_context (StrategyContext): Strategy context
    
    Methods:
    - set_phase(phase): Set current execution phase
    - enforce_phase(allowed_phases): Enforce phase restrictions
    """

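Phase enforcement can be pictured as a guard that each API call runs before doing any work. The sketch below is one possible shape under stated assumptions: the `EXECUTION_PHASE` members, the context-manager form of `set_phase`, and the `submit_order` function are all hypothetical and chosen for illustration.

```python
from contextlib import contextmanager
from enum import Enum

class EXECUTION_PHASE(Enum):  # hypothetical subset of phases
    ON_INIT = "on_init"
    ON_BAR = "on_bar"
    AFTER_TRADING = "after_trading"

class ExecutionContext:
    def __init__(self):
        self.phase = None

    @contextmanager
    def set_phase(self, phase):
        # Switch phase for the duration of the block, then restore it
        previous, self.phase = self.phase, phase
        try:
            yield self
        finally:
            self.phase = previous

    def enforce_phase(self, allowed_phases):
        if self.phase not in allowed_phases:
            raise RuntimeError(f"API not allowed in phase {self.phase}")

def submit_order(ctx, order_book_id, quantity):
    # Hypothetical API that is only legal while bars are being processed
    ctx.enforce_phase((EXECUTION_PHASE.ON_BAR,))
    return (order_book_id, quantity)

ctx = ExecutionContext()
with ctx.set_phase(EXECUTION_PHASE.ON_BAR):
    accepted = submit_order(ctx, "000001.XSHE", 100)

try:
    with ctx.set_phase(EXECUTION_PHASE.AFTER_TRADING):
        submit_order(ctx, "000001.XSHE", 100)
    rejected = False
except RuntimeError:
    rejected = True
```

Restoring the previous phase in `finally` means a failing callback cannot leave the context stuck in the wrong phase.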
class Executor:
    """
    Strategy executor and orchestrator.
    
    Methods:
    - run(): Execute complete strategy
    - execute_phase(phase): Execute specific phase
    - handle_exception(exception): Handle strategy exceptions
    """

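The order in which the lifecycle methods fire is easier to see in a toy driver. The following sketch is not the RQAlpha executor: `run_lifecycle`, the dictionary-based context, and the sample dates and prices are assumptions, and the opening auction, tick handling, and teardown phases are left out for brevity.

```python
def run_lifecycle(strategy_funcs, trading_days, bars_per_day):
    """Toy driver: call each lifecycle hook in order, skipping missing ones."""
    calls = []
    context = {}  # stand-in for StrategyContext

    def call(name, *args):
        func = strategy_funcs.get(name)
        if func is not None:
            func(context, *args)
            calls.append(name)

    call("init")                      # once, at start
    for day in trading_days:
        call("before_trading")        # once per day, before the open
        for bar_dict in bars_per_day.get(day, []):
            call("handle_bar", bar_dict)  # once per bar
        call("after_trading")         # once per day, after the close
    return calls

def noop(context, *args):
    pass

days = ["2024-01-02", "2024-01-03"]
bars = {day: [{"000001.XSHE": 10.0}, {"000001.XSHE": 10.1}] for day in days}
funcs = {name: noop for name in ("init", "before_trading", "handle_bar", "after_trading")}
calls = run_lifecycle(funcs, days, bars)
print(calls)
```

With two days of two bars each, `init` runs once and the daily sequence of `before_trading`, two `handle_bar` calls, and `after_trading` repeats twice.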
Strategy Loading

Strategy loading interfaces for different strategy sources.

class FileStrategyLoader:
    """
    Load strategy from Python file.
    
    Methods:
    - load(file_path): Load strategy from file path
    - get_strategy_functions(): Extract strategy functions
    """

class SourceCodeStrategyLoader:
    """
    Load strategy from source code string.
    
    Methods:
    - load(source_code): Load strategy from code string
    - compile_strategy(): Compile strategy code
    """

class UserFuncStrategyLoader:
    """
    Load strategy from user-defined functions.
    
    Methods:
    - load(user_funcs): Load from function dictionary
    - validate_functions(): Validate function signatures
    """

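Loading a strategy from a source string amounts to compiling the code, executing it in a fresh namespace, and picking out the lifecycle functions. The sketch below shows that idea with the built-in `compile` and `exec`; the function name `load_strategy_from_source`, the `LIFECYCLE_NAMES` tuple, and the dictionary context are hypothetical and do not mirror the loader classes' actual internals.

```python
LIFECYCLE_NAMES = ("init", "before_trading", "handle_bar",
                   "handle_tick", "after_trading")

def load_strategy_from_source(source_code, scope=None):
    """Execute strategy source and return the lifecycle functions it defines."""
    scope = {} if scope is None else scope
    code = compile(source_code, "<strategy>", "exec")
    exec(code, scope)
    return {name: scope[name]
            for name in LIFECYCLE_NAMES
            if callable(scope.get(name))}

SOURCE = '''
def init(context):
    context["ready"] = True

def handle_bar(context, bar_dict):
    context["bars_seen"] = context.get("bars_seen", 0) + 1
'''

funcs = load_strategy_from_source(SOURCE)
context = {}
funcs["init"](context)
funcs["handle_bar"](context, {})
print(sorted(funcs), context)
```

Passing a pre-populated `scope` is how a loader of this kind could inject names such as `g` or `logger` before the strategy code runs. Executing untrusted source this way is unsafe, so it only suits code you control.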
Run Information

Metadata and information about strategy execution.

class RunInfo:
    """
    Strategy run information and metadata.
    
    Properties:
    - start_date (date): Strategy start date
    - end_date (date): Strategy end date
    - frequency (str): Data frequency
    - run_type (RUN_TYPE): Execution mode
    - benchmark (str): Benchmark instrument
    - matching_type (MATCHING_TYPE): Order matching type
    - commission_multiplier (float): Commission multiplier
    - margin_multiplier (float): Margin multiplier
    - slippage (float): Slippage factor
    - accounts (dict): Account configurations
    """

Environment Singleton

Global environment singleton managing system state.

class Environment:
    """
    Global environment singleton for system state.
    
    Properties:
    - config (dict): System configuration
    - data_proxy (DataProxy): Data access layer
    - broker (AbstractBroker): Order execution broker
    - portfolio (Portfolio): Portfolio manager
    - event_bus (EventBus): Event system
    - phase (EXECUTION_PHASE): Current execution phase
    
    Methods:
    - get_instance(): Get singleton instance
    - set_instance(env): Set singleton instance
    """

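A class-level singleton with `get_instance` and `set_instance` can be sketched as follows. This is a generic illustration of the pattern; the constructor signature and the error raised when no instance exists are assumptions, not RQAlpha's behavior.

```python
class Environment:
    _instance = None  # the single shared instance, if one has been set

    def __init__(self, config):
        self.config = config

    @classmethod
    def set_instance(cls, env):
        cls._instance = env

    @classmethod
    def get_instance(cls):
        if cls._instance is None:
            raise RuntimeError("Environment has not been created yet")
        return cls._instance

env = Environment({"base": {"frequency": "1d"}})
Environment.set_instance(env)

# Any module can now reach the same object without passing it around
same = Environment.get_instance()
print(same.config["base"]["frequency"])
```

Failing loudly when no instance is set makes an initialization-order bug surface at the call site instead of as a later `AttributeError` on `None`.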
Framework Usage Examples

Strategy Function Definition

# Strategy functions with framework integration
def init(context):
    """Strategy initialization - called once at start."""
    # Access context properties
    logger.info(f"Strategy start date: {context.run_info.start_date}")
    logger.info(f"Initial portfolio value: {context.portfolio.total_value}")
    
    # Set up global variables
    g.trade_count = 0
    g.last_rebalance = None
    
    # Configure universe
    context.stocks = ["000001.XSHE", "000002.XSHE"]
    update_universe(context.stocks)

def before_trading(context):
    """Pre-market phase - called daily before market open."""
    logger.info(f"Before trading on {context.now.date()}")
    
    # Daily preparation logic (calculate_signals and should_rebalance are
    # user-defined helpers that are not shown here)
    g.daily_signals = calculate_signals(context)
    
    # Check for rebalancing
    g.rebalance_today = bool(should_rebalance(context))

def handle_bar(context, bar_dict):
    """Bar processing - called for each bar."""
    # Access current market data
    for stock_id, bar in bar_dict.items():
        current_price = bar.close
        
        # Strategy logic using global variables
        if g.rebalance_today and stock_id in g.daily_signals:
            signal = g.daily_signals[stock_id]
            target_weight = signal * 0.1  # 10% max per stock
            order_target_percent(stock_id, target_weight)
            g.trade_count += 1

def after_trading(context):
    """Post-market phase - called daily after market close."""
    logger.info(f"After trading on {context.now.date()}")
    logger.info(f"Portfolio value: {context.portfolio.total_value:.2f}")
    # g.trade_count is never reset, so it counts orders since the start of the run
    logger.info(f"Orders submitted so far: {g.trade_count}")
    
    # Reset daily variables
    g.rebalance_today = False

Event System Usage

from rqalpha.apis import subscribe_event, EVENT

def init(context):
    # Subscribe to various events
    subscribe_event(EVENT.TRADE, on_trade)
    subscribe_event(EVENT.ORDER_CREATION_PASS, on_order_created)
    subscribe_event(EVENT.ORDER_CREATION_REJECT, on_order_rejected)
    subscribe_event(EVENT.SETTLEMENT, on_settlement)
    
    g.trade_history = []
    g.order_history = []

def on_trade(context, event):
    """Handle trade execution events."""
    # The handler receives an event object; the trade is read from event.trade
    trade = event.trade
    logger.info(f"Trade executed: {trade.order_book_id} "
                f"{trade.last_quantity} @ {trade.last_price}")
    
    # Store trade information
    trade_info = {
        "datetime": trade.datetime,
        "instrument": trade.order_book_id,
        "quantity": trade.last_quantity,
        "price": trade.last_price,
        "side": trade.side,
        "commission": trade.commission
    }
    g.trade_history.append(trade_info)

def on_order_created(context, event):
    """Handle successful order creation."""
    # The order is read from event.order
    order = event.order
    logger.info(f"Order created: {order.order_id} for {order.order_book_id}")
    
    order_info = {
        "order_id": order.order_id,
        "instrument": order.order_book_id,
        "quantity": order.quantity,
        "price": order.price,
        "status": order.status,
        "created_at": order.datetime
    }
    g.order_history.append(order_info)

def on_order_rejected(context, event):
    """Handle order rejection."""
    logger.warning(f"Order rejected: {event.order.order_id}")
    # Implement rejection handling logic

def on_settlement(context, event):
    """Handle daily settlement."""
    logger.info("Daily settlement completed")
    # Daily cleanup or reporting logic

Global Variables Management

def init(context):
    # Initialize global variables
    g.indicators = {}
    g.signals = {}
    g.position_history = []
    g.performance_metrics = {
        "max_drawdown": 0.0,
        "peak_value": 0.0,
        "win_rate": 0.0,
        "trades": []
    }

def handle_bar(context, bar_dict):
    # Update indicators using global variables
    for stock_id in context.universe:
        if stock_id not in g.indicators:
            g.indicators[stock_id] = {
                "sma_20": [],
                "sma_60": [],
                "rsi": []
            }
        
        # Calculate indicators; with a single field name, history_bars
        # returns a 1-D array of that field
        closes = history_bars(stock_id, 60, "1d", fields="close")
        sma_20 = closes[-20:].mean()
        sma_60 = closes[-60:].mean()
        
        # Store in global variables
        g.indicators[stock_id]["sma_20"].append(sma_20)
        g.indicators[stock_id]["sma_60"].append(sma_60)
        
        # Keep only recent values
        if len(g.indicators[stock_id]["sma_20"]) > 252:
            g.indicators[stock_id]["sma_20"] = g.indicators[stock_id]["sma_20"][-252:]
            g.indicators[stock_id]["sma_60"] = g.indicators[stock_id]["sma_60"][-252:]
        
        # Generate signals
        if sma_20 > sma_60:
            g.signals[stock_id] = 1  # Buy signal
        else:
            g.signals[stock_id] = -1  # Sell signal

def after_trading(context):
    # Update performance metrics in global variables
    current_value = context.portfolio.total_value
    
    if current_value > g.performance_metrics["peak_value"]:
        g.performance_metrics["peak_value"] = current_value
    
    peak_value = g.performance_metrics["peak_value"]
    # Guard against a zero peak before dividing
    drawdown = (peak_value - current_value) / peak_value if peak_value > 0 else 0.0
    if drawdown > g.performance_metrics["max_drawdown"]:
        g.performance_metrics["max_drawdown"] = drawdown
    
    # Store daily position snapshot
    position_snapshot = {
        "date": context.now.date(),
        "total_value": current_value,
        "cash": context.portfolio.cash,
        "positions": {pos.order_book_id: pos.quantity for pos in get_positions()}
    }
    g.position_history.append(position_snapshot)
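
The peak and drawdown bookkeeping above can be checked in isolation. The following sketch computes the maximum drawdown of a list of portfolio values with the same running-peak logic; the function name `max_drawdown` and the sample equity curve are made up for illustration.

```python
def max_drawdown(values):
    """Largest peak-to-trough decline, as a fraction of the peak."""
    peak = 0.0
    worst = 0.0
    for value in values:
        peak = max(peak, value)          # running high-water mark
        if peak > 0:                     # avoid dividing by a zero peak
            worst = max(worst, (peak - value) / peak)
    return worst

equity = [100.0, 120.0, 90.0, 110.0, 130.0]
print(max_drawdown(equity))  # peak 120 -> trough 90, i.e. 30 / 120 = 0.25
```

The worst decline here runs from the peak of 120 down to 90, so the result is 0.25 even though the curve later reaches a new high.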

Context Properties Access

def handle_bar(context, bar_dict):
    # Portfolio information
    logger.info(f"Portfolio total value: {context.portfolio.total_value}")
    logger.info(f"Available cash: {context.portfolio.cash}")
    logger.info(f"Market value: {context.portfolio.market_value}")
    
    # Account-specific information
    stock_cash = context.stock_account.cash
    stock_value = context.stock_account.total_value
    future_cash = context.future_account.cash
    
    logger.info(f"Stock account - Cash: {stock_cash}, Value: {stock_value}")
    logger.info(f"Future account cash: {future_cash}")
    
    # Current time and universe
    logger.info(f"Current time: {context.now}")
    logger.info(f"Universe size: {len(context.universe)}")
    
    # Run information
    logger.info(f"Frequency: {context.run_info.frequency}")
    logger.info(f"Benchmark: {context.run_info.benchmark}")
    
    # Configuration access
    start_date = context.config["base"]["start_date"]
    accounts = context.config["base"]["accounts"]
    logger.info(f"Strategy period: {start_date} to {context.run_info.end_date}")

Custom Helper Functions

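import numpy as np  # required by the RSI calculator below

def init(context):
    # Store helper callables and objects on the global namespace
    g.calculate_rsi = create_rsi_calculator()
    # create_bollinger_calculator is assumed to be defined in the same way
    # as create_rsi_calculator; it is not shown here
    g.calculate_bollinger = create_bollinger_calculator()
    g.risk_manager = RiskManager(context.portfolio.total_value)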

def create_rsi_calculator():
    """Create RSI calculation function."""
    def calculate_rsi(prices, period=14):
        deltas = np.diff(prices)
        seed = deltas[:period+1]
        up = seed[seed >= 0].sum() / period
        down = -seed[seed < 0].sum() / period
        rs = up / down
        rsi = np.zeros_like(prices)
        rsi[:period] = 100. - 100. / (1. + rs)
        
        for i in range(period, len(prices)):
            delta = deltas[i-1]
            if delta > 0:
                upval = delta
                downval = 0.
            else:
                upval = 0.
                downval = -delta
            
            up = (up * (period - 1) + upval) / period
            down = (down * (period - 1) + downval) / period
            rs = up / down
            rsi[i] = 100. - 100. / (1. + rs)
        
        return rsi
    return calculate_rsi

class RiskManager:
    """Custom risk management class."""
    def __init__(self, initial_capital):
        self.initial_capital = initial_capital
        self.max_position_size = 0.1  # 10% max per position
        self.max_drawdown = 0.2  # 20% max drawdown
    
    def check_position_size(self, portfolio_value, position_value):
        """Check if position size exceeds limits."""
        position_weight = position_value / portfolio_value
        return position_weight <= self.max_position_size
    
    def check_drawdown(self, current_value, peak_value):
        """Check if drawdown exceeds limits."""
        drawdown = (peak_value - current_value) / peak_value
        return drawdown <= self.max_drawdown

def handle_bar(context, bar_dict):
    # Use custom helper functions
    for stock_id in context.universe:
        # With a single field name, history_bars returns a 1-D array of closes
        closes = history_bars(stock_id, 30, "1d", fields="close")
        rsi = g.calculate_rsi(closes)[-1]
        
        # Risk management check
        portfolio_value = context.portfolio.total_value
        if stock_id in context.portfolio.positions:
            position_value = context.portfolio.positions[stock_id].market_value
            if not g.risk_manager.check_position_size(portfolio_value, position_value):
                logger.warning(f"Position size limit exceeded for {stock_id}")
                continue
        
        # Trading logic with RSI
        if rsi < 30:  # Oversold
            order_target_percent(stock_id, 0.05)  # 5% allocation
        elif rsi > 70:  # Overbought
            order_target_percent(stock_id, 0)  # Close position
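
The RSI helper above divides by the average loss, which is zero when a window contains no down moves. As a sketch of one way to handle that, here is a pure-Python variant that returns only the latest RSI value and guards the zero case. The function name `wilder_rsi`, the seeding on the first `period` deltas, and the convention of returning 50 for a completely flat series are assumptions of this example, not part of the original helper.

```python
def wilder_rsi(prices, period=14):
    """Latest RSI value using Wilder's smoothing, with a zero-loss guard."""
    if len(prices) < period + 1:
        raise ValueError("need at least period + 1 prices")

    deltas = [b - a for a, b in zip(prices, prices[1:])]

    # Seed the averages with the first `period` price changes
    avg_gain = sum(d for d in deltas[:period] if d > 0) / period
    avg_loss = sum(-d for d in deltas[:period] if d < 0) / period

    # Wilder's smoothing over the remaining changes
    for d in deltas[period:]:
        avg_gain = (avg_gain * (period - 1) + max(d, 0.0)) / period
        avg_loss = (avg_loss * (period - 1) + max(-d, 0.0)) / period

    if avg_loss == 0:
        # No losses at all: RSI saturates at 100 (or 50 if nothing moved)
        return 100.0 if avg_gain > 0 else 50.0

    rs = avg_gain / avg_loss
    return 100.0 - 100.0 / (1.0 + rs)

print(wilder_rsi([1, 2, 1, 2], period=2))
```

A steadily rising series yields 100, a steadily falling one yields 0, and the guard keeps both cases free of division errors.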
