CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-jupyter-server

The backend infrastructure for Jupyter web applications providing core services, APIs, and REST endpoints.

Overview
Eval results
Files

docs/handlers.md

Handlers and Utilities

The Jupyter Server provides base handler classes and utilities for building secure, authenticated web handlers and APIs.

Base Handler Classes

JupyterHandler

Core base handler for all Jupyter server requests with authentication and common functionality.

from jupyter_server.base.handlers import JupyterHandler
from jupyter_server.base.handlers import JupyterHandler
from jupyter_server.auth.decorator import authorized
from tornado import web

class MyCustomHandler(JupyterHandler):
    """Example handler built on top of JupyterHandler.

    GET greets the current user, POST runs the JSON body through
    ``process_data``, and ``write_error`` renders errors as JSON.
    """

    @authorized
    async def get(self):
        """Greet the current user, or answer 401 when there is none."""
        current = self.current_user
        if not current:
            raise web.HTTPError(401, "Authentication required")

        greeting = f"Hello {current.display_name}!"
        self.finish({"message": greeting, "username": current.username})

    @authorized
    async def post(self):
        """Process the JSON request body and return the outcome."""
        try:
            payload = self.get_json_body()
        except ValueError:
            raise web.HTTPError(400, "Invalid JSON")

        outcome = await self.process_data(payload)
        self.finish({"result": outcome})

    async def process_data(self, data):
        """Turn the request data into a result (placeholder logic)."""
        summary = f"Processed: {data}"
        return summary

    def write_error(self, status_code, **kwargs):
        """Emit the error as a JSON document instead of the default page."""
        self.set_header("Content-Type", "application/json")
        details = {"code": status_code, "message": self._reason}
        self.finish({"error": details})

APIHandler

Base class for REST API endpoints with JSON handling and CORS support.

from jupyter_server.base.handlers import APIHandler
from jupyter_server.auth.decorator import authorized
from tornado import web
import json

class MyAPIHandler(APIHandler):
    """REST API handler exposing CRUD operations on an example resource.

    The ``*_resource`` coroutines are placeholders to be replaced with
    real storage logic.
    """

    def _read_json_object(self):
        """Return the request body as a dict.

        ``get_json_body()`` returns ``None`` when the request has no body,
        and valid JSON may also be a list or scalar. Both would break the
        field checks and ``**data`` merges below, so reject them up front.

        Raises:
            tornado.web.HTTPError: 400 if the body is missing or is not a
                JSON object.
        """
        data = self.get_json_body()
        if not isinstance(data, dict):
            raise web.HTTPError(400, "Request body must be a JSON object")
        return data

    @authorized
    async def get(self, resource_id=None):
        """Return one resource when ``resource_id`` is given, else all.

        Raises:
            tornado.web.HTTPError: 404 if the requested resource is missing.
        """
        if resource_id:
            # Get specific resource
            resource = await self.get_resource(resource_id)
            if not resource:
                raise web.HTTPError(404, f"Resource {resource_id} not found")
            self.finish(resource)
        else:
            # List all resources
            resources = await self.list_resources()
            self.finish({"resources": resources})

    @authorized
    async def post(self):
        """Create a new resource from the JSON body and answer 201.

        Raises:
            tornado.web.HTTPError: 400 if the body is not a JSON object or
                a required field is absent.
        """
        data = self._read_json_object()

        # Validate required fields
        required_fields = ["name", "type"]
        for field in required_fields:
            if field not in data:
                raise web.HTTPError(400, f"Missing required field: {field}")

        # Create resource
        resource = await self.create_resource(data)
        self.set_status(201)  # Created
        self.finish(resource)

    @authorized
    async def put(self, resource_id):
        """Update an existing resource with the JSON body.

        Raises:
            tornado.web.HTTPError: 400 if the body is not a JSON object,
                404 if the resource does not exist.
        """
        data = self._read_json_object()

        # Check if resource exists
        existing = await self.get_resource(resource_id)
        if not existing:
            raise web.HTTPError(404, f"Resource {resource_id} not found")

        # Update resource
        updated = await self.update_resource(resource_id, data)
        self.finish(updated)

    @authorized
    async def delete(self, resource_id):
        """Delete a resource and answer 204.

        Raises:
            tornado.web.HTTPError: 404 if the resource does not exist.
        """
        # Check if resource exists
        existing = await self.get_resource(resource_id)
        if not existing:
            raise web.HTTPError(404, f"Resource {resource_id} not found")

        # Delete resource
        await self.delete_resource(resource_id)
        self.set_status(204)  # No Content
        self.finish()

    async def get_resource(self, resource_id):
        """Return the resource with the given ID (placeholder)."""
        # Implement resource retrieval
        return {"id": resource_id, "name": "Example Resource"}

    async def list_resources(self):
        """Return every resource (placeholder)."""
        # Implement resource listing
        return [
            {"id": "1", "name": "Resource 1"},
            {"id": "2", "name": "Resource 2"},
        ]

    async def create_resource(self, data):
        """Create a resource from ``data`` and return it (placeholder)."""
        # Implement resource creation
        resource_id = "new-id"
        return {"id": resource_id, **data}

    async def update_resource(self, resource_id, data):
        """Apply ``data`` to the resource and return it (placeholder)."""
        # Implement resource update
        return {"id": resource_id, **data}

    async def delete_resource(self, resource_id):
        """Remove the resource (placeholder)."""
        # Implement resource deletion
        pass

