CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

giuseppe-trisciuoglio/developer-kit

Comprehensive developer toolkit providing reusable skills for Java/Spring Boot, TypeScript/NestJS/React/Next.js, Python, PHP, AWS CloudFormation, AI/RAG, DevOps, and more.

90

Quality

90%

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Security by Snyk

Risky

Do not use without reviewing

This version of the tile failed moderation
Moderation pipeline encountered an internal error
Overview
Quality
Evals
Security
Files

plugins/developer-kit-specs/hooks/drift-monitor.py

#!/usr/bin/env python3
"""Drift Guard Monitor Hook for Claude Code.

Monitors file creation/modification events (PostToolUse on Write|Edit)
and alerts when unplanned files are detected.

Reads current state from _drift/state.json, compares file paths against
Expected Files list, emits informational alerts for unplanned files,
and logs all drift events to _drift/drift-events.log.

Hook event: PostToolUse with matcher "Write|Edit"
Input:  JSON via stdin { "tool_name": "Write|Edit", "tool_input": { "file_path": "..." }, ... }
Output: Exit 0 = allow | Alert message on stdout | Exit 2 = block (not used)

Zero external dependencies — pure Python 3 standard library only.
"""

import json
import os
import sys
from datetime import datetime
from pathlib import Path

# ─── Constants ──────────────────────────────────────────────────────────────

# File name of the JSON state document looked up inside the drift directory.
STATE_FILE_NAME = "state.json"
# Directory name searched for upward from the working directory; it holds
# both the state file and the event log.
DRIFT_DIR_NAME = "_drift"
# File name of the append-only event log written inside the drift directory.
DRIFT_EVENTS_LOG = "drift-events.log"
# Key in the state document listing paths that already triggered an alert.
ALERTED_FILES_FIELD = "alerted_files"


# ─── State Management ─────────────────────────────────────────────────────────


def find_state_file(cwd: str) -> str | None:
    """Find _drift/state.json by searching upward from cwd.

    Starts at the resolved ``cwd`` and checks every ancestor directory in
    turn, up to and including the filesystem root. The nearest match wins.

    Args:
        cwd: Directory to start searching from; it is resolved to an
            absolute path first.

    Returns:
        Absolute path to state.json as a string, or None when no directory
        on the way to the root contains a ``_drift/state.json`` regular file.
    """
    start = Path(cwd).resolve()

    # ``parents`` is finite and ends at the root, so no iteration cap is
    # needed and deeply nested working directories are still covered.
    for directory in (start, *start.parents):
        candidate = directory / DRIFT_DIR_NAME / STATE_FILE_NAME

        # os.path.isfile() rejects a directory that happens to be named
        # state.json and returns False (instead of raising) when the path
        # cannot be stat'ed, so the search simply continues upward.
        if os.path.isfile(candidate):
            return str(candidate)

    return None


def load_state(state_path: str) -> dict | None:
    """Load state.json, return None if file not found or invalid.

    Graceful degradation: returns None on any error.
    """
    try:
        with open(state_path, "r", encoding="utf-8") as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError, PermissionError):
        return None


def update_alerted_files(state_path: str, file_path: str) -> None:
    """Add file path to the alerted_files list in state.json.

    The updated document is written to a temporary file next to state.json
    and then moved into place with os.replace(), so readers never observe a
    half-written state file. The read-modify-write sequence as a whole is
    not locked: two hook runs that overlap can still lose one addition.

    Graceful degradation: if the state cannot be read, is not a JSON object,
    has a non-list alerted_files entry, or cannot be written, state.json is
    left untouched and the failure is silently ignored.

    Args:
        state_path: Path to state.json.
        file_path: Path to record as already alerted.
    """
    try:
        with open(state_path, "r", encoding="utf-8") as f:
            state = json.load(f)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return

    if not isinstance(state, dict):
        return

    # Initialize alerted_files if missing (or explicitly null)
    alerted = state.get(ALERTED_FILES_FIELD)
    if alerted is None:
        alerted = []
        state[ALERTED_FILES_FIELD] = alerted
    elif not isinstance(alerted, list):
        return

    # Nothing to persist when the path is already recorded
    if file_path in alerted:
        return
    alerted.append(file_path)

    # The temp file lives in the same directory as state.json so that
    # os.replace() is a rename on one filesystem; the PID keeps concurrent
    # hook processes from sharing a temp file.
    tmp_path = f"{state_path}.{os.getpid()}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2)
        os.replace(tmp_path, state_path)
    except OSError:
        # Best effort: drop the partial temp file and keep the old state.
        try:
            os.remove(tmp_path)
        except OSError:
            pass


# ─── Drift Event Logging ─────────────────────────────────────────────────────


def log_drift_event(spec_folder: str, file_path: str) -> None:
    """Append one timestamped line for ``file_path`` to the drift event log.

    The log lives at ``<spec_folder>/_drift/drift-events.log`` and each line
    has the form ``<ISO timestamp> | <file_path>``. No de-duplication is
    done: every call appends a line. I/O failures are swallowed so the hook
    never fails because of logging.
    """
    try:
        destination = os.path.join(spec_folder, DRIFT_DIR_NAME, DRIFT_EVENTS_LOG)
        stamp = datetime.now().isoformat()

        with open(destination, "a", encoding="utf-8") as log_file:
            log_file.write(f"{stamp} | {file_path}\n")
    except OSError:
        # Graceful degradation: logging is best effort (IOError is an alias
        # of OSError, so this catches the same failures).
        pass


