Comprehensive algorithmic trading framework for Python with backtesting, simulation, and live trading capabilities
—
Strategy context, execution framework, and event system providing the foundation for strategy development and lifecycle management. The core framework handles strategy execution phases, event processing, global state management, and provides the runtime environment for trading strategies.
The central context object available to all strategy functions, providing access to portfolio, accounts, market data, and system state.
class StrategyContext:
"""
Strategy context object (exposed as 'context' in strategies).
Properties:
- portfolio (Portfolio): Main portfolio object with all accounts and positions
- stock_account (Account): Stock trading account
- future_account (Account): Futures trading account
- bond_account (Account): Bond trading account
- now (datetime): Current strategy execution time
- universe (list[str]): Current strategy universe of instruments
- run_info (RunInfo): Strategy run information and metadata
- config (dict): Strategy configuration
Available in strategy functions as global 'context' variable.
"""Global variable namespace for user data persistence across strategy execution.
# Global variables namespace (available as 'g' in strategies).
g = GlobalVars()  # User namespace for storing custom data
# Usage in strategies:
def init(context):
    """Store example values on the shared `g` namespace."""
    g.my_variable = "some_value"
    g.counters = {"buy": 0, "sell": 0}
def handle_bar(context, bar_dict):
g.counters["buy"] += 1
logger.info(f"My variable: {g.my_variable}")

Event publishing and subscription system for strategy and system events.
def subscribe_event(event_type, handler):
    """
    Subscribe to system events.

    Parameters:
    - event_type (EVENT): Event type to subscribe to
    - handler (callable): Event handler function

    Returns:
    None

    The handler function receives (context, event_data) parameters.
    """
class Event:
    """
    Event object for system messaging.

    Properties:
    - event_type (EVENT): Type of event
    - timestamp (datetime): Event timestamp
    - data (dict): Event-specific data
    """
class EventBus:
"""
Event publishing and subscription system.
Methods:
- publish_event(event): Publish event to subscribers
- add_listener(event_type, handler): Add event listener
- remove_listener(event_type, handler): Remove event listener
"""Core classes managing strategy lifecycle and execution phases.
class Strategy:
    """
    Strategy execution engine and lifecycle manager.

    Methods:
    - init(): Initialize strategy
    - before_trading(): Pre-market phase
    - open_auction(): Opening auction phase
    - handle_bar(bar_dict): Process bar data
    - handle_tick(tick): Process tick data
    - after_trading(): Post-market phase
    - teardown(): Strategy cleanup
    """
class ExecutionContext:
    """
    Execution context and phase enforcement.

    Properties:
    - phase (EXECUTION_PHASE): Current execution phase
    - strategy_context (StrategyContext): Strategy context

    Methods:
    - set_phase(phase): Set current execution phase
    - enforce_phase(allowed_phases): Enforce phase restrictions
    """
class Executor:
"""
Strategy executor and orchestrator.
Methods:
- run(): Execute complete strategy
- execute_phase(phase): Execute specific phase
- handle_exception(exception): Handle strategy exceptions
"""Strategy loading interfaces for different strategy sources.
class FileStrategyLoader:
    """
    Load strategy from Python file.

    Methods:
    - load(file_path): Load strategy from file path
    - get_strategy_functions(): Extract strategy functions
    """
class SourceCodeStrategyLoader:
    """
    Load strategy from source code string.

    Methods:
    - load(source_code): Load strategy from code string
    - compile_strategy(): Compile strategy code
    """
