Comprehensive developer toolkit providing reusable skills for Java/Spring Boot, TypeScript/NestJS/React/Next.js, Python, PHP, AWS CloudFormation, AI/RAG, DevOps, and more.
90
90%
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Risky
Do not use without reviewing
#!/usr/bin/env python3
"""Drift Guard Monitor Hook for Claude Code.
Monitors file creation/modification events (PostToolUse on Write|Edit)
and alerts when unplanned files are detected.
Reads current state from _drift/state.json, compares file paths against
Expected Files list, emits informational alerts for unplanned files,
and logs all drift events to _drift/drift-events.log.
Hook event: PostToolUse with matcher "Write|Edit"
Input: JSON via stdin { "tool_name": "Write|Edit", "tool_input": { "file_path": "..." }, ... }
Output: Exit 0 = allow | Alert message on stdout | Exit 2 = block (not used)
Zero external dependencies — pure Python 3 standard library only.
"""
import json
import os
import sys
from datetime import datetime
from pathlib import Path
# ─── Constants ──────────────────────────────────────────────────────────────
# File name of the JSON state document looked up by find_state_file().
STATE_FILE_NAME = "state.json"
# Name of the directory that holds the state file and the event log.
DRIFT_DIR_NAME = "_drift"
# File name of the append-only log written by log_drift_event().
DRIFT_EVENTS_LOG = "drift-events.log"
# Key inside the state document listing paths that were already alerted on.
ALERTED_FILES_FIELD = "alerted_files"
# ─── State Management ─────────────────────────────────────────────────────────
def find_state_file(cwd: str) -> str | None:
    """Locate the nearest ``_drift/state.json`` at or above *cwd*.

    The starting directory is resolved to an absolute path, then it and its
    ancestors are probed from the innermost outwards. At most 20 directories
    are examined.

    Args:
        cwd: Directory from which the upward search starts.

    Returns:
        The path of the first ``state.json`` found, as a string, or ``None``
        when none of the probed directories contains one.
    """
    start = Path(cwd).resolve()
    # ``parents`` ends at the filesystem root, so the slice bounds the walk
    # to 20 levels or to the root, whichever comes first.
    for directory in (start, *start.parents)[:20]:
        candidate = directory / DRIFT_DIR_NAME / STATE_FILE_NAME
        if candidate.exists():
            return str(candidate)
    return None
def load_state(state_path: str) -> dict | None:
"""Load state.json, return None if file not found or invalid.
Graceful degradation: returns None on any error.
"""
try:
with open(state_path, "r", encoding="utf-8") as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError, PermissionError):
return None
def update_alerted_files(state_path: str, file_path: str) -> None:
    """Record *file_path* in the ``alerted_files`` list of ``state.json``.

    The state is re-read from disk, the path is appended if it is not yet
    listed, and the result is written to a sibling temporary file that then
    replaces ``state.json`` via ``os.replace``. Readers therefore never see a
    half-written document. The read-modify-write sequence itself is not
    locked, so two concurrent hooks may still overwrite each other's entry.

    Best-effort: I/O and decoding failures are swallowed, and a state
    document that is not a JSON object (or whose ``alerted_files`` is neither
    missing/null nor a list) is left untouched.

    Args:
        state_path: Path to ``state.json``.
        file_path: Path to remember as already alerted.
    """
    # Per-process temp name so concurrent hooks do not share a scratch file.
    tmp_path = f"{state_path}.{os.getpid()}.tmp"
    try:
        with open(state_path, "r", encoding="utf-8") as f:
            state = json.load(f)
        if not isinstance(state, dict):
            return

        alerted = state.get(ALERTED_FILES_FIELD)
        if alerted is None:
            # Missing or null field: start a fresh list.
            alerted = state[ALERTED_FILES_FIELD] = []
        elif not isinstance(alerted, list):
            return
        if file_path in alerted:
            return  # Already recorded; nothing to persist.
        alerted.append(file_path)

        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2)
        # Swap in the complete file; the original stays intact until here.
        os.replace(tmp_path, state_path)
    except (OSError, ValueError):
        # Graceful degradation: drop any leftover scratch file and move on.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