AuthenticatedHandler

Base for handlers requiring authentication.

from jupyter_server.base.handlers import AuthenticatedHandler
from tornado import web

class MyAuthenticatedHandler(AuthenticatedHandler):
    """Handler whose GET endpoint is only served to logged-in users.

    It also times each request and logs the duration when it finishes.
    """

    @web.authenticated
    async def get(self):
        """Return the current user's details as JSON."""
        from dataclasses import asdict

        # web.authenticated rejects anonymous requests before this body
        # runs, so current_user is populated here.
        # NOTE(review): asdict() requires the user object to be a
        # dataclass instance - confirm for custom identity providers.
        user = self.current_user
        self.finish({
            "authenticated": True,
            "user": asdict(user),
            "message": "This is a protected endpoint",
        })

    async def prepare(self):
        """Run the parent's preparation, then record the start time."""
        import time

        # The parent's prepare() may be a plain method returning None or
        # a coroutine; await only when there is something to await.
        parent_result = super().prepare()
        if parent_result is not None:
            await parent_result

        # Custom preparation logic
        self.request_start_time = time.time()

    def on_finish(self):
        """Log how long the request took, if a start time was recorded."""
        import time

        # prepare() may not have run (or may have failed) before on_finish.
        if hasattr(self, 'request_start_time'):
            duration = time.time() - self.request_start_time
            self.log.info(f"Request took {duration:.3f} seconds")

Static File Handling

AuthenticatedFileHandler

Serves static files with authentication requirements.

from jupyter_server.base.handlers import AuthenticatedFileHandler
import os

# Serve a local directory through AuthenticatedFileHandler
static_path = "/path/to/static/files"
handler_class = AuthenticatedFileHandler
handler_settings = dict(path=static_path)

# Route entry to include in the web application setup
static_route = (r"/static/(.*)", handler_class, handler_settings)
handlers = [static_route]

# Usage: Files at /path/to/static/files/* are served at /static/*
# Authentication is required to access these files

FileFindHandler

Advanced file discovery and serving with caching.

from jupyter_server.base.handlers import FileFindHandler

# Directories to search, in order, plus the file served for directories
search_paths = ["/path/to/files", "/another/path"]
file_finder_settings = dict(
    path=search_paths,
    default_filename="index.html",
)

files_route = (r"/files/(.*)", FileFindHandler, file_finder_settings)
handlers = [files_route]

# Features:
# - Searches multiple paths in order
# - Caches file locations for performance
# - Supports default filenames for directories
# - Content type detection
# - ETags and caching headers

WebSocket Handlers

ZMQStreamHandler

Bridge between WebSocket connections and ZeroMQ sockets.

from jupyter_server.base.zmqhandlers import ZMQStreamHandler, AuthenticatedZMQStreamHandler
from jupyter_server.base.zmqhandlers import AuthenticatedZMQStreamHandler
from jupyter_client.session import Session
import zmq

