A web development framework for Python that enables building fast, beautiful, and interactive web applications using reactive programming principles.
—
Session handling, user input/output management, and session utilities for managing application state and user interactions. Each user session provides isolated state and communication between client and server.
The main session object that represents a user's connection to the Shiny application.
class Session:
"""
Represents a user session in a Shiny application.
"""
def __init__(self, **kwargs: object) -> None: ...
def send_custom_message(
self,
type: str,
message: dict[str, object]
) -> None:
"""
Send a custom message to the client.
Args:
type: Message type identifier.
message: Message data to send.
"""
def send_input_message(
self,
id: str,
message: dict[str, object]
) -> None:
"""
Send a message to a specific input.
Args:
id: Input identifier.
message: Message data to send.
"""
def download(
self,
id: str,
filename: str,
media_type: str = "application/octet-stream"
) -> None:
"""
Trigger a file download.
Args:
id: Download handler identifier.
filename: Name of file to download.
media_type: MIME type of the file.
"""
@property
def client_data(self) -> ClientData:
"""Access to client-side data and information."""
@property
def user_info(self) -> dict[str, object] | None:
"""User information if available."""
@property
def groups(self) -> list[str] | None:
"""User groups if available."""

def server(input: Inputs, output: Outputs, session: Session):
# Send custom messages to client JavaScript
@reactive.effect
@reactive.event(input.notify_button)
def send_notification():
session.send_custom_message(
"notification",
{
"message": "Processing complete!",
"type": "success",
"duration": 3000
}
)
# Trigger downloads
@reactive.effect
@reactive.event(input.download_trigger)
def trigger_download():
session.download(
"data_download",
filename=f"export_{datetime.now().strftime('%Y%m%d')}.csv",
media_type="text/csv"
)
# Send messages to specific inputs
@reactive.effect
def update_input_config():
if input.advanced_mode():
session.send_input_message(
"parameters",
{"enable_advanced": True, "show_tooltips": True}
)

Provides access to user input values in a reactive context.
class Inputs:
"""
Reactive access to user input values.
"""
def __call__(self) -> dict[str, object]:
"""
Get all input values as a dictionary.
Returns:
Dictionary of all current input values.
"""
def __getattr__(self, name: str) -> Callable[[], object]:
"""
Get a specific input value as a reactive function.
Args:
name: Input identifier.
Returns:
Function that returns the current input value.
"""
def __contains__(self, name: str) -> bool:
"""
Check if an input exists.
Args:
name: Input identifier.
Returns:
True if input exists.
"""
def __iter__(self) -> Iterator[str]:
"""
Iterate over input names.
Returns:
Iterator of input identifiers.
"""

def server(input: Inputs, output: Outputs, session: Session):
# Access individual inputs reactively
@output
@render.text
def greeting():
name = input.user_name() # Reactive - updates when input changes
return f"Hello, {name}!"
# Check if inputs exist
@reactive.calc
def processed_data():
if "optional_filter" in input:
filter_value = input.optional_filter()
return apply_filter(base_data(), filter_value)
return base_data()
# Get all inputs at once
@output
@render.text
def debug_info():
all_inputs = input() # Get all input values
return f"Current inputs: {all_inputs}"
# Iterate over inputs
@reactive.calc
def input_summary():
summary = {}
for input_id in input:
value = getattr(input, input_id)()
summary[input_id] = type(value).__name__
return summary
# Conditional logic based on inputs
@reactive.calc
def analysis_config():
config = {"base_analysis": True}
if "advanced_options" in input and input.advanced_options():
config["detailed_stats"] = True
config["confidence_intervals"] = True
if "experimental_features" in input:
config["experimental"] = input.experimental_features()
return config

Container for managing rendered outputs that get sent to the client.
class Outputs:
"""
Container for application outputs.
"""
def __setattr__(
self,
name: str,
value: OutputRenderer[object]
) -> None:
"""
Assign an output renderer to an output ID.
Args:
name: Output identifier.
value: Output renderer function.
"""
def __getattr__(self, name: str) -> OutputRenderer[object]:
"""
Get an output renderer by name.
Args:
name: Output identifier.
Returns:
Output renderer if it exists.
"""
def __contains__(self, name: str) -> bool:
"""
Check if an output exists.
Args:
name: Output identifier.
Returns:
True if output exists.
"""