class UserFuncStrategyLoader:
"""
Load strategy from user-defined functions.
Methods:
- load(user_funcs): Load from function dictionary
- validate_functions(): Validate function signatures
"""Metadata and information about strategy execution.
class RunInfo:
"""
Strategy run information and metadata.
Properties:
- start_date (date): Strategy start date
- end_date (date): Strategy end date
- frequency (str): Data frequency
- run_type (RUN_TYPE): Execution mode
- benchmark (str): Benchmark instrument
- matching_type (MATCHING_TYPE): Order matching type
- commission_multiplier (float): Commission multiplier
- margin_multiplier (float): Margin multiplier
- slippage (float): Slippage factor
- accounts (dict): Account configurations
"""Global environment singleton managing system state.
class Environment:
"""
Global environment singleton for system state.
Properties:
- config (dict): System configuration
- data_proxy (DataProxy): Data access layer
- broker (AbstractBroker): Order execution broker
- portfolio (Portfolio): Portfolio manager
- event_bus (EventBus): Event system
- phase (EXECUTION_PHASE): Current execution phase
Methods:
- get_instance(): Get singleton instance
- set_instance(env): Set singleton instance
"""# Strategy functions with framework integration
def init(context):
    """Run once at start-up: log run details, seed shared state, set the universe."""
    # Report where the run begins and what capital it starts with.
    logger.info(f"Strategy start date: {context.run_info.start_date}")
    logger.info(f"Initial portfolio value: {context.portfolio.total_value}")

    # Shared counters live on the global `g` namespace.
    g.trade_count, g.last_rebalance = 0, None

    # Keep the instrument list on the context and register it as the universe.
    context.stocks = ["000001.XSHE", "000002.XSHE"]
    update_universe(context.stocks)
def before_trading(context):
    """Daily pre-market hook: refresh signals and decide whether to rebalance."""
    logger.info(f"Before trading on {context.now.date()}")

    # Signals for the coming session are recomputed every day.
    g.daily_signals = calculate_signals(context)

    # Always store a strict True/False flag, whatever should_rebalance returns.
    g.rebalance_today = True if should_rebalance(context) else False
def handle_bar(context, bar_dict):
    """Per-bar hook: on rebalance days, move each signalled stock to its target weight."""
    for stock_id, bar in bar_dict.items():
        # Latest close of the bar (read here to show market-data access).
        current_price = bar.close

        # Only act on rebalance days, and only for instruments with a signal.
        if not g.rebalance_today or stock_id not in g.daily_signals:
            continue

        # A signal is scaled into a portfolio weight (factor 0.1 -> 10% max per stock).
        order_target_percent(stock_id, g.daily_signals[stock_id] * 0.1)
        g.trade_count += 1
def after_trading(context):
"""Post-market phase - called daily after market close."""
logger.info(f"After trading on {context.now.date()}")
logger.info(f"Portfolio value: {context.portfolio.total_value:.2f}")
logger.info(f"Total trades today: {g.trade_count}")
# Reset daily variables
g.rebalance_today = False

from rqalpha.apis import subscribe_event, EVENT
def init(context):
    """Register the event handlers and create the history lists they append to."""
    # (event type, handler) pairs, subscribed in this order.
    subscriptions = (
        (EVENT.TRADE, on_trade),
        (EVENT.ORDER_CREATION_PASS, on_order_created),
        (EVENT.ORDER_CREATION_REJECT, on_order_rejected),
        (EVENT.SETTLEMENT, on_settlement),
    )
    for event_type, handler in subscriptions:
        subscribe_event(event_type, handler)

    # Filled by on_trade / on_order_created respectively.
    g.trade_history, g.order_history = [], []
def on_trade(context, trade):
    """Log an executed trade and append a summary record to g.trade_history.

    NOTE(review): the second argument is used as the trade object itself;
    confirm against the rqalpha event API whether subscribers instead receive
    an event wrapper carrying the trade.
    """
    logger.info(f"Trade executed: {trade.instrument.symbol} "
                f"{trade.last_quantity} @ {trade.last_price}")

    # Plain-dict snapshot of the fields worth keeping.
    record = dict(
        datetime=trade.datetime,
        instrument=trade.order_book_id,
        quantity=trade.last_quantity,
        price=trade.last_price,
        side=trade.side,
        commission=trade.commission,
    )
    g.trade_history.append(record)
def on_order_created(context, order):
    """Log a successfully created order and append a summary to g.order_history.

    NOTE(review): attribute names such as `created_at` are taken from this
    example as written; verify them against the rqalpha Order model.
    """
    logger.info(f"Order created: {order.order_id} for {order.instrument.symbol}")

    # Plain-dict snapshot of the order at creation time.
    record = dict(
        order_id=order.order_id,
        instrument=order.order_book_id,
        quantity=order.quantity,
        price=order.price,
        status=order.status,
        created_at=order.created_at,
    )
    g.order_history.append(record)