class MyZMQHandler(AuthenticatedZMQStreamHandler):
    """WebSocket handler that relays messages to and from a ZMQ stream."""

    def initialize(self, zmq_stream, session):
        """Store the ZMQ stream and the session used for (de)serialization.

        Args:
            zmq_stream: stream the WebSocket traffic is bridged to.
            session: jupyter_client Session used to pack/unpack messages.
        """
        self.zmq_stream = zmq_stream
        self.session = session

    def open(self, kernel_id):
        """Handle a new WebSocket connection for ``kernel_id``."""
        self.kernel_id = kernel_id
        self.log.info(f"WebSocket opened for kernel {kernel_id}")

        # Connect to ZMQ socket
        self.zmq_stream.on_recv(self.on_zmq_reply)

    async def on_message(self, msg):
        """Forward a JSON WebSocket message to the ZMQ stream."""
        import json

        # Parse WebSocket message
        try:
            ws_msg = json.loads(msg)
        except json.JSONDecodeError:
            self.log.error("Invalid JSON in WebSocket message")
            return

        # Session.serialize() produces a list of frames, which has to be
        # sent as one multipart message rather than with send().
        frames = self.session.serialize(ws_msg)
        self.zmq_stream.send_multipart(frames)

    def on_zmq_reply(self, msg_list):
        """Forward a multipart ZMQ message to the WebSocket client."""
        import json

        from jupyter_client.jsonutil import json_default

        # Deserialize ZMQ message
        try:
            # Strip routing identities and the delimiter frame before
            # handing the remaining frames to deserialize().
            _idents, frames = self.session.feed_identities(msg_list)
            msg = self.session.deserialize(frames)
        except Exception as e:
            self.log.error(f"Failed to deserialize ZMQ message: {e}")
            return

        # Deserialized messages carry datetime objects (and possibly
        # bytes), which plain json.dumps() cannot encode.
        self.write_message(json.dumps(msg, default=json_default))

    def on_close(self):
        """Handle WebSocket disconnection and release the ZMQ stream."""
        # open() may never have run, so kernel_id is not guaranteed.
        kernel_id = getattr(self, "kernel_id", None)
        self.log.info(f"WebSocket closed for kernel {kernel_id}")

        # Cleanup ZMQ connection
        if hasattr(self, 'zmq_stream'):
            self.zmq_stream.close()

WebSocket Mixins

from jupyter_server.base.zmqhandlers import WebSocketMixin
from tornado import websocket
import json

class MyWebSocketHandler(WebSocketMixin, websocket.WebSocketHandler):
    """Custom WebSocket handler with JSON helpers and a keepalive ping.

    NOTE(review): ``self.log`` is not provided by tornado's
    WebSocketHandler - confirm that one of the base classes used in your
    application supplies it.
    """

    # Seconds between keepalive pings. Kept in a private attribute because
    # ``ping_interval`` is a read-only property on the base handler.
    _keepalive_interval = 30
    # Handle of the pending keepalive timer, or None when none is scheduled.
    _keepalive_timer = None

    def open(self):
        """Handle a new connection and start the keepalive timer."""
        self.log.info("WebSocket connection opened")
        self._keepalive_interval = self.settings.get(
            "websocket_ping_interval", 30
        )
        self.start_ping()

    def on_message(self, message):
        """Dispatch an incoming JSON message on its ``type`` field."""
        try:
            data = json.loads(message)
            msg_type = data.get("type")

            if msg_type == "ping":
                self.send_message({"type": "pong"})
            elif msg_type == "echo":
                self.send_message({"type": "echo", "data": data.get("data")})
            else:
                self.log.warning(f"Unknown message type: {msg_type}")

        except json.JSONDecodeError:
            self.send_error("Invalid JSON message")

    def on_close(self):
        """Handle disconnection and cancel the keepalive timer."""
        self.log.info("WebSocket connection closed")
        self.stop_ping()

    def send_message(self, message):
        """Send ``message`` to the client encoded as JSON."""
        self.write_message(json.dumps(message))

    def send_error(self, error_message, **kwargs):
        """Send an error message to the client.

        The name collides with tornado's ``send_error(status_code, **kw)``,
        which the framework calls itself when a request fails. An integer
        first argument is therefore passed on to the framework
        implementation; anything else is sent as a JSON error message.
        """
        if isinstance(error_message, int):
            super().send_error(error_message, **kwargs)
            return

        self.send_message({
            "type": "error",
            "message": error_message,
        })

    def start_ping(self):
        """Start periodic ping to keep connection alive."""
        import tornado.ioloop

        loop = tornado.ioloop.IOLoop.current()

        def send_ping():
            if not self.ws_connection or self.ws_connection.is_closing():
                self._keepalive_timer = None
                return

            self.ping(b"keepalive")
            # Re-arm and remember the handle so stop_ping() can cancel it.
            self._keepalive_timer = loop.call_later(
                self._keepalive_interval, send_ping
            )

        self._keepalive_timer = loop.call_later(
            self._keepalive_interval, send_ping
        )

    def stop_ping(self):
        """Cancel the pending keepalive timer, if any."""
        import tornado.ioloop

        timer = self._keepalive_timer
        if timer is not None:
            tornado.ioloop.IOLoop.current().remove_timeout(timer)
            self._keepalive_timer = None

