Simple security for Flask apps
Flask-Security provides Flask signals that allow you to hook into security workflows for customization and extension. These signals are sent at key points during user registration, confirmation, password reset, and login processes, enabling you to add custom behavior without modifying Flask-Security's core functionality.
Signals related to user registration and account creation.
user_registered = Signal()
"""
Signal sent when a user registers.
Sender: Flask application
Data:
user: The newly registered user
confirm_token: Email confirmation token (if confirmable enabled)
"""

Signals related to email confirmation processes.
user_confirmed = Signal()
"""
Signal sent when user confirms their email address.
Sender: Flask application
Data:
user: The user who confirmed their email
"""
confirm_instructions_sent = Signal()
"""
Signal sent when confirmation instructions are dispatched via email.
Sender: Flask application
Data:
user: The user receiving confirmation instructions
token: The confirmation token
"""

Signals related to password reset workflows.
password_reset = Signal()
"""
Signal sent when a user's password is reset.
Sender: Flask application
Data:
user: The user whose password was reset
"""
reset_password_instructions_sent = Signal()
"""
Signal sent when password reset instructions are dispatched via email.
Sender: Flask application
Data:
user: The user receiving reset instructions
token: The password reset token
"""

Additional signals for authentication events.
login_instructions_sent = Signal()
"""
Signal sent when passwordless login instructions are dispatched via email.
Sender: Flask application
Data:
user: The user receiving login instructions
login_token: The login token
"""
password_changed = Signal()
"""
Signal sent when a user changes their password.
Sender: Flask application
Data:
user: The user who changed their password
"""

from flask_security import user_registered, user_confirmed
from flask import current_app
@user_registered.connect_via(app)
def user_registered_sighandler(sender, **extra):
    """Receive ``user_registered`` signals sent by ``app``.

    Prints the new user's email, calls ``send_welcome_email`` and writes an
    info-level entry to the current application's logger.

    Args:
        sender: The object that sent the signal.
        **extra: Signal payload. ``user`` is required; ``confirm_token`` is
            optional.
    """
    user = extra['user']
    # Not used below; shows how to read the optional token (None if absent).
    confirm_token = extra.get('confirm_token')

    print(f"New user registered: {user.email}")

    # Send welcome email
    send_welcome_email(user)

    # Log registration
    current_app.logger.info(f"User registered: {user.email}")
@user_confirmed.connect_via(app)
def user_confirmed_sighandler(sender, **extra):
    """Receive ``user_confirmed`` signals sent by ``app``.

    Prints the confirmed user's email, adds the ``'member'`` role through
    ``user_datastore`` and commits the datastore.

    Args:
        sender: The object that sent the signal.
        **extra: Signal payload; ``user`` is required.
    """
    user = extra['user']
    print(f"User confirmed email: {user.email}")

    # Grant default role
    user_datastore.add_role_to_user(user, 'member')
    user_datastore.commit()


from flask_security import user_registered, confirm_instructions_sent
from datetime import datetime, timedelta
@user_registered.connect_via(app)
def handle_user_registration(sender, **extra):
    """Receive ``user_registered`` signals sent by ``app``.

    Marks the user as pending confirmation, stamps the registration time,
    adds a ``UserProfile`` row to the session, notifies admins, bumps the
    registration counter and finally commits ``db.session``.

    Args:
        sender: The object that sent the signal.
        **extra: Signal payload; ``user`` is required.
    """
    account = extra['user']

    # Set user status
    account.status = 'pending_confirmation'
    account.registered_at = datetime.utcnow()

    # Create user profile
    db.session.add(UserProfile(user_id=account.id))

    # Send to admin notification queue
    notify_admin_new_user(account)

    # Track registration metrics
    increment_registration_counter()

    # One commit persists both the attribute changes and the new profile.
    db.session.commit()
@confirm_instructions_sent.connect_via(app)
def track_confirmation_sent(sender, **extra):
    """Receive ``confirm_instructions_sent`` signals sent by ``app``.

    Creates a ``ConfirmationLog`` entry for the user and token, then
    schedules a confirmation reminder one day out.

    Args:
        sender: The object that sent the signal.
        **extra: Signal payload; ``user`` and ``token`` are required.
    """
    recipient, confirmation_token = extra['user'], extra['token']

    # Log confirmation email sent
    ConfirmationLog.create(
        user_id=recipient.id,
        token=confirmation_token,
        sent_at=datetime.utcnow(),
    )

    # Set reminder for follow-up
    reminder_delay = timedelta(days=1)
    schedule_confirmation_reminder(recipient, delay=reminder_delay)