def server(input: Inputs, output: Outputs, session: Session):
# Standard output assignment using decorators
@output
@render.text
def status_message():
return "Application ready"
# Dynamic output assignment
def create_plot_output():
@render.plot
def dynamic_plot():
return generate_plot(input.plot_data())
return dynamic_plot
@reactive.effect
def setup_outputs():
if input.enable_plotting():
output.main_plot = create_plot_output()
# Conditional output rendering
@output
@render.ui
def conditional_output():
if "results" in output:
return ui.div(
ui.h3("Results Available"),
ui.output_table("results")
)
return ui.p("No results yet")
# Multiple outputs with shared computation
@reactive.calc
def analysis_results():
return perform_analysis(input.dataset())
@output
@render.table
def results_table():
results = analysis_results()
return results["summary_table"]
@output
@render.plot
def results_plot():
results = analysis_results()
return results["visualization"]

Access to client-side information and browser data.
class ClientData:
"""
Access to client-side data and browser information.
"""
def __call__(self) -> dict[str, object]:
"""
Get all client data as a dictionary.
Returns:
Dictionary of client-side information.
"""
def __getattr__(self, name: str) -> Callable[[], object]:
"""
Get specific client data reactively.
Args:
name: Client data property name.
Returns:
Function that returns the client data value.
"""
@property
def url_protocol(self) -> str:
"""Protocol used (http or https)."""
@property
def url_hostname(self) -> str:
"""Hostname of the request."""
@property
def url_port(self) -> int | None:
"""Port number if specified."""
@property
def url_pathname(self) -> str:
"""URL pathname."""
@property
def url_search(self) -> str:
"""URL search parameters."""
@property
def pixelratio(self) -> float:
"""Device pixel ratio."""

def server(input: Inputs, output: Outputs, session: Session):
# Access client information
@output
@render.text
def client_info():
client = session.client_data
return f"""
Browser Info:
- URL: {client.url_protocol}://{client.url_hostname}:{client.url_port}{client.url_pathname}
- Search: {client.url_search}
- Pixel Ratio: {client.pixelratio}
"""
# Responsive behavior based on client data
@reactive.calc
def plot_dimensions():
client = session.client_data
pixel_ratio = client.pixelratio
# Adjust plot size for high-DPI displays
base_width = 800
base_height = 600
return {
"width": base_width * pixel_ratio,
"height": base_height * pixel_ratio,
"dpi": 96 * pixel_ratio
}
@output
@render.plot
def responsive_plot():
dims = plot_dimensions()
fig = create_plot(input.data())
fig.set_size_inches(dims["width"]/dims["dpi"], dims["height"]/dims["dpi"])
return fig
# URL-based routing
@reactive.calc
def current_page():
pathname = session.client_data.url_pathname
if pathname.endswith("/analysis"):
return "analysis"
elif pathname.endswith("/data"):
return "data"
else:
return "home"
@output
@render.ui
def page_content():
page = current_page()
if page == "analysis":
return analysis_page_ui()
elif page == "data":
return data_page_ui()
else:
return home_page_ui()

Utilities for managing session context and lifecycle.
def get_current_session() -> Session | None:
    """
    Return the session for the current session context, if there is one.

    Returns:
        The current Session, or None when called outside a session context.
    """
def require_active_session(what: str | None = None) -> Session:
    """
    Return the current session, failing if no session context is active.

    Args:
        what: Optional description of what requires an active session
            (presumably included in the error message - confirm against
            the implementation).

    Returns:
        The current session.

    Raises:
        RuntimeError: If there is no active session.
    """
def session_context(session: Session) -> ContextManager[None]:
"""
Create a session context manager.
Args:
session: Session to use as context.
Returns:
Context manager that sets the session context.
"""

from shiny.session import get_current_session, require_active_session, session_context
# Utility functions that work with sessions
def send_status_update(message: str, status_type: str = "info"):
    """Push a "status_update" custom message to the current session, if any.

    Args:
        message: Text placed under the "message" key of the payload.
        status_type: Value placed under the "type" key; defaults to "info".

    Nothing is sent when get_current_session() returns a falsy value.
    """
    active = get_current_session()
    if not active:
        # No session to talk to: silently skip the update.
        return
    payload = {
        "message": message,
        "type": status_type,
        "timestamp": datetime.now().isoformat(),
    }
    active.send_custom_message("status_update", payload)
def log_user_action(action: str, details: dict[str, object] | None = None):
    """Write a log record describing a user action in the active session.

    Args:
        action: Name of the action being recorded.
        details: Optional extra data; an empty dict is logged when this is
            None or empty.

    The session is obtained through require_active_session, so whatever
    that call raises when no session is active propagates to the caller.
    """
    current = require_active_session("log user action")
    record = dict(
        session_id=id(current),
        action=action,
        timestamp=datetime.now().isoformat(),
        details=details if details else {},
    )
    # Destination (database, file, ...) is decided by the logger setup.
    logger.info(f"User action: {record}")