def on_order_rejected(context, order):
    """Handle order rejection by logging a warning with the order id."""
    logger.warning(f"Order rejected: {order.order_id}")
    # Implement rejection handling logic
def on_settlement(context, settlement_data):
"""Handle daily settlement."""
logger.info("Daily settlement completed")
# Daily cleanup or reporting logic

def init(context):
# Initialize global variables
g.indicators = {}
g.signals = {}
g.position_history = []
g.performance_metrics = {
"max_drawdown": 0.0,
"peak_value": 0.0,
"win_rate": 0.0,
"trades": []
}
def handle_bar(context, bar_dict):
    """Update per-stock moving averages on `g` and derive a crossover signal.

    For every instrument in the universe this appends the latest 20- and
    60-bar simple moving averages of the close to ``g.indicators`` (capped at
    the 252 most recent values) and sets ``g.signals[stock_id]`` to 1 when the
    short average is above the long one, otherwise -1.

    Instruments with fewer than 60 daily bars are skipped, because the 60-bar
    average is not defined yet and comparing NaN values would always yield a
    sell signal.
    """
    for stock_id in context.universe:
        series = g.indicators.setdefault(
            stock_id, {"sma_20": [], "sma_60": [], "rsi": []}
        )

        bars = history_bars(stock_id, 60, "1d", fields="close")
        if bars is None:
            continue

        # NOTE(review): with a single field name rqalpha's history_bars is
        # understood to return a plain 1-D array of that field, on which
        # bars["close"] fails. Accept both that layout and a structured
        # array with named fields.
        field_names = getattr(getattr(bars, "dtype", None), "names", None)
        closes = bars["close"] if field_names else bars
        if len(closes) < 60:
            continue

        sma_20 = closes[-20:].mean()
        sma_60 = closes[-60:].mean()

        series["sma_20"].append(sma_20)
        series["sma_60"].append(sma_60)

        # Both lists grow in lockstep, so one length check bounds them both.
        if len(series["sma_20"]) > 252:
            series["sma_20"] = series["sma_20"][-252:]
            series["sma_60"] = series["sma_60"][-252:]

        # 1 = buy signal, -1 = sell signal.
        g.signals[stock_id] = 1 if sma_20 > sma_60 else -1
def after_trading(context):
# Update performance metrics in global variables
current_value = context.portfolio.total_value
if current_value > g.performance_metrics["peak_value"]:
g.performance_metrics["peak_value"] = current_value
drawdown = (g.performance_metrics["peak_value"] - current_value) / g.performance_metrics["peak_value"]
if drawdown > g.performance_metrics["max_drawdown"]:
g.performance_metrics["max_drawdown"] = drawdown
# Store daily position snapshot
position_snapshot = {
"date": context.now.date(),
"total_value": current_value,
"cash": context.portfolio.cash,
"positions": {pos.order_book_id: pos.quantity for pos in get_positions()}
}
g.position_history.append(position_snapshot)

def handle_bar(context, bar_dict):
# Portfolio information
logger.info(f"Portfolio total value: {context.portfolio.total_value}")
logger.info(f"Available cash: {context.portfolio.cash}")
logger.info(f"Market value: {context.portfolio.market_value}")
# Account-specific information
stock_cash = context.stock_account.cash
stock_value = context.stock_account.total_value
future_cash = context.future_account.cash
logger.info(f"Stock account - Cash: {stock_cash}, Value: {stock_value}")
logger.info(f"Future account cash: {future_cash}")
# Current time and universe
logger.info(f"Current time: {context.now}")
logger.info(f"Universe size: {len(context.universe)}")
# Run information
logger.info(f"Frequency: {context.run_info.frequency}")
logger.info(f"Benchmark: {context.run_info.benchmark}")
# Configuration access
start_date = context.config["base"]["start_date"]
accounts = context.config["base"]["accounts"]
logger.info(f"Strategy period: {start_date} to {context.run_info.end_date}")