Error Handlers

Template404

Custom 404 error pages with templating.

from jupyter_server.base.handlers import Template404

# Custom 404 handler
class My404Handler(Template404):
    """Custom 404 error page.

    NOTE(review): confirm that the base class actually consults
    ``get_template_name`` and ``get_template_vars`` when rendering.
    """

    def get_template_path(self):
        """Return the directory that holds the error templates."""
        template_dir = "/path/to/error/templates"
        return template_dir

    def get_template_name(self):
        """Return the file name of the 404 template."""
        template_file = "my_404.html"
        return template_file

    def get_template_vars(self):
        """Return the variables made available to the template."""
        request = self.request
        return dict(
            path=request.path,
            method=request.method,
            user=self.current_user,
            app_name="My Jupyter App",
        )

# Register as the catch-all route so unmatched URLs reach My404Handler
catch_all_route = (r".*", My404Handler)
handlers = [
    # ... other handlers
    catch_all_route,  # Catch-all for 404s
]

Utility Functions

URL Utilities

from jupyter_server.utils import (
    url_path_join,
    url_is_absolute,
    path2url,
    url2path,
    url_escape,
    url_unescape,
)
from jupyter_server.utils import (
    url_path_join,
    url_is_absolute,
    path2url,
    url2path,
    url_escape,
    url_unescape,
    to_os_path,
    to_api_path,
)

# Join URL path segments
base_url, endpoint = "/api/v1", "contents/file.txt"
full_url = url_path_join(base_url, endpoint)
print(full_url)  # "/api/v1/contents/file.txt"

# Tell absolute URLs from relative ones
is_abs, is_rel = (
    url_is_absolute(candidate)
    for candidate in ("https://example.com/path", "/relative/path")
)
print(is_abs)  # True
print(is_rel)  # False

# Filesystem path -> URL path
file_path = "/home/user/notebook.ipynb"
api_path = path2url(file_path)
print(api_path)  # "home/user/notebook.ipynb"

# Percent-encode and decode
plain_name = "file with spaces.txt"
encoded = url_escape(plain_name)
print(encoded)  # "file%20with%20spaces.txt"

decoded = url_unescape("file%20with%20spaces.txt")
print(decoded)  # "file with spaces.txt"

# API path <-> OS path, relative to a root directory
root_dir = "/home/user"
api_path = "notebooks/example.ipynb"
os_path = to_os_path(api_path, root=root_dir)
print(os_path)  # "/home/user/notebooks/example.ipynb"

os_path = "/home/user/notebooks/example.ipynb"
api_path = to_api_path(os_path, root=root_dir)
print(api_path)  # "notebooks/example.ipynb"

System Utilities

from jupyter_server.utils import (
    samefile_simple,
    check_version,
    run_sync,
    fetch,
    import_item,
    expand_path,
    filefind,
)

