The backend infrastructure for Jupyter web applications providing core services, APIs, and REST endpoints.
The Jupyter Server provides base handler classes and utilities for building secure, authenticated web handlers and APIs.
Core base handler for all Jupyter server requests with authentication and common functionality.
from jupyter_server.base.handlers import JupyterHandlerfrom jupyter_server.base.handlers import JupyterHandler
from jupyter_server.auth.decorator import authorized
from tornado import web
class MyCustomHandler(JupyterHandler):
"""Custom handler extending JupyterHandler."""
@authorized
async def get(self):
"""Handle GET request."""
# Access current authenticated user
user = self.current_user
if user:
self.finish({
"message": f"Hello {user.display_name}!",
"username": user.username,
})
else:
raise web.HTTPError(401, "Authentication required")
@authorized
async def post(self):
"""Handle POST request."""
# Get JSON request body
try:
data = self.get_json_body()
except ValueError:
raise web.HTTPError(400, "Invalid JSON")
# Process data
result = await self.process_data(data)
# Return JSON response
self.finish({"result": result})
async def process_data(self, data):
"""Process request data."""
# Implement custom logic
return f"Processed: {data}"
def write_error(self, status_code, **kwargs):
"""Custom error handling."""
self.set_header("Content-Type", "application/json")
error_message = {
"error": {
"code": status_code,
"message": self._reason,
}
}
self.finish(error_message)Base class for REST API endpoints with JSON handling and CORS support.
from jupyter_server.base.handlers import APIHandler
from jupyter_server.auth.decorator import authorized
from tornado import web
import json
class MyAPIHandler(APIHandler):
"""REST API handler."""
@authorized
async def get(self, resource_id=None):
"""Get resource(s)."""
if resource_id:
# Get specific resource
resource = await self.get_resource(resource_id)
if not resource:
raise web.HTTPError(404, f"Resource {resource_id} not found")
self.finish(resource)
else:
# List all resources
resources = await self.list_resources()
self.finish({"resources": resources})
@authorized
async def post(self):
"""Create new resource."""
data = self.get_json_body()
# Validate required fields
required_fields = ["name", "type"]
for field in required_fields:
if field not in data:
raise web.HTTPError(400, f"Missing required field: {field}")
# Create resource
resource = await self.create_resource(data)
self.set_status(201) # Created
self.finish(resource)
@authorized
async def put(self, resource_id):
"""Update existing resource."""
data = self.get_json_body()
# Check if resource exists
existing = await self.get_resource(resource_id)
if not existing:
raise web.HTTPError(404, f"Resource {resource_id} not found")
# Update resource
updated = await self.update_resource(resource_id, data)
self.finish(updated)
@authorized
async def delete(self, resource_id):
"""Delete resource."""
# Check if resource exists
existing = await self.get_resource(resource_id)
if not existing:
raise web.HTTPError(404, f"Resource {resource_id} not found")
# Delete resource
await self.delete_resource(resource_id)
self.set_status(204) # No Content
self.finish()
async def get_resource(self, resource_id):
"""Get resource by ID."""
# Implement resource retrieval
return {"id": resource_id, "name": "Example Resource"}
async def list_resources(self):
"""List all resources."""
# Implement resource listing
return [
{"id": "1", "name": "Resource 1"},
{"id": "2", "name": "Resource 2"},
]
async def create_resource(self, data):
"""Create new resource."""
# Implement resource creation
resource_id = "new-id"
return {"id": resource_id, **data}
async def update_resource(self, resource_id, data):
"""Update existing resource."""
# Implement resource update
return {"id": resource_id, **data}
async def delete_resource(self, resource_id):
"""Delete resource."""
# Implement resource deletion
passBase for handlers requiring authentication.
from jupyter_server.base.handlers import AuthenticatedHandler
from tornado import web
class MyAuthenticatedHandler(AuthenticatedHandler):
"""Handler requiring authentication."""
async def get(self):
"""GET requires authentication."""
# User is guaranteed to be authenticated
user = self.current_user
from dataclasses import asdict
self.finish({
"authenticated": True,
"user": asdict(user),
"message": "This is a protected endpoint",
})
async def prepare(self):
"""Called before any HTTP method."""
await super().prepare()
# Custom preparation logic
self.request_start_time = time.time()
def on_finish(self):
"""Called after request completion."""
if hasattr(self, 'request_start_time'):
duration = time.time() - self.request_start_time
self.log.info(f"Request took {duration:.3f} seconds")Serves static files with authentication requirements.
from jupyter_server.base.handlers import AuthenticatedFileHandler
import os
# Configure authenticated static file serving
static_path = "/path/to/static/files"
handler_class = AuthenticatedFileHandler
handler_settings = {"path": static_path}
# In web application setup
handlers = [
(r"/static/(.*)", handler_class, handler_settings),
]
# Usage: Files at /path/to/static/files/* are served at /static/*
# Authentication is required to access these filesAdvanced file discovery and serving with caching.
from jupyter_server.base.handlers import FileFindHandler
# Configure file finder
file_finder_settings = {
"path": ["/path/to/files", "/another/path"],
"default_filename": "index.html",
}
handlers = [
(r"/files/(.*)", FileFindHandler, file_finder_settings),
]
# Features:
# - Searches multiple paths in order
# - Caches file locations for performance
# - Supports default filenames for directories
# - Content type detection
# - ETags and caching headersBridge between WebSocket connections and ZeroMQ sockets.
from jupyter_server.base.zmqhandlers import ZMQStreamHandler, AuthenticatedZMQStreamHandlerfrom jupyter_server.base.zmqhandlers import AuthenticatedZMQStreamHandler
from jupyter_client.session import Session
import zmq
class MyZMQHandler(AuthenticatedZMQStreamHandler):
"""WebSocket handler for ZMQ communication."""
def initialize(self, zmq_stream, session):
"""Initialize with ZMQ stream and session."""
self.zmq_stream = zmq_stream
self.session = session
def open(self, kernel_id):
"""Handle WebSocket connection."""
self.kernel_id = kernel_id
self.log.info(f"WebSocket opened for kernel {kernel_id}")
# Connect to ZMQ socket
self.zmq_stream.on_recv(self.on_zmq_reply)
async def on_message(self, msg):
"""Handle WebSocket message."""
# Parse WebSocket message
try:
ws_msg = json.loads(msg)
except json.JSONDecodeError:
self.log.error("Invalid JSON in WebSocket message")
return
# Forward to ZMQ socket
zmq_msg = self.session.serialize(ws_msg)
self.zmq_stream.send(zmq_msg)
def on_zmq_reply(self, msg_list):
"""Handle ZMQ reply."""
# Deserialize ZMQ message
try:
msg = self.session.deserialize(msg_list)
except Exception as e:
self.log.error(f"Failed to deserialize ZMQ message: {e}")
return
# Forward to WebSocket
self.write_message(json.dumps(msg))
def on_close(self):
"""Handle WebSocket disconnection."""
self.log.info(f"WebSocket closed for kernel {self.kernel_id}")
# Cleanup ZMQ connection
if hasattr(self, 'zmq_stream'):
self.zmq_stream.close()from jupyter_server.base.zmqhandlers import WebSocketMixin
from tornado import websocket
import json
class MyWebSocketHandler(WebSocketMixin, websocket.WebSocketHandler):
"""Custom WebSocket handler with utilities."""
def open(self):
"""Handle connection."""
self.log.info("WebSocket connection opened")
self.ping_interval = self.settings.get("websocket_ping_interval", 30)
self.start_ping()
def on_message(self, message):
"""Handle incoming message."""
try:
data = json.loads(message)
msg_type = data.get("type")
if msg_type == "ping":
self.send_message({"type": "pong"})
elif msg_type == "echo":
self.send_message({"type": "echo", "data": data.get("data")})
else:
self.log.warning(f"Unknown message type: {msg_type}")
except json.JSONDecodeError:
self.send_error("Invalid JSON message")
def on_close(self):
"""Handle disconnection."""
self.log.info("WebSocket connection closed")
self.stop_ping()
def send_message(self, message):
"""Send JSON message to client."""
self.write_message(json.dumps(message))
def send_error(self, error_message):
"""Send error message to client."""
self.send_message({
"type": "error",
"message": error_message,
})
def start_ping(self):
"""Start periodic ping to keep connection alive."""
import tornado.ioloop
def send_ping():
if not self.ws_connection or self.ws_connection.is_closing():
return
self.ping(b"keepalive")
tornado.ioloop.IOLoop.current().call_later(
self.ping_interval, send_ping
)
tornado.ioloop.IOLoop.current().call_later(
self.ping_interval, send_ping
)
def stop_ping(self):
"""Stop ping timer."""
# Cleanup would happen automatically when connection closes
passCustom 404 error pages with templating.
from jupyter_server.base.handlers import Template404
# Custom 404 handler
class My404Handler(Template404):
"""Custom 404 error page."""
def get_template_path(self):
"""Return path to error templates."""
return "/path/to/error/templates"
def get_template_name(self):
"""Return 404 template name."""
return "my_404.html"
def get_template_vars(self):
"""Return variables for template rendering."""
return {
"path": self.request.path,
"method": self.request.method,
"user": self.current_user,
"app_name": "My Jupyter App",
}
# Register as default handler
handlers = [
# ... other handlers
(r".*", My404Handler), # Catch-all for 404s
]from jupyter_server.utils import (
url_path_join,
url_is_absolute,
path2url,
url2path,
url_escape,
url_unescape,
)from jupyter_server.utils import (
url_path_join,
url_is_absolute,
path2url,
url2path,
url_escape,
url_unescape,
to_os_path,
to_api_path,
)
# URL path joining
base_url = "/api/v1"
endpoint = "contents/file.txt"
full_url = url_path_join(base_url, endpoint)
print(full_url) # "/api/v1/contents/file.txt"
# URL validation
is_abs = url_is_absolute("https://example.com/path")
print(is_abs) # True
is_rel = url_is_absolute("/relative/path")
print(is_rel) # False
# Path conversions
file_path = "/home/user/notebook.ipynb"
api_path = path2url(file_path)
print(api_path) # "home/user/notebook.ipynb"
# URL encoding/decoding
encoded = url_escape("file with spaces.txt")
print(encoded) # "file%20with%20spaces.txt"
decoded = url_unescape("file%20with%20spaces.txt")
print(decoded) # "file with spaces.txt"
# OS path conversions
api_path = "notebooks/example.ipynb"
os_path = to_os_path(api_path, root="/home/user")
print(os_path) # "/home/user/notebooks/example.ipynb"
os_path = "/home/user/notebooks/example.ipynb"
api_path = to_api_path(os_path, root="/home/user")
print(api_path) # "notebooks/example.ipynb"from jupyter_server.utils import (
samefile_simple,
check_version,
run_sync,
fetch,
import_item,
expand_path,
filefind,
)
# File comparison
are_same = samefile_simple("/path/to/file1", "/path/to/file2")
print(f"Files are same: {are_same}")
# Version checking
is_compatible = check_version("1.0.0", ">=0.9.0")
print(f"Version compatible: {is_compatible}")
# Run async function synchronously
async def async_task():
return "Result from async function"
result = run_sync(async_task())
print(result) # "Result from async function"
# HTTP client
response = await fetch("https://api.example.com/data")
data = response.json()
# Dynamic import
MyClass = import_item("my_package.module.MyClass")
instance = MyClass()
# Path expansion
expanded = expand_path("~/documents/notebook.ipynb")
print(expanded) # "/home/user/documents/notebook.ipynb"
# File finding
found_file = filefind("config.json", ["/etc", "/usr/local/etc", "~/.config"])
print(f"Config found at: {found_file}")from jupyter_server.utils import ensure_async, run_sync_in_loop
import asyncio
# Ensure function is async
def sync_function():
return "sync result"
async_func = ensure_async(sync_function)
result = await async_func() # "sync result"
# Run async in existing event loop
async def run_in_background():
loop = asyncio.get_event_loop()
def background_task():
# Some sync work
return "background result"
result = await run_sync_in_loop(background_task)
print(result)from jupyter_server.utils import (
urlencode_unix_socket_path,
urldecode_unix_socket_path,
unix_socket_in_use,
)
# Unix socket path encoding for URLs
socket_path = "/tmp/jupyter.sock"
encoded = urlencode_unix_socket_path(socket_path)
print(encoded) # Encoded socket path for URL use
# Decode socket path from URL
decoded = urldecode_unix_socket_path(encoded)
print(decoded) # "/tmp/jupyter.sock"
# Check if socket is in use
in_use = unix_socket_in_use(socket_path)
print(f"Socket in use: {in_use}")from jupyter_server.serverapp import ServerApp
from tornado.httpclient import AsyncHTTPClient
from tornado.testing import AsyncHTTPTestCase
import json
class MyHandlerTest(AsyncHTTPTestCase):
"""Test case for custom handlers."""
def get_app(self):
"""Create test application."""
self.server_app = ServerApp()
self.server_app.initialize()
return self.server_app.web_app
async def test_api_endpoint(self):
"""Test API endpoint."""
# Make GET request
response = await self.fetch(
"/api/my_endpoint",
method="GET",
headers={"Authorization": "Bearer test-token"},
)
self.assertEqual(response.code, 200)
# Parse JSON response
data = json.loads(response.body)
self.assertIn("result", data)
async def test_post_endpoint(self):
"""Test POST endpoint."""
payload = {"name": "test", "value": 123}
response = await self.fetch(
"/api/my_endpoint",
method="POST",
headers={
"Content-Type": "application/json",
"Authorization": "Bearer test-token",
},
body=json.dumps(payload),
)
self.assertEqual(response.code, 201)
data = json.loads(response.body)
self.assertEqual(data["name"], "test")
async def test_websocket_connection(self):
"""Test WebSocket connection."""
from tornado.websocket import websocket_connect
ws_url = f"ws://localhost:{self.get_http_port()}/api/websocket"
ws = await websocket_connect(ws_url)
# Send message
test_message = {"type": "test", "data": "hello"}
ws.write_message(json.dumps(test_message))
# Receive response
response = await ws.read_message()
data = json.loads(response)
self.assertEqual(data["type"], "response")
ws.close()from jupyter_server.base.handlers import PrometheusMetricsHandler
# Built-in metrics endpoint
# GET /api/metrics returns Prometheus-format metrics
# Custom metrics in handlers
class MetricsHandler(APIHandler):
"""Handler with custom metrics."""
def initialize(self):
self.request_count = 0
self.error_count = 0
async def get(self):
"""GET with metrics tracking."""
start_time = time.time()
self.request_count += 1
try:
# Handle request
result = await self.process_request()
self.finish(result)
except Exception as e:
self.error_count += 1
raise
finally:
# Log performance
duration = time.time() - start_time
self.log.info(
f"Request completed in {duration:.3f}s "
f"(total: {self.request_count}, errors: {self.error_count})"
)from jupyter_server.base.handlers import JupyterHandler
import time
class LoggedHandler(JupyterHandler):
"""Handler with detailed request logging."""
def prepare(self):
"""Log request start."""
self.start_time = time.time()
self.log.info(
f"Request started: {self.request.method} {self.request.path} "
f"from {self.request.remote_ip}"
)
def on_finish(self):
"""Log request completion."""
duration = time.time() - self.start_time
status = self.get_status()
self.log.info(
f"Request finished: {self.request.method} {self.request.path} "
f"-> {status} in {duration:.3f}s "
f"({self.request.body_size} bytes in, {self._write_buffer_size} bytes out)"
)
def write_error(self, status_code, **kwargs):
"""Log errors."""
self.log.error(
f"Request error: {self.request.method} {self.request.path} "
f"-> {status_code} {self._reason}"
)
super().write_error(status_code, **kwargs)Install with Tessl CLI
npx tessl i tessl/pypi-jupyter-server