def init(context):
# Define helper functions in global scope
g.calculate_rsi = create_rsi_calculator()
g.calculate_bollinger = create_bollinger_calculator()
g.risk_manager = RiskManager(context.portfolio.total_value)
def create_rsi_calculator():
    """Create RSI calculation function.

    Returns:
        A callable ``calculate_rsi(prices, period=14)`` that returns a float
        NumPy array of RSI values with the same length as ``prices``.
    """

    def _rsi_from_averages(avg_gain, avg_loss):
        """Map smoothed average gain/loss to an RSI value without dividing by zero."""
        if avg_loss == 0:
            # No losses in the window: fully overbought if there were gains,
            # neutral if the price did not move at all.
            return 100.0 if avg_gain > 0 else 50.0
        return 100.0 - 100.0 / (1.0 + avg_gain / avg_loss)

    def calculate_rsi(prices, period=14):
        """Compute the RSI series for ``prices``.

        Parameters:
        - prices: 1-D sequence of prices (list or array, int or float)
        - period (int): smoothing period, must be >= 1

        Returns:
            Float array; the first ``period`` entries hold the seed value.

        Raises:
            ValueError: if ``period`` is smaller than 1.
        """
        if period < 1:
            raise ValueError("period must be a positive integer")

        # Force float so integer price input does not truncate the RSI values.
        prices = np.asarray(prices, dtype=float)
        deltas = np.diff(prices)

        # Seed window and scaling are kept exactly as in the original recipe.
        seed = deltas[:period + 1]
        up = seed[seed >= 0].sum() / period
        down = -seed[seed < 0].sum() / period

        rsi = np.zeros_like(prices)
        rsi[:period] = _rsi_from_averages(up, down)

        for i in range(period, len(prices)):
            delta = deltas[i - 1]
            if delta > 0:
                upval = delta
                downval = 0.
            else:
                upval = 0.
                downval = -delta
            # Smoothing: previous average weighted (period - 1), new move weighted 1.
            up = (up * (period - 1) + upval) / period
            down = (down * (period - 1) + downval) / period
            rsi[i] = _rsi_from_averages(up, down)
        return rsi

    return calculate_rsi
class RiskManager:
    """Custom risk management class.

    Holds two limits, a maximum weight per position and a maximum drawdown,
    and offers a yes/no check for each.
    """

    def __init__(self, initial_capital, max_position_size=0.1, max_drawdown=0.2):
        """Store the starting capital and the risk limits.

        Parameters:
        - initial_capital: capital at strategy start
        - max_position_size (float): maximum weight per position (default 10%)
        - max_drawdown (float): maximum tolerated drawdown (default 20%)
        """
        self.initial_capital = initial_capital
        self.max_position_size = max_position_size
        self.max_drawdown = max_drawdown

    def check_position_size(self, portfolio_value, position_value):
        """Return True when the position's weight is within the size limit.

        A non-positive portfolio value has no meaningful weight, so it is
        reported as outside the limit (False) instead of dividing by zero.
        """
        if portfolio_value <= 0:
            return False
        position_weight = position_value / portfolio_value
        return position_weight <= self.max_position_size

    def check_drawdown(self, current_value, peak_value):
        """Return True when the drawdown from the peak is within the limit.

        With no positive peak recorded there is nothing to draw down from, so
        the check passes (True) instead of dividing by zero.
        """
        if peak_value <= 0:
            return True
        drawdown = (peak_value - current_value) / peak_value
        return drawdown <= self.max_drawdown
def handle_bar(context, bar_dict):
# Use custom helper functions
for stock_id in context.universe:
price_history = history_bars(stock_id, 30, "1d", fields="close")
rsi = g.calculate_rsi(price_history["close"])[-1]
# Risk management check
portfolio_value = context.portfolio.total_value
if stock_id in context.portfolio.positions:
position_value = context.portfolio.positions[stock_id].market_value
if not g.risk_manager.check_position_size(portfolio_value, position_value):
logger.warning(f"Position size limit exceeded for {stock_id}")
continue
# Trading logic with RSI
if rsi < 30: # Oversold
order_target_percent(stock_id, 0.05) # 5% allocation
elif rsi > 70: # Overbought
order_target_percent(stock_id, 0) # Close position

Install with Tessl CLI
npx tessl i tessl/pypi-rqalpha