# File comparison
are_same = samefile_simple("/path/to/file1", "/path/to/file2")
print(f"Files are same: {are_same}")

# Version checking: the second argument is a plain minimum version
# string, not an operator-prefixed specifier such as ">=0.9.0".
is_compatible = check_version("1.0.0", "0.9.0")
print(f"Version compatible: {is_compatible}")

# Run async function synchronously
# NOTE(review): the calling convention of run_sync differs between
# releases (awaitable vs. coroutine function) - confirm for your version.
async def async_task():
    return "Result from async function"

result = run_sync(async_task())
print(result)  # "Result from async function"

# HTTP client: `await` is only valid inside a coroutine, and fetch()
# resolves to a tornado HTTPResponse, whose payload is in `.body`.
async def fetch_data():
    """Fetch the example URL and return its decoded JSON payload."""
    import json

    response = await fetch("https://api.example.com/data")
    return json.loads(response.body)

data = run_sync(fetch_data())

# Dynamic import
MyClass = import_item("my_package.module.MyClass")
instance = MyClass()

# Path expansion
expanded = expand_path("~/documents/notebook.ipynb")
print(expanded)  # "/home/user/documents/notebook.ipynb"

# File finding
found_file = filefind("config.json", ["/etc", "/usr/local/etc", "~/.config"])
print(f"Config found at: {found_file}")

Async Utilities

from jupyter_server.utils import ensure_async, run_sync_in_loop
import asyncio

# Await a value whether or not it is awaitable.
# ensure_async() takes the *result* of a call (a plain value or an
# awaitable), not the function itself, and must be awaited in a coroutine.
def sync_function():
    return "sync result"


async def call_uniformly():
    """Await sync_function()'s result through ensure_async."""
    result = await ensure_async(sync_function())  # "sync result"
    return result


# Run async in existing event loop
async def run_in_background():
    """Resolve the result of a synchronous task from inside a coroutine."""

    def background_task():
        # Some sync work
        return "background result"

    # run_sync_in_loop() also takes the call's result, not the function.
    # NOTE(review): depending on the release it returns either the value
    # or an awaitable; ensure_async() accepts both - confirm for your
    # version.
    outcome = run_sync_in_loop(background_task())
    result = await ensure_async(outcome)
    print(result)

Unix Socket Support

from jupyter_server.utils import (
    urlencode_unix_socket_path,
    urldecode_unix_socket_path,
    unix_socket_in_use,
)

# Encode a Unix socket path so it can be embedded in a URL
socket_path = "/tmp/jupyter.sock"
encoded = urlencode_unix_socket_path(socket_path)
print(encoded)  # Encoded socket path for URL use

# Recover the original path from its encoded form
decoded = urldecode_unix_socket_path(encoded)
print(decoded)  # "/tmp/jupyter.sock"

# Ask whether the socket is currently in use
in_use = unix_socket_in_use(socket_path)
status_line = f"Socket in use: {in_use}"
print(status_line)

Handler Testing Utilities

Test Client

from jupyter_server.serverapp import ServerApp
from tornado.httpclient import AsyncHTTPClient
from tornado.testing import AsyncHTTPTestCase
import json

