The backend infrastructure for Jupyter web applications providing core services, APIs, and REST endpoints.
The Jupyter Server provides a flexible authentication and authorization system supporting multiple identity providers and fine-grained access control.
Represents user information in the system.
from jupyter_server.auth.identity import User
from jupyter_server.auth.identity import User
from dataclasses import dataclass
# Create user instance
user = User(
username="alice",
name="Alice Smith",
display_name="Alice",
initials="AS",
avatar_url="https://example.com/avatar.jpg",
color="blue"
)
# Access user properties
print(f"Username: {user.username}")
print(f"Display name: {user.display_name}")
print(f"Initials: {user.initials}")
# User serialization for APIs
from dataclasses import asdict
user_dict = asdict(user)
print(user_dict)
# {
# "username": "alice",
# "name": "Alice Smith",
# "display_name": "Alice",
# "initials": "AS",
# "avatar_url": "https://example.com/avatar.jpg",
# "color": "blue"
# }
Base class for authentication providers.
from jupyter_server.auth.identity import IdentityProvider
from jupyter_server.auth.identity import IdentityProvider, User
from jupyter_server.base.handlers import JupyterHandler
from typing import Optional
class MyIdentityProvider(IdentityProvider):
    """Identity provider that authenticates via the ``X-Custom-Auth`` header."""

    async def get_user(self, handler: JupyterHandler) -> Optional[User]:
        """Resolve the requesting user, or ``None`` when authentication fails.

        The ``X-Custom-Auth`` request header is passed to
        :meth:`validate_token`; a missing/empty header or a rejected token
        both produce ``None``.
        """
        credential = handler.request.headers.get("X-Custom-Auth")
        # Short-circuit: validate_token is only consulted when a header is present.
        profile = await self.validate_token(credential) if credential else None
        if not profile:
            return None

        return User(
            username=profile["username"],
            name=profile["full_name"],
            display_name=profile["display_name"],
        )

    async def validate_token(self, token: str) -> Optional[dict]:
        """Return the profile dict for *token*, or ``None`` if it is rejected.

        Placeholder implementation: only one hard-coded token is accepted.
        Real validation could involve API calls, database queries, etc.
        """
        if token != "valid-token-123":
            return None
        return {
            "username": "alice",
            "full_name": "Alice Smith",
            "display_name": "Alice",
        }
# Configure server to use custom identity provider
from jupyter_server.serverapp import ServerApp
app = ServerApp()
app.identity_provider_class = MyIdentityProvider
Built-in password authentication provider.
from jupyter_server.auth.identity import PasswordIdentityProvider
from jupyter_server.auth.identity import PasswordIdentityProvider
from jupyter_server.serverapp import ServerApp
# Configure password authentication
app = ServerApp()
app.identity_provider_class = PasswordIdentityProvider
# Set password (hashed)
app.password = "sha1:hash:of:password"
# Or set token-based authentication
app.token = "secret-token-here"
# Allow password changes
app.allow_password_change = True
# Start server with authentication
app.initialize()
app.start()
Compatibility with older Jupyter authentication.
from jupyter_server.auth.identity import LegacyIdentityProvider
from jupyter_server.serverapp import ServerApp
# Use legacy authentication (not recommended for new deployments)
app = ServerApp()
app.identity_provider_class = LegacyIdentityProvider
Abstract base for access control implementations.
from jupyter_server.auth.authorizer import Authorizer
from jupyter_server.auth.authorizer import Authorizer
from jupyter_server.auth.identity import User
from jupyter_server.base.handlers import JupyterHandler
from typing import Optional
class MyAuthorizer(Authorizer):
    """Example authorizer: admins may do anything, others are checked per resource."""

    async def is_authorized(
        self,
        handler: JupyterHandler,
        user: User,
        action: str,
        resource: str,
    ) -> bool:
        """Decide whether *user* may perform *action* on *resource*.

        Admin users are always allowed. Otherwise ``read``/``write`` on
        resources under ``/api/contents`` and ``execute`` on resources under
        ``/api/kernels`` are delegated to the matching ``can_*`` helper, and
        every other combination is denied.
        """
        if user.username in self.admin_users:
            return True

        targets_contents = resource.startswith("/api/contents")
        if targets_contents and action == "read":
            return self.can_read_contents(user, resource)
        if targets_contents and action == "write":
            return self.can_write_contents(user, resource)
        if action == "execute" and resource.startswith("/api/kernels"):
            return self.can_execute_kernels(user)

        # Anything not explicitly granted above is refused.
        return False

    def can_read_contents(self, user: User, resource: str) -> bool:
        """Return whether *user* may read *resource* (placeholder: always allowed)."""
        return True

    def can_write_contents(self, user: User, resource: str) -> bool:
        """Return whether *user* may write *resource*.

        Placeholder rule: everyone except ``readonly_user`` may write.
        """
        return not (user.username == "readonly_user")

    def can_execute_kernels(self, user: User) -> bool:
        """Return whether *user* is listed in :attr:`kernel_users`."""
        allowed = self.kernel_users
        return user.username in allowed

    @property
    def admin_users(self) -> set:
        """Usernames that bypass every permission check."""
        return {"admin", "superuser"}

    @property
    def kernel_users(self) -> set:
        """Usernames permitted to execute kernels."""
        return {"alice", "bob", "admin", "superuser"}
# Configure server to use custom authorizer
from jupyter_server.serverapp import ServerApp
app = ServerApp()
app.authorizer_class = MyAuthorizer
Permissive authorizer that allows all authenticated requests.
from jupyter_server.auth.authorizer import AllowAllAuthorizer
from jupyter_server.serverapp import ServerApp
# Use permissive authorization (default)
app = ServerApp()
app.authorizer_class = AllowAllAuthorizer # This is the default
# All authenticated users can perform any action
app.initialize()
app.start()
Decorator for protecting handler methods with authorization checks.
from jupyter_server.auth.decorator import authorized
from jupyter_server.auth.decorator import authorized
from jupyter_server.base.handlers import APIHandler
from tornado import web
class ProtectedHandler(APIHandler):
    """API handler whose HTTP methods are guarded by ``@authorized``."""

    # ``@authorized`` reads ``self.auth_resource`` when no resource is passed
    # to the decorator; without this attribute the check cannot name the
    # resource being accessed.
    auth_resource = "protected"

    @web.authenticated
    @authorized
    async def get(self):
        """Return a greeting and the serialized current user."""
        # Local import keeps this example runnable when copied on its own.
        from dataclasses import asdict

        # This method only executes if the user is authorized.
        user = self.current_user
        self.finish({
            "message": f"Hello {user.display_name}!",
            "user": asdict(user),
        })

    @web.authenticated
    @authorized
    async def post(self):
        """Echo the JSON request body back to the caller."""
        # The authorization check happens in the decorator before we get here.
        data = self.get_json_body()
        self.finish({"status": "success", "data": data})

    @web.authenticated
    @authorized
    async def delete(self):
        """Perform an admin-only deletion.

        Raises:
            tornado.web.HTTPError: 403 when the current user is not an admin.
        """
        # Extra check layered on top of the decorator's authorization.
        if not await self.is_admin():
            raise web.HTTPError(403, "Admin access required")
        self.finish({"status": "deleted"})

    async def is_admin(self) -> bool:
        """Return whether the current user's username is an admin name."""
        user = self.current_user
        return user.username in ["admin", "superuser"]


from jupyter_server.auth.decorator import authorized
from jupyter_server.base.handlers import APIHandler
from tornado import web
class FileHandler(APIHandler):
    """Handler for file operations with path-based authorization."""

    # ``@authorized`` reads ``self.auth_resource`` when no resource is passed
    # to the decorator.
    auth_resource = "contents"

    @web.authenticated
    @authorized
    async def get(self, path):
        """Return the file at *path* if the current user may read it.

        Raises:
            tornado.web.HTTPError: 403 when read access to *path* is denied.
        """
        if not await self.can_read_path(path):
            raise web.HTTPError(403, f"Access denied to {path}")
        self.finish({"path": path, "content": "file content"})

    @web.authenticated
    @authorized
    async def put(self, path):
        """Update the file at *path* if the current user may write it.

        Raises:
            tornado.web.HTTPError: 403 when write access to *path* is denied.
        """
        if not await self.can_write_path(path):
            raise web.HTTPError(403, f"Write access denied to {path}")
        # Placeholder: a real implementation would persist ``data`` here.
        data = self.get_json_body()
        self.finish({"path": path, "status": "updated"})

    async def can_read_path(self, path: str) -> bool:
        """Return whether the current user may read *path*."""
        return await self._check_path_access("read", path)

    async def can_write_path(self, path: str) -> bool:
        """Return whether the current user may write to *path*."""
        return await self._check_path_access("write", path)

    async def _check_path_access(self, action: str, path: str) -> bool:
        """Ask the configured authorizer whether *action* is allowed on *path*."""
        import inspect

        authorizer = self.settings.get("authorizer")
        verdict = authorizer.is_authorized(
            handler=self,
            user=self.current_user,
            action=action,
            resource=f"/api/contents/{path}",
        )
        # ``is_authorized`` may be a plain method (e.g. AllowAllAuthorizer) or a
        # coroutine; awaiting a bare bool would raise TypeError.
        if inspect.isawaitable(verdict):
            verdict = await verdict
        return verdict


from jupyter_server.auth.security import passwd, passwd_check, set_password
from jupyter_server.auth.security import passwd, passwd_check, set_password
# Generate password hash
password_hash = passwd("my-secret-password")
print(f"Password hash: {password_hash}")
# Output format: "<algorithm>:<salt>:<hash>" (the default argon2 algorithm yields "argon2:$argon2id$...")
# Verify password
is_valid = passwd_check(password_hash, "my-secret-password")
print(f"Password valid: {is_valid}") # True
is_invalid = passwd_check(password_hash, "wrong-password")
print(f"Wrong password: {is_invalid}") # False
# Interactive password setting (prompts user)
hash_value = set_password()
print(f"Generated hash: {hash_value}")
from jupyter_server.auth.utils import get_anonymous_username
# Generate anonymous username
anon_user = get_anonymous_username()
print(f"Anonymous user: {anon_user}")  # e.g. "Io" (a randomly chosen moon of Jupiter)
Custom login form and authentication logic.
from jupyter_server.auth.login import LoginFormHandler
from jupyter_server.auth.login import LoginFormHandler
from jupyter_server.base.handlers import JupyterHandler
from tornado import web
class CustomLoginHandler(LoginFormHandler):
"""Custom login handler."""
async def get(self):
"""Show custom login form."""
# Render custom login template
html = self.render_template(
"custom_login.html",
message=self.get_argument("message", ""),
)
self.finish(html)
async def post(self):
"""Process login form submission."""
username = self.get_argument("username")
password = self.get_argument("password")
# Custom authentication logic
if await self.authenticate_user(username, password):
# Set authentication cookie/token
self.set_current_user(username)
self.redirect(self.get_argument("next", "/"))
else:
# Redirect back with error
self.redirect(
"/login?message=Invalid+credentials",
status=302
)
async def authenticate_user(self, username: str, password: str) -> bool:
"""Custom user authentication."""
# Implement authentication logic
# Could involve database lookup, LDAP, OAuth, etc.
valid_users = {
"alice": "password123",
"bob": "secret456",
}
return valid_users.get(username) == password
def set_current_user(self, username: str):
"""Set authenticated user."""
# Set secure cookie or token
self.set_secure_cookie("user", username)Handle user logout and session cleanup.
from jupyter_server.auth.logout import LogoutHandler
class CustomLogoutHandler(LogoutHandler):
    """Logout handler that clears cookies and runs custom cleanup."""

    async def get(self):
        """Log the user out and send them to the login page."""
        # Drop the authentication and XSRF cookies.
        for cookie_name in ("user", "_xsrf"):
            self.clear_cookie(cookie_name)

        await self.perform_logout()

        self.redirect("/login?message=Logged+out+successfully")

    async def perform_logout(self):
        """Run per-user cleanup when a user is attached to the request."""
        user = self.current_user
        if not user:
            return
        await self.cleanup_user_sessions(user.username)

    async def cleanup_user_sessions(self, username: str):
        """Hook for releasing *username*'s resources (kernels, sessions, ...).

        Intentionally a no-op in this example.
        """


from jupyter_server.auth.identity import IdentityProvider, User
from typing import Optional
class MultiProviderIdentity(IdentityProvider):
    """Identity provider supporting multiple authentication methods.

    The methods are tried in the order token, header, OAuth cookie.
    ``validate_token``, ``get_user_from_token`` and ``get_user_from_oauth``
    are called below but not defined in this example; a concrete deployment
    has to supply them.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Insertion order is the order in which methods are attempted.
        self.providers = {
            "token": self.token_auth,
            "header": self.header_auth,
            "oauth": self.oauth_auth,
        }

    async def get_user(self, handler) -> Optional[User]:
        """Return the first user any method yields, or ``None`` if all fail.

        A method that raises is logged at debug level and skipped, so one
        broken backend does not block the remaining ones.
        """
        for method, provider in self.providers.items():
            try:
                user = await provider(handler)
            except Exception as e:  # deliberate best-effort fallthrough
                self.log.debug("Auth method %s failed: %s", method, e)
                continue
            if user:
                return user
        return None

    async def token_auth(self, handler) -> Optional[User]:
        """Authenticate via the ``token`` argument or a ``Bearer`` header."""
        token = handler.get_argument("token", "")
        if not token:
            header = handler.request.headers.get("Authorization", "")
            scheme, _, credential = header.partition(" ")
            # Only accept the Bearer scheme. Stripping "Bearer " with
            # str.replace would treat other schemes (e.g. "Basic ...") as a
            # token and would also mangle tokens containing that substring.
            if scheme.lower() == "bearer":
                token = credential.strip()
        if token and await self.validate_token(token):
            return await self.get_user_from_token(token)
        return None

    async def header_auth(self, handler) -> Optional[User]:
        """Authenticate from the ``X-Remote-User`` request header.

        NOTE(security): this trusts the header as-is, which is only safe
        behind a reverse proxy that sets or strips it for every request.
        """
        user_header = handler.request.headers.get("X-Remote-User")
        if user_header:
            return User(username=user_header, name=user_header)
        return None

    async def oauth_auth(self, handler) -> Optional[User]:
        """Authenticate from the signed ``oauth_token`` cookie."""
        oauth_token = handler.get_secure_cookie("oauth_token")
        if oauth_token:
            return await self.get_user_from_oauth(oauth_token.decode())
        return None


from jupyter_server.auth.authorizer import Authorizer
from jupyter_server.auth.identity import User
from jupyter_server.base.handlers import JupyterHandler
class RoleBasedAuthorizer(Authorizer):
    """Role-based access control.

    Users map to roles and roles map to permission strings of the form
    ``"<action>:<resource>"``. Either part may be ``*``, and the resource
    may end in ``*`` to match by prefix.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.user_roles = self.load_user_roles()
        self.role_permissions = self.load_role_permissions()

    async def is_authorized(
        self,
        handler: JupyterHandler,
        user: User,
        action: str,
        resource: str,
    ) -> bool:
        """Return whether any of *user*'s roles permits *action* on *resource*.

        Users without a role entry, and roles without a permission entry,
        grant nothing.
        """
        user_roles = self.user_roles.get(user.username, set())
        return any(
            self.check_permission(self.role_permissions.get(role, set()), action, resource)
            for role in user_roles
        )

    def check_permission(self, permissions: set, action: str, resource: str) -> bool:
        """Return whether *permissions* allow *action* on *resource*."""
        # Fast paths: exact permission, action-wide wildcard, global wildcard.
        if f"{action}:{resource}" in permissions:
            return True
        if f"{action}:*" in permissions:
            return True
        if "*:*" in permissions:
            return True

        # Fall back to per-entry pattern matching (e.g. prefix wildcards).
        return any(
            self.matches_pattern(perm, action, resource) for perm in permissions
        )

    def matches_pattern(self, permission: str, action: str, resource: str) -> bool:
        """Return whether one permission pattern covers *action* on *resource*.

        For example ``"read:/api/contents/*"`` matches ``read`` on
        ``/api/contents/file.txt``. A permission without a ``:`` separator is
        malformed and matches nothing, rather than raising ``ValueError``.
        """
        perm_action, separator, perm_resource = permission.partition(":")
        if not separator:
            return False

        if perm_action != "*" and perm_action != action:
            return False
        if perm_resource == "*":
            return True

        # Simple trailing-wildcard matching: compare on the prefix before "*".
        if perm_resource.endswith("*"):
            return resource.startswith(perm_resource[:-1])
        return perm_resource == resource

    def load_user_roles(self) -> dict:
        """Return the username -> set-of-roles mapping (hard-coded example data)."""
        return {
            "admin": {"administrator", "user"},
            "alice": {"editor", "user"},
            "bob": {"viewer", "user"},
            "guest": {"viewer"},
        }

    def load_role_permissions(self) -> dict:
        """Return the role -> set-of-permissions mapping (hard-coded example data)."""
        return {
            "administrator": {"*:*"},
            "editor": {
                "read:/api/*",
                "write:/api/contents/*",
                "execute:/api/kernels/*",
            },
            "viewer": {
                "read:/api/contents/*",
                "read:/api/kernelspecs/*",
            },
            "user": {
                "read:/api/me",
                "read:/api/status",
            },
        }