# ─── Drift Event Logging ─────────────────────────────────────────────────────
def log_drift_event(spec_folder: str, file_path: str) -> None:
    """Append one event line to ``<spec_folder>/_drift/drift-events.log``.

    Each call writes a single ``<ISO timestamp> | <path>`` line; repeated
    paths are written again rather than deduplicated. OS-level failures are
    ignored so logging never interrupts the hook.

    Args:
        spec_folder: Directory that contains the ``_drift`` directory.
        file_path: Path of the file that was written or edited.
    """
    try:
        destination = os.path.join(spec_folder, DRIFT_DIR_NAME, DRIFT_EVENTS_LOG)
        line = f"{datetime.now().isoformat()} | {file_path}\n"
        with open(destination, "a", encoding="utf-8") as log_file:
            log_file.write(line)
    except OSError:
        # IOError is an alias of OSError, so this covers both. Fail silently.
        return
# ─── File Path Comparison ────────────────────────────────────────────────────
def is_expected_file(file_path: str, state: dict) -> bool:
    """Report whether *file_path* is listed under ``expected_files``.

    Membership is a plain ``in`` test against the stored entries: exact,
    case-sensitive equality with no path normalization. A state without an
    ``expected_files`` key is treated as having an empty list.

    Args:
        file_path: Path taken from the tool input.
        state: Parsed contents of ``state.json``.

    Returns:
        ``True`` if the path appears verbatim in the list, else ``False``.
    """
    return file_path in state.get("expected_files", [])
def is_already_alerted(file_path: str, state: dict) -> bool:
    """Report whether an alert was already recorded for *file_path*.

    Looks the path up verbatim in the state's alerted-files list; a state
    without that key is treated as having no alerted files.

    Args:
        file_path: Path taken from the tool input.
        state: Parsed contents of ``state.json``.

    Returns:
        ``True`` if the path is already in the list, else ``False``.
    """
    return file_path in state.get(ALERTED_FILES_FIELD, [])
def emit_alert(file_path: str, state: dict) -> None:
    """Print a drift notification for *file_path* to stdout as one JSON object.

    The object carries ``"type": "notification"`` and a three-line
    ``message``: the unplanned path, the active ``task_id`` (``"unknown"``
    when absent), and up to five entries of ``expected_files`` followed by a
    count of any remaining entries.

    Args:
        file_path: Path of the unplanned file.
        state: Parsed contents of ``state.json``.
    """
    active_task = state.get("task_id", "unknown")
    planned = state.get("expected_files", [])

    # Only the first five planned files are shown to keep the alert short.
    shown = ", ".join(planned[:5])
    hidden = len(planned) - 5
    if hidden > 0:
        shown += f", ... ({hidden} more)"

    lines = [
        f"[Drift Guard] Unplanned file detected: {file_path}",
        f"[Drift Guard] Active task: {active_task}",
        f"[Drift Guard] Expected files (partial): {shown}",
    ]
    print(json.dumps({"type": "notification", "message": "\n".join(lines)}))
# ─── Entry Point ─────────────────────────────────────────────────────────────
def main() -> None:
    """Run the hook: read one event from stdin, log it, and alert on drift.

    Flow:
      1. Parse the JSON payload from stdin.
      2. Ignore anything that is not a ``PostToolUse`` event for the
         ``Write`` or ``Edit`` tool.
      3. Extract ``tool_input.file_path``.
      4. Locate and load ``_drift/state.json`` starting from the payload's
         ``cwd`` (or the process working directory when it is absent).
      5. Append the event to the drift log (expected files included).
      6. If the file is neither expected nor already alerted, print an alert
         and remember the path in the state file.

    Every handled path ends in ``sys.exit(0)`` so the tool operation is never
    blocked; malformed or unexpected input leads to a silent exit.
    """
    # 1. Parse input JSON
    try:
        input_data = json.load(sys.stdin)
    except (json.JSONDecodeError, ValueError):
        sys.exit(0)  # Malformed input → silent exit
    if not isinstance(input_data, dict):
        sys.exit(0)  # Valid JSON but not an object → silent exit

    # 2. Validate hook event
    if input_data.get("hook_event_name") != "PostToolUse":
        sys.exit(0)  # Wrong event → silent exit
    if input_data.get("tool_name") not in ("Write", "Edit"):
        sys.exit(0)  # Not a file operation → silent exit

    # 3. Extract file path from tool_input
    tool_input = input_data.get("tool_input")
    if not isinstance(tool_input, dict):
        sys.exit(0)  # Missing or malformed tool_input → silent exit
    file_path = tool_input.get("file_path")
    if not file_path:
        sys.exit(0)  # No file path → silent exit

    # 4. Find and load state.json
    cwd = input_data.get("cwd")
    if not isinstance(cwd, str) or not cwd:
        # Only consult the process cwd when the payload gives no usable one.
        cwd = os.getcwd()
    state_path = find_state_file(cwd)
    if not state_path:
        # State not found → system not initialized, graceful degradation
        sys.exit(0)
    state = load_state(state_path)
    if not isinstance(state, dict) or not state:
        # Invalid or empty state → graceful degradation
        sys.exit(0)

    # 5. Get spec folder for logging (parent of the _drift directory)
    spec_folder = str(Path(state_path).parent.parent)

    # 6. Log drift event (always, even for expected files)
    log_drift_event(spec_folder, file_path)

    # 7. Check if file is expected
    if is_expected_file(file_path, state):
        sys.exit(0)  # File is in expected list → silent exit

    # 8. Check if already alerted in this session
    if is_already_alerted(file_path, state):
        sys.exit(0)  # Already alerted → silent exit (but still logged)

    # 9. Emit alert
    emit_alert(file_path, state)

    # 10. Update alerted_files to prevent duplicate alerts
    update_alerted_files(state_path, file_path)
    sys.exit(0)  # Allow operation to proceed