# Background task with session context
async def process_data_async(session: Session, data: pd.DataFrame):
    """Simulate a long-running job while reporting progress to a session.

    Args:
        session: Session that should receive the status updates.
        data: Data frame handed to the job.

    Returns:
        The data frame that was passed in. The example performs no real
        transformation; it only sleeps between progress updates.
    """
    # send_status_update looks up the current session, so the work runs
    # inside session_context(session) to route updates to this session.
    with session_context(session):
        send_status_update("Starting data processing...", "info")

        # Simulate a long-running task: ten one-second steps of 10% each.
        for progress in range(10, 101, 10):
            await asyncio.sleep(1)
            send_status_update(f"Processing... {progress}% complete", "info")

        send_status_update("Data processing complete!", "success")

        # The original returned `processed_data`, a name never defined in
        # this function, which raised NameError after the final update.
        return data
# Usage in server function
def server(input: Inputs, output: Outputs, session: Session):
@reactive.effect
@reactive.event(input.process_button)
def start_processing():
data = input.uploaded_data()
# Log the action
log_user_action("start_data_processing", {
"data_rows": len(data),
"data_columns": len(data.columns)
})
# Start async processing
asyncio.create_task(process_data_async(session, data))
@reactive.effect
def track_input_changes():
# This runs whenever any input changes
current_inputs = input()
log_user_action("input_changed", {
"changed_inputs": list(current_inputs.keys()),
"input_count": len(current_inputs)
})

Hooks for managing session startup and cleanup.
def on_session_start(fn: Callable[[Session], None]) -> None:
    """
    Register a callback to be run when a session starts.

    Args:
        fn: Callback that takes the Session as its only argument.
    """
def on_session_end(fn: Callable[[Session], None]) -> None:
"""
Register a callback for when sessions end.
Args:
fn: Callback function that takes a Session.
"""

# Session lifecycle management
def initialize_user_session(session: Session):
    """Create the session_data entry for a session and greet the client.

    Args:
        session: The session being initialised; id(session) is used as
            its key in session_data.
    """
    sid = id(session)

    # Fresh per-session record: start time, default preferences, and an
    # empty list of temporary files.
    session_data[sid] = dict(
        start_time=datetime.now(),
        user_preferences=load_default_preferences(),
        temporary_files=[],
    )

    welcome = {
        "message": "Welcome to the application!",
        "session_id": sid,
    }
    session.send_custom_message("welcome", welcome)

    logger.info(f"Session {sid} started")
def cleanup_user_session(session: Session):
    """Release everything recorded in session_data for a session.

    Args:
        session: The session being torn down; id(session) is its key in
            session_data.

    Does nothing when the session has no entry in session_data.
    """
    sid = id(session)
    if sid not in session_data:
        return

    entry = session_data[sid]

    # Remove the temporary files recorded for this session; a file that
    # is already gone is not an error.
    for path in entry.get("temporary_files", []):
        try:
            os.remove(path)
        except FileNotFoundError:
            pass

    # Report how long the session lasted.
    elapsed = datetime.now() - entry["start_time"]
    logger.info(f"Session {sid} ended after {elapsed}")

    # Drop the per-session record.
    del session_data[sid]
# Register the lifecycle callbacks: initialize_user_session for session
# start, cleanup_user_session for session end.
on_session_start(initialize_user_session)
on_session_end(cleanup_user_session)
# Module-level registry of per-session state, keyed by id(session).
# Entries are created in initialize_user_session and deleted in
# cleanup_user_session.
session_data: dict[int, dict[str, object]] = {}
def server(input: Inputs, output: Outputs, session: Session):
session_id = id(session)
# Access session-specific data
@reactive.calc
def user_preferences():
return session_data.get(session_id, {}).get("user_preferences", {})
# Track temporary files
@reactive.effect
@reactive.event(input.generate_report)
def create_temp_report():
temp_file = f"/tmp/report_{session_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pdf"
# Generate report
generate_pdf_report(input.report_data(), temp_file)
# Track the file for cleanup
session_data[session_id]["temporary_files"].append(temp_file)
# Trigger download
session.download("report_download", temp_file)

Install with Tessl CLI
npx tessl i tessl/pypi-shiny