from flask_security import reset_password_instructions_sent, password_reset
@reset_password_instructions_sent.connect_via(app)
def handle_reset_instructions(sender, **extra):
    """Receive ``reset_password_instructions_sent`` signals sent by ``app``.

    Creates a ``PasswordResetLog`` entry recording the user, token, request
    time, client IP address and user-agent string, then notifies admins when
    ``is_suspicious_reset_pattern`` flags the user.

    Args:
        sender: The object that sent the signal.
        **extra: Signal payload; ``user`` and ``token`` are required.
    """
    # ``request`` was used below without ever being imported; import it here
    # so the example does not fail with NameError.
    # NOTE(review): assumes the signal is sent while a request context is
    # active; confirm before reusing this handler from background jobs.
    from flask import request

    user = extra['user']
    token = extra['token']

    # Log reset request
    PasswordResetLog.create(
        user_id=user.id,
        token=token,
        requested_at=datetime.utcnow(),
        ip_address=request.remote_addr,
        user_agent=request.user_agent.string,
    )

    # Send admin notification for suspicious activity
    if is_suspicious_reset_pattern(user):
        notify_admin_suspicious_reset(user)
@password_reset.connect_via(app)
def handle_password_reset(sender, **extra):
    """Receive ``password_reset`` signals sent by ``app``.

    Stamps ``password_changed_at``, clears ``force_password_change``,
    invalidates the user's sessions, sends a confirmation email, records a
    ``SecurityEvent`` of type ``'password_reset'`` and commits ``db.session``.

    Args:
        sender: The object that sent the signal.
        **extra: Signal payload; ``user`` is required.
    """
    account = extra['user']

    # Update security log
    account.password_changed_at = datetime.utcnow()
    account.force_password_change = False

    # Invalidate all existing sessions
    invalidate_user_sessions(account)

    # Send confirmation email
    send_password_reset_confirmation(account)

    # Log security event
    SecurityEvent.create(
        user_id=account.id,
        event_type='password_reset',
        timestamp=datetime.utcnow(),
    )

    db.session.commit()


from flask_security import user_confirmed
@user_confirmed.connect_via(app)
def handle_user_confirmation(sender, **extra):
    """Receive ``user_confirmed`` signals sent by ``app``.

    Activates the user, stamps ``confirmed_at``, grants the ``'member'`` role
    when that role exists, sends the getting-started welcome email, creates
    default preferences, bumps the confirmation counter and commits
    ``db.session``.

    Args:
        sender: The object that sent the signal.
        **extra: Signal payload; ``user`` is required.
    """
    account = extra['user']

    # Update user status
    account.status = 'active'
    account.confirmed_at = datetime.utcnow()

    # Grant default permissions (skipped when the role lookup is falsy)
    member_role = user_datastore.find_role('member')
    if member_role:
        user_datastore.add_role_to_user(account, member_role)

    # Send welcome email with additional info
    send_welcome_email_with_getting_started(account)

    # Create default user preferences
    UserPreferences.create_defaults(account)

    # Track confirmation metrics
    increment_confirmation_counter()

    db.session.commit()


from flask import Flask
from flask_security import Security, user_registered
def create_app():
    """Build the Flask application and wire up its signal receivers.

    Creates the app, initialises ``Security`` with ``user_datastore`` and
    then calls ``connect_signals`` on the new app.

    Returns:
        The configured ``Flask`` instance.
    """
    flask_app = Flask(__name__)
    security = Security(flask_app, user_datastore)

    # Connect signals after app creation
    connect_signals(flask_app)

    return flask_app