if __name__ == "__main__":
    main()
docs
plugins
developer-kit-ai
developer-kit-aws
agents
docs
skills
aws
aws-cli-beast
aws-cost-optimization
aws-drawio-architecture-diagrams
aws-sam-bootstrap
aws-cloudformation
aws-cloudformation-auto-scaling
aws-cloudformation-bedrock
aws-cloudformation-cloudfront
aws-cloudformation-cloudwatch
aws-cloudformation-dynamodb
aws-cloudformation-ec2
aws-cloudformation-ecs
aws-cloudformation-elasticache
references
aws-cloudformation-iam
references
aws-cloudformation-lambda
aws-cloudformation-rds
aws-cloudformation-s3
aws-cloudformation-security
aws-cloudformation-task-ecs-deploy-gh
aws-cloudformation-vpc
references
developer-kit-core
agents
commands
skills
developer-kit-devops
developer-kit-java
agents
commands
docs
skills
aws-lambda-java-integration
aws-rds-spring-boot-integration
aws-sdk-java-v2-bedrock
aws-sdk-java-v2-core
aws-sdk-java-v2-dynamodb
aws-sdk-java-v2-kms
aws-sdk-java-v2-lambda
aws-sdk-java-v2-messaging
aws-sdk-java-v2-rds
aws-sdk-java-v2-s3
aws-sdk-java-v2-secrets-manager
clean-architecture
graalvm-native-image
langchain4j-ai-services-patterns
references
langchain4j-mcp-server-patterns
references
langchain4j-rag-implementation-patterns
references
langchain4j-spring-boot-integration
langchain4j-testing-strategies
langchain4j-tool-function-calling-patterns
langchain4j-vector-stores-configuration
references
qdrant
references
spring-ai-mcp-server-patterns
spring-boot-actuator
spring-boot-cache
spring-boot-crud-patterns
spring-boot-dependency-injection
spring-boot-event-driven-patterns
spring-boot-openapi-documentation
spring-boot-project-creator
spring-boot-resilience4j
spring-boot-rest-api-standards
spring-boot-saga-pattern
spring-boot-security-jwt
assets
references
scripts
spring-boot-test-patterns
spring-data-jpa
references
spring-data-neo4j
references
unit-test-application-events
unit-test-bean-validation
unit-test-boundary-conditions
unit-test-caching
unit-test-config-properties
references
unit-test-controller-layer
unit-test-exception-handler
references
unit-test-json-serialization
unit-test-mapper-converter
references
unit-test-parameterized
unit-test-scheduled-async
references
unit-test-service-layer
references
unit-test-utility-methods
unit-test-wiremock-rest-api
references
developer-kit-php
developer-kit-project-management
developer-kit-python
developer-kit-specs
commands
docs
hooks
test-templates
tests
skills
developer-kit-tools
developer-kit-typescript
agents
docs
hooks
rules
skills
aws-cdk
aws-lambda-typescript-integration
better-auth
clean-architecture
drizzle-orm-patterns
dynamodb-toolbox-patterns
references
nestjs
nestjs-best-practices
nestjs-code-review
nestjs-drizzle-crud-generator
nextjs-app-router
nextjs-authentication
nextjs-code-review
nextjs-data-fetching
nextjs-deployment
nextjs-performance
nx-monorepo
react-code-review
react-patterns
shadcn-ui
tailwind-css-patterns
tailwind-design-system
references
turborepo-monorepo
typescript-docs
typescript-security-review
zod-validation-utilities
references
github-spec-kit