# ─── File Path Comparison ────────────────────────────────────────────────────


def is_expected_file(file_path: str, state: dict) -> bool:
    """Report whether ``file_path`` is listed under "expected_files".

    This is a plain membership test: the match is exact and case-sensitive,
    and neither side is normalized. A state without an "expected_files"
    entry expects nothing.
    """
    return file_path in state.get("expected_files", [])


def is_already_alerted(file_path: str, state: dict) -> bool:
    """Report whether ``file_path`` is already recorded as alerted.

    Looks the path up in the state's alerted-files list; a state without
    that list has alerted on nothing yet.
    """
    return file_path in state.get(ALERTED_FILES_FIELD, [])


def emit_alert(file_path: str, state: dict) -> None:
    """Print a JSON notification about an unplanned file to stdout.

    The printed object has "type" set to "notification" and a three-line
    "message", each line prefixed with "[Drift Guard]":

        [Drift Guard] Unplanned file detected: <path>
        [Drift Guard] Active task: <task_id, or "unknown">
        [Drift Guard] Expected files (partial): <first 5 files>[, ... (N more)]
    """
    task = state.get("task_id", "unknown")
    planned = state.get("expected_files", [])

    # Only the first five expected files are listed; the rest are counted.
    hidden = len(planned) - 5
    summary = ", ".join(planned[:5])
    if hidden > 0:
        summary = f"{summary}, ... ({hidden} more)"

    lines = [
        f"[Drift Guard] Unplanned file detected: {file_path}",
        f"[Drift Guard] Active task: {task}",
        f"[Drift Guard] Expected files (partial): {summary}",
    ]
    print(json.dumps({"type": "notification", "message": "\n".join(lines)}))


# ─── Entry Point ─────────────────────────────────────────────────────────────


def main() -> None:
    """Process one hook invocation read from stdin.

    Reads the hook payload as JSON, ignores anything that is not a
    PostToolUse event for the Write or Edit tool, locates and loads the
    drift state, appends the touched path to the event log, and prints an
    alert the first time a path outside the expected files is seen.

    Always terminates via sys.exit(0): the hook is informational and never
    blocks the tool call. Malformed or unexpected payloads exit silently.
    """
    # 1. Parse input JSON
    try:
        input_data = json.load(sys.stdin)
    except ValueError:  # includes json.JSONDecodeError and UnicodeDecodeError
        sys.exit(0)  # Malformed input → silent exit

    if not isinstance(input_data, dict):
        sys.exit(0)  # Valid JSON but not an object → silent exit

    # 2. Validate hook event
    if input_data.get("hook_event_name") != "PostToolUse":
        sys.exit(0)  # Wrong event → silent exit

    tool_name = input_data.get("tool_name")
    if tool_name not in ("Write", "Edit"):
        sys.exit(0)  # Not a file operation → silent exit

    # 3. Extract file path from tool_input
    tool_input = input_data.get("tool_input")
    if not isinstance(tool_input, dict):
        sys.exit(0)  # Missing, null or malformed tool_input → silent exit

    file_path = tool_input.get("file_path")
    if not isinstance(file_path, str) or not file_path:
        sys.exit(0)  # No usable file path → silent exit

    # 4. Find and load state.json
    cwd = input_data.get("cwd")
    if not isinstance(cwd, str) or not cwd:
        # An absent, null or non-string cwd falls back to the process cwd.
        cwd = os.getcwd()
    state_path = find_state_file(cwd)

    if not state_path:
        # State not found → system not initialized, graceful degradation
        sys.exit(0)

    state = load_state(state_path)
    if not isinstance(state, dict) or not state:
        # Invalid or empty state → graceful degradation
        sys.exit(0)

    # 5. Get spec folder for logging (the directory that contains _drift/)
    spec_folder = str(Path(state_path).parent.parent)

    # 6. Log drift event (always, even for expected files)
    # NOTE(review): the log therefore records every Write/Edit seen while a
    # state file is present, not only unplanned files.
    log_drift_event(spec_folder, file_path)

    # 7. Check if file is expected
    if is_expected_file(file_path, state):
        # File is in expected list → silent exit
        sys.exit(0)

    # 8. Check if already alerted in this session
    if is_already_alerted(file_path, state):
        # Already alerted → silent exit (but still logged)
        sys.exit(0)

    # 9. Emit alert
    emit_alert(file_path, state)

    # 10. Update alerted_files to prevent duplicate alerts
    update_alerted_files(state_path, file_path)

    sys.exit(0)  # Allow operation to proceed


# Run the hook only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()

plugins

CHANGELOG.md

context7.json

CONTRIBUTING.md

README_CN.md

README_ES.md

README_IT.md

README.md

tessl.json

tile.json