class MyHandlerTest(AsyncHTTPTestCase):
    """Test case for custom handlers.

    ``AsyncHTTPTestCase.fetch`` drives the IO loop itself and returns the
    response directly, so the HTTP tests are plain synchronous methods.
    """

    def get_app(self):
        """Create the tornado application under test."""
        self.server_app = ServerApp()
        # argv=[] stops the app from parsing the test runner's own command
        # line. new_httpserver=False leaves serving to AsyncHTTPTestCase
        # (NOTE(review): confirm this flag exists in your release).
        self.server_app.initialize(argv=[], new_httpserver=False)
        return self.server_app.web_app

    def test_api_endpoint(self):
        """GET on the endpoint returns 200 and a JSON ``result``."""
        # Make GET request
        response = self.fetch(
            "/api/my_endpoint",
            method="GET",
            headers={"Authorization": "Bearer test-token"},
        )

        self.assertEqual(response.code, 200)

        # Parse JSON response
        data = json.loads(response.body)
        self.assertIn("result", data)

    def test_post_endpoint(self):
        """POST on the endpoint returns 201 and echoes the name."""
        payload = {"name": "test", "value": 123}

        response = self.fetch(
            "/api/my_endpoint",
            method="POST",
            headers={
                "Content-Type": "application/json",
                "Authorization": "Bearer test-token",
            },
            body=json.dumps(payload),
        )

        self.assertEqual(response.code, 201)
        data = json.loads(response.body)
        self.assertEqual(data["name"], "test")

    def test_websocket_connection(self):
        """A message sent over the WebSocket receives a ``response``."""
        from tornado.websocket import websocket_connect

        ws_url = f"ws://localhost:{self.get_http_port()}/api/websocket"
        test_message = {"type": "test", "data": "hello"}

        async def exchange():
            """Send one message and return the first reply."""
            ws = await websocket_connect(ws_url)
            try:
                await ws.write_message(json.dumps(test_message))
                return await ws.read_message()
            finally:
                # Close the socket even if sending or reading fails.
                ws.close()

        # Run the coroutine on the test case's own IO loop.
        response = self.io_loop.run_sync(exchange)

        # read_message() yields None if the connection closed first.
        self.assertIsNotNone(response)
        data = json.loads(response)

        self.assertEqual(data["type"], "response")

Performance and Monitoring

Metrics Handler

import time

from jupyter_server.base.handlers import APIHandler
from jupyter_server.base.handlers import PrometheusMetricsHandler

# Built-in metrics endpoint
# GET /api/metrics returns Prometheus-format metrics

# Custom metrics in handlers
class MetricsHandler(APIHandler):
    """Handler that keeps running request and error totals.

    Tornado builds a new handler instance for every request, so the totals
    are stored on the class. Counters set per instance would start again
    from zero on each request.
    """

    # Totals shared by all instances of this handler within the process.
    request_count = 0
    error_count = 0

    async def get(self):
        """Handle GET while tracking totals and logging the duration."""
        start_time = time.time()
        MetricsHandler.request_count += 1

        try:
            # Handle request
            result = await self.process_request()
            self.finish(result)

        except Exception:
            # Count the failure, then let normal error handling proceed.
            MetricsHandler.error_count += 1
            raise

        finally:
            # Log performance
            duration = time.time() - start_time
            self.log.info(
                f"Request completed in {duration:.3f}s "
                f"(total: {self.request_count}, errors: {self.error_count})"
            )

    async def process_request(self):
        """Produce the response payload (placeholder logic)."""
        # Implement custom logic
        return {"status": "ok"}

Request Logging

from jupyter_server.base.handlers import JupyterHandler
import time

class LoggedHandler(JupyterHandler):
    """Handler that logs the start, completion, and errors of requests."""

    async def prepare(self, **kwargs):
        """Log the request start, then run the parent's preparation.

        The parent's ``prepare`` is a coroutine that must still run, so it
        is awaited here rather than replaced. Keyword arguments are passed
        through unchanged.
        """
        self.start_time = time.time()
        self.log.info(
            f"Request started: {self.request.method} {self.request.path} "
            f"from {self.request.remote_ip}"
        )
        await super().prepare(**kwargs)

    def on_finish(self):
        """Log the status, duration, and request body size."""
        # on_finish can run for requests that failed before prepare().
        started = getattr(self, "start_time", None)
        duration = time.time() - started if started is not None else 0.0
        status = self.get_status()
        bytes_in = len(self.request.body or b"")

        self.log.info(
            f"Request finished: {self.request.method} {self.request.path} "
            f"-> {status} in {duration:.3f}s "
            f"({bytes_in} bytes in)"
        )

    def write_error(self, status_code, **kwargs):
        """Log the error, then render it with the parent implementation."""
        self.log.error(
            f"Request error: {self.request.method} {self.request.path} "
            f"-> {status_code} {self._reason}"
        )
        super().write_error(status_code, **kwargs)

Install with Tessl CLI

npx tessl i tessl/pypi-jupyter-server

docs

auth.md

configuration.md

core-application.md

extensions.md

handlers.md

index.md

services.md

tile.json