A web development framework for Python that enables building fast, beautiful, and interactive web applications using reactive programming principles.
—
Shiny's reactive programming system automatically tracks dependencies and updates outputs when inputs change. The reactive system includes reactive values, calculations, effects, and event handling that form the core of Shiny's reactivity model.
Reactive values are the foundation of Shiny's reactive system, storing data that can trigger updates when changed.
def reactive.value(value: T) -> Value[T]:
"""
Create a reactive value.
Args:
value: Initial value of any type.
Returns:
Value object that can be read and written reactively.
"""
class reactive.Value[T]:
"""
A reactive value that can be read and written.
"""
def __call__(self) -> T:
"""Get the current value."""
def set(self, value: T) -> None:
"""Set a new value, triggering reactive updates."""
def freeze(self) -> T:
"""Get value without establishing reactive dependency."""
def is_set(self) -> bool:
"""Check if value has been set."""from shiny import reactive
# Create reactive values
counter = reactive.value(0)
user_name = reactive.value("")
selected_data = reactive.value(None)
# Reading reactive values (establishes dependency)
current_count = counter() # Will trigger updates when counter changes
name = user_name() # Will trigger updates when user_name changes
# Setting reactive values (triggers dependent updates)
counter.set(counter() + 1) # Increment counter
user_name.set("Alice") # Update name
selected_data.set(df.iloc[0:10]) # Update data selection
# Non-reactive reading (no dependency established)
current_count_frozen = counter.freeze()
# Check if value has been set
if selected_data.is_set():
process_data(selected_data())Reactive calculations are cached computations that automatically update when their dependencies change.
def reactive.calc(fn: Callable[[], T]) -> Calc[T]:
"""
Create a reactive calculation.
Args:
fn: Function that computes the value. Should not have side effects.
Returns:
Calc object that caches the computed value.
"""
class reactive.Calc[T]:
"""
A reactive calculation that caches computed values.
"""
def __call__(self) -> T:
"""Get the computed value, recalculating if dependencies changed."""
def invalidate(self) -> None:
"""Invalidate the cached value, forcing recalculation."""# Create reactive calculations
@reactive.calc
def filtered_data():
df = input.dataset()
filter_val = input.filter_value()
return df[df['column'] > filter_val] # Automatically updates when inputs change
@reactive.calc
def summary_stats():
data = filtered_data() # Depends on filtered_data calculation
return {
'mean': data['value'].mean(),
'std': data['value'].std(),
'count': len(data)
}
# Use calculations in outputs
@output
@render.text
def stats_display():
stats = summary_stats() # Will update when underlying data changes
return f"Mean: {stats['mean']:.2f}, Count: {stats['count']}"
# Alternative syntax without decorator
processed_data = reactive.calc(lambda: expensive_computation(input.data()))Reactive effects perform side effects when their dependencies change, but don't return values.
def reactive.effect(fn: Callable[[], None]) -> Effect:
"""
Create a reactive effect.
Args:
fn: Function that performs side effects. Should not return a value.
Returns:
Effect object for managing the effect.
"""
class reactive.Effect:
"""
A reactive effect that performs side effects.
"""
def destroy(self) -> None:
"""Stop the effect from running."""
def invalidate(self) -> None:
"""Invalidate the effect, causing it to re-run."""# Create reactive effects
@reactive.effect
def update_database():
# Runs whenever selected_data changes
data = selected_data()
if data is not None:
database.save_selection(data)
@reactive.effect
def log_user_activity():
# Log when user changes inputs
current_page = input.page()
user_id = session.get_user_id()
logger.info(f"User {user_id} viewed page {current_page}")
# Manual effect creation
def cleanup_temp_files():
temp_dir = input.temp_directory()
if temp_dir and os.path.exists(temp_dir):
shutil.rmtree(temp_dir)
cleanup_effect = reactive.effect(cleanup_temp_files)
# Destroy effect when no longer needed
# cleanup_effect.destroy()Event-based reactivity for controlling when reactive code should run.
def reactive.event(
*args: object,
ignore_none: bool = True,
ignore_init: bool = False
) -> Callable[[Callable[[], T]], Callable[[], T]]:
"""
Decorator for event-driven reactivity.
Args:
*args: Event sources (typically input values or reactive values).
ignore_none: Don't trigger on None values.
ignore_init: Don't trigger on initial evaluation.
Returns:
Decorator function.
"""# Event-driven calculations
@reactive.calc
@reactive.event(input.update_button)
def expensive_analysis():
# Only runs when update button is clicked, not when data changes
return perform_complex_analysis(input.dataset())
@output
@render.plot
@reactive.event(input.plot_button, ignore_init=True)
def generate_plot():
# Only generates plot when button is clicked
data = input.data()
return create_visualization(data)
# Multiple event sources
@reactive.calc
@reactive.event(input.refresh_btn, input.auto_refresh)
def updated_data():
# Runs when either refresh button clicked OR auto_refresh changes
return fetch_latest_data()
# Event-driven effects
@reactive.effect
@reactive.event(input.save_button)
def save_data():
# Save only when save button is clicked
current_data = get_current_state()
database.save(current_data)Long-running asynchronous tasks that can be monitored and controlled.
class reactive.ExtendedTask:
"""
A task that can run for an extended period with status tracking.
"""
def __init__(self, func: Callable[..., Awaitable[T]]): ...
def __call__(self, *args: object, **kwargs: object) -> ExtendedTask[T]: ...
def cancel(self) -> None:
"""Cancel the running task."""
def result(self) -> T:
"""Get the task result (blocks until complete)."""
def status(self) -> Literal["initial", "running", "success", "error", "cancelled"]:
"""Get current task status."""
def reactive.extended_task(
func: Callable[..., Awaitable[T]]
) -> ExtendedTask[T]:
"""
Decorator to create an extended task.
Args:
func: Async function to run as extended task.
Returns:
ExtendedTask wrapper.
"""import asyncio
from shiny import reactive
# Define extended task
@reactive.extended_task
async def process_large_dataset(dataset_path, parameters):
# Long-running data processing
await asyncio.sleep(1) # Simulate work
with open(dataset_path, 'r') as f:
data = load_and_process(f, parameters)
# More processing...
await asyncio.sleep(2)
return analysis_results
# Use in server function
def server(input: Inputs, output: Outputs, session: Session):
@reactive.effect
@reactive.event(input.start_processing)
def start_task():
# Start the extended task
process_large_dataset(
input.dataset_file(),
input.processing_params()
)
@output
@render.text
def task_status():
# Show current status
status = process_large_dataset.status()
if status == "running":
return "Processing... Please wait."
elif status == "success":
return "Processing complete!"
elif status == "error":
return "An error occurred during processing."
else:
return "Ready to start processing."
@output
@render.table
def results():
# Show results when task completes
if process_large_dataset.status() == "success":
return process_large_dataset.result()
return None
@reactive.effect
@reactive.event(input.cancel_button)
def cancel_task():
process_large_dataset.cancel()Utilities for time-based reactive updates and polling.
def reactive.invalidate_later(
delay: float,
session: Session | None = None
) -> None:
"""
Schedule reactive invalidation after a delay.
Args:
delay: Delay in seconds.
session: Session to use (current session if None).
"""
def reactive.poll(
func: Callable[[], T],
interval_secs: float,
session: Session | None = None
) -> Callable[[], T]:
"""
Poll a function at regular intervals.
Args:
func: Function to poll.
interval_secs: Polling interval in seconds.
session: Session to use (current session if None).
Returns:
Reactive function that returns polled value.
"""
def reactive.file_reader(
filepath: str | os.PathLike[str],
session: Session | None = None
) -> Callable[[], str | None]:
"""
Reactively read a file when it changes.
Args:
filepath: Path to file to monitor.
session: Session to use (current session if None).
Returns:
Function that returns file contents when changed.
"""# Auto-refresh data every 30 seconds
@reactive.calc
def live_data():
reactive.invalidate_later(30) # Refresh every 30 seconds
return fetch_current_data()
# Poll external API
api_data = reactive.poll(
lambda: requests.get('https://api.example.com/data').json(),
interval_secs=60 # Poll every minute
)
@output
@render.text
def current_api_data():
data = api_data()
return f"Latest data: {data}"
# Monitor log file
log_contents = reactive.file_reader("/var/log/myapp.log")
@output
@render.text
def log_display():
contents = log_contents()
if contents:
# Show last 10 lines
return "\n".join(contents.strip().split("\n")[-10:])
return "No log data"Low-level utilities for managing reactive contexts.
def reactive.isolate(fn: Callable[[], T]) -> T:
"""
Execute function without establishing reactive dependencies.
Args:
fn: Function to execute in isolation.
Returns:
Function result.
"""
def reactive.flush() -> None:
"""
Force the reactive system to flush all pending updates.
"""
def reactive.get_current_context() -> Context | None:
"""
Get the current reactive context.
Returns:
Current Context object or None if not in reactive context.
"""
class reactive.Context:
"""
Reactive execution context.
"""
def __enter__(self) -> Context: ...
def __exit__(self, *args: object) -> None: ...# Execute code without creating reactive dependencies
@reactive.calc
def smart_calculation():
# This will create a reactive dependency
primary_input = input.primary_value()
# This won't create a reactive dependency
config_value = reactive.isolate(lambda: input.config_setting())
# Calculation only re-runs when primary_value changes,
# not when config_setting changes
return perform_calculation(primary_input, config_value)
# Force reactive flush for testing or debugging
def update_all_outputs():
# Make some changes
my_reactive_value.set("new value")
another_value.set(42)
# Force immediate update of all dependent calculations
reactive.flush()
# Now all outputs are guaranteed to be current
# Work with reactive context
context = reactive.get_current_context()
if context:
# We're in a reactive context
print("Currently in reactive execution")Install with Tessl CLI
npx tessl i tessl/pypi-shiny