from jupyter_server.auth.identity import IdentityProvider, User
from typing import Optional
class MockIdentityProvider(IdentityProvider):
    """Identity provider for tests that always reports one fixed user."""

    def __init__(self, mock_user: Optional[User] = None, **kwargs):
        """Store *mock_user*, falling back to a default ``testuser`` account."""
        super().__init__(**kwargs)
        fallback_needed = not mock_user
        self.mock_user = (
            User(username="testuser", name="Test User") if fallback_needed else mock_user
        )

    async def get_user(self, handler) -> Optional[User]:
        """Return the stored mock user regardless of the request."""
        return self.mock_user
# Use in tests
import pytest
from jupyter_server.serverapp import ServerApp
@pytest.fixture
def authenticated_app():
    """Provide an initialized ``ServerApp`` that uses ``MockIdentityProvider``."""
    server = ServerApp()
    server.identity_provider_class = MockIdentityProvider
    server.initialize()
    return server
def test_authenticated_endpoint(authenticated_app):
# Test with authenticated user
pass
# jupyter_server_config.py
c = get_config()
# Authentication configuration
c.ServerApp.identity_provider_class = "my_package.MyIdentityProvider"
c.ServerApp.authorizer_class = "my_package.MyAuthorizer"
# Password authentication
c.ServerApp.password = "sha1:algorithm:salt:hash"
c.ServerApp.allow_password_change = True
# Token authentication
c.ServerApp.token = "secret-token-here"
c.ServerApp.token_length = 48
# Security settings (example values: in production, list specific trusted origins instead of allow_origin = "*")
c.ServerApp.disable_check_xsrf = False
c.ServerApp.allow_remote_access = True
c.ServerApp.allow_origin = "*"
c.ServerApp.allow_credentials = True
# Custom identity provider settings
c.MyIdentityProvider.auth_timeout = 300
c.MyIdentityProvider.require_ssl = True
# Custom authorizer settings
c.MyAuthorizer.admin_users = {"admin", "superuser"}
c.MyAuthorizer.default_permissions = {"read": True, "write": False}
Install with Tessl CLI
npx tessl i tessl/pypi-jupyter-server