def connect_signals(app):
    """Connect a ``user_registered`` receiver scoped to ``app``.

    The receiver consults ``app.config`` each time it runs to decide whether
    to post a Slack notification and whether to sync the user to the CRM.

    Args:
        app: The application whose ``user_registered`` signals are handled
            and whose config drives the optional integrations.
    """

    @user_registered.connect_via(app)
    def handle_registration(sender, **extra):
        """Run the app-specific registration integrations for ``user``."""
        new_user = extra['user']

        # App-specific registration logic
        slack_enabled = app.config.get('SEND_SLACK_NOTIFICATIONS')
        if slack_enabled:
            send_slack_notification(f"New user: {new_user.email}")

        crm_enabled = app.config.get('SYNC_WITH_CRM')
        if crm_enabled:
            sync_user_to_crm(new_user)


from flask.signals import Namespace
# Create custom namespace for your signals
my_signals = Namespace()

# Define custom signals
user_profile_updated = my_signals.signal('user-profile-updated')
user_subscription_changed = my_signals.signal('user-subscription-changed')


# Send custom signals
def update_user_profile(user, profile_data):
    """Apply ``profile_data`` to the user's profile and announce the change.

    Updates ``user.profile``, commits ``db.session`` and then sends
    ``user_profile_updated`` with the real application object as sender.

    Args:
        user: The user whose profile is updated.
        profile_data: The values passed to ``user.profile.update`` and
            forwarded to receivers as ``changes``.
    """
    # Update profile logic
    user.profile.update(profile_data)
    db.session.commit()

    # Send custom signal; unwrap the proxy so the sender is the app itself.
    sending_app = current_app._get_current_object()
    user_profile_updated.send(sending_app, user=user, changes=profile_data)
# Handle custom signals
@user_profile_updated.connect_via(app)
def handle_profile_update(sender, **extra):
    """Receive ``user_profile_updated`` signals sent by ``app``.

    Refreshes the search index for the user and forwards the changes to
    connected services.

    Args:
        sender: The object that sent the signal.
        **extra: Signal payload; ``user`` and ``changes`` are required.
    """
    member, profile_changes = extra['user'], extra['changes']

    # Custom handling logic
    update_search_index(member)
    notify_connected_services(member, profile_changes)


class NotificationPlugin:
    """Send a notification for registration, confirmation and password reset.

    On construction the plugin connects its own methods as receivers of the
    corresponding signals, scoped to the given app.
    """

    def __init__(self, app):
        """Remember ``app`` and connect the signal receivers."""
        self.app = app
        self.setup_signals()

    def setup_signals(self):
        """Connect each handled signal to its receiver method via ``self.app``."""
        wiring = (
            (user_registered, self.on_user_registered),
            (user_confirmed, self.on_user_confirmed),
            (password_reset, self.on_password_reset),
        )
        for signal, receiver in wiring:
            signal.connect_via(self.app)(receiver)

    def on_user_registered(self, sender, **extra):
        """Send a ``'registration'`` notification for ``extra['user']``."""
        self.send_notification('registration', extra['user'])

    def on_user_confirmed(self, sender, **extra):
        """Send a ``'confirmation'`` notification for ``extra['user']``."""
        self.send_notification('confirmation', extra['user'])

    def on_password_reset(self, sender, **extra):
        """Send a ``'password_reset'`` notification for ``extra['user']``."""
        self.send_notification('password_reset', extra['user'])

    def send_notification(self, event_type, user):
        """Pass the event name plus the user's id and email to the service."""
        # Notification logic
        notification_service.send(
            event=event_type,
            user_id=user.id,
            email=user.email,
        )


# Register plugin
notification_plugin = NotificationPlugin(app)

import unittest
from flask_security import user_registered
class TestSignals(unittest.TestCase):
def setUp(self):
self.app = create_app()
self.app_context = self.app.app_context()
self.app_context.push()
self.signal_received = False
self.signal_data = None
# Connect test signal handler
user_registered.connect(self.on_user_registered, sender=self.app)
def tearDown(self):
self.app_context.pop()
def on_user_registered(self, sender, **extra):
self.signal_received = True
self.signal_data = extra
def test_user_registration_sends_signal(self):
# Register a user
user = user_datastore.create_user(
email='test@example.com',
password='password'
)
# Trigger registration signal
user_registered.send(
self.app,
user=user,
confirm_token='test-token'
)
# Assert signal was received
self.assertTrue(self.signal_received)
self.assertEqual(self.signal_data['user'], user)
self.assertEqual(self.signal_data['confirm_token'], 'test-token')tessl i tessl/pypi-flask-security@3.0.0