Build a new Nemesis file enrichment module end-to-end with explicit user approval gates for output mode, library choice, sample files, and integration testing.
This skill guides the creation of new Nemesis enrichment modules from start to finish.
**CRITICAL:** At each user approval gate (Steps 2, 3, 4, and 8), prompt for explicit user approval before proceeding. Use `request_user_input` when available; otherwise, ask a direct, concise question in chat and wait for an explicit approval response. Do NOT continue past a gate until the user has explicitly approved.
Enrichment modules analyze files and extract security-relevant information such as credentials, hashes, metadata, and indicators of compromise. This skill walks through the complete process in eight steps, from gathering requirements to end-to-end integration testing.
Before starting, review:

- `libs/file_enrichment_modules/DEVELOPMENT_GUIDE.md`
- `libs/file_enrichment_modules/tests/harness/`

Use these 8 modules as implementation references; they cover all major patterns:

| Module | Detection Pattern | Key Feature |
|---|---|---|
| `pe` | Magic + YARA | Complex parsing with `lief` |
| `yara` | All files | YARA rule management |
| `chromium_cookies` | Magic + YARA + filename | Database + DPAPI |
| `gitcredentials` | Filename + plaintext | Simple text parsing |
| `group_policy_preferences` | YARA + plaintext | XML + crypto |
| `container` | `is_container()` | Archive handling |
| `keytab` | Extension OR YARA | Binary struct parsing |
| `office_doc` | Extension OR magic | Multi-format handling |

Paths: `libs/file_enrichment_modules/file_enrichment_modules/{module_name}/`
## Step 1: Gather Requirements

Gather requirements from the user:
Questions to ask:
## Step 2: Determine Output Mode

Determine what the module should produce as output:

- **Findings Mode:** The module extracts security-relevant data and generates findings.
  - Examples: `chromium_cookies`, `gitcredentials`, `group_policy_preferences`
- **Parsing-Only Mode:** The module parses the file and stores structured data without generating findings.
  - Examples: `pe` (extracts PE metadata), `office_doc` (extracts document metadata)
- **Hybrid Mode:** The module parses data AND generates findings for specific conditions.
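To make the difference between the modes concrete, here is a minimal, framework-free sketch of Hybrid Mode. `ParsedRecord`, `HybridOutput`, `SENSITIVE_KEYS`, and `analyze_hybrid` are hypothetical stand-ins, not Nemesis's `EnrichmentResult`/`Finding` models. The point is only that every parsed record is kept, while findings are emitted conditionally.

```python
from dataclasses import dataclass, field


@dataclass
class ParsedRecord:
    """One parsed entry from the target file (hypothetical structure)."""
    key: str
    value: str


@dataclass
class HybridOutput:
    """Stand-in for an enrichment result: parsed data plus optional findings."""
    parsed: list[ParsedRecord] = field(default_factory=list)
    findings: list[str] = field(default_factory=list)


# Hypothetical condition: keys whose values are likely to be secrets.
SENSITIVE_KEYS = {"password", "token", "secret"}


def analyze_hybrid(records: list[ParsedRecord]) -> HybridOutput:
    output = HybridOutput()
    for record in records:
        # Parsing behaviour: every record is stored.
        output.parsed.append(record)
        # Findings behaviour: only records that meet the condition raise a finding.
        if record.key.lower() in SENSITIVE_KEYS and record.value:
            output.findings.append(f"Sensitive value for key '{record.key}'")
    return output
```

Dropping the `if` block gives Parsing-Only Mode; storing only the records that trigger it is closer to Findings Mode.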
Format your recommendation:

```markdown
## Module Output Mode for {file_type} Module

Based on the data to be extracted, I recommend:

### Recommended: {Findings Mode | Parsing-Only Mode | Hybrid Mode}

**Rationale:** {why this mode fits the use case}

### What this means:
- {description of what will be produced}
- {how data will be stored/displayed}
- {whether alerts will be generated}

### Alternative consideration:
{brief note on why other modes might or might not apply}

**Do you approve this output mode, or would you prefer a different approach?**
```

**STOP:** Ask the user to approve one of the three output modes (Findings Mode, Parsing-Only Mode, Hybrid Mode) before proceeding to Step 3. Prefer `request_user_input` when available.
## Step 3: Research Parsing Libraries

Search for parsing libraries before implementation:

1. Search PyPI for relevant parsing libraries.
2. Search GitHub for reference implementations.
3. Evaluate the options.

Format your recommendation:
```markdown
## Library Recommendation for {file_type} Module

### Recommended: {library_name}
- **PyPI:** https://pypi.org/project/{library_name}/
- **GitHub:** {github_url}
- **Why:** {reasons - API quality, maintenance, features}
- **Downloads:** {monthly_downloads}

### Alternatives Considered:
1. {alt_library_1} - {why_not_chosen}
2. {alt_library_2} - {why_not_chosen}

### Manual Parsing
If no good library exists, we can implement manual parsing using:
- `struct` module for binary formats
- `xml.etree` for XML
- Regular expressions for text patterns

**Do you approve this library choice, or would you prefer an alternative?**
```

**STOP:** Present the recommended library and alternatives, and get explicit user approval before proceeding to Step 4. Prefer `request_user_input` when available.
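If the manual-parsing fallback is chosen for a binary format, the standard `struct` module is usually enough. The sketch below parses a hypothetical fixed-size header: a 4-byte magic `NMSS`, a little-endian `uint16` version, and a `uint32` record count. Substitute the real layout from the format's specification. Raising `ValueError` on malformed input lets the module's existing `try/except` in `_analyze_file` handle it.

```python
import struct
from dataclasses import dataclass

# Hypothetical header layout: magic (4s), version (uint16), record_count (uint32).
# "<" selects little-endian with no alignment padding.
HEADER_FORMAT = "<4sHI"
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)  # 4 + 2 + 4 = 10 bytes
EXPECTED_MAGIC = b"NMSS"


@dataclass
class Header:
    version: int
    record_count: int


def parse_header(data: bytes) -> Header:
    """Parse the fixed-size header, raising ValueError on malformed input."""
    if len(data) < HEADER_SIZE:
        raise ValueError(f"need {HEADER_SIZE} bytes, got {len(data)}")
    magic, version, record_count = struct.unpack_from(HEADER_FORMAT, data, 0)
    if magic != EXPECTED_MAGIC:
        raise ValueError(f"unexpected magic {magic!r}")
    return Header(version=version, record_count=record_count)
```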
## Step 4: Obtain Sample Files

Obtain test files for development and testing:

- **Public GitHub repos:** Search for sample files (<100MB), e.g. `"{file_extension}" OR "{file_type} sample"`
- Sample file repositories
- Generate synthetic files
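When no public sample exists, a small script that writes a synthetic file is often the fastest route, and it keeps embedded secrets obviously fake. This sketch assumes a hypothetical binary format: a header (4-byte magic `NMSS`, `uint16` version, `uint32` record count) followed by length-prefixed UTF-8 records. `build_synthetic_file` and `write_synthetic_file` are illustrative names; adapt the layout to the real format.

```python
import struct
from pathlib import Path

MAGIC = b"NMSS"  # hypothetical magic bytes for the illustrative format


def build_synthetic_file(records: list[str], version: int = 1) -> bytes:
    """Serialize a header followed by length-prefixed UTF-8 records."""
    # Header: magic (4s), version (uint16), record count (uint32), little-endian.
    parts = [struct.pack("<4sHI", MAGIC, version, len(records))]
    for record in records:
        encoded = record.encode("utf-8")
        # Each record: uint32 length prefix followed by the raw bytes.
        parts.append(struct.pack("<I", len(encoded)) + encoded)
    return b"".join(parts)


def write_synthetic_file(directory: Path, name: str, records: list[str]) -> Path:
    """Write the synthetic file into `directory` and return its path."""
    path = directory / name
    path.write_bytes(build_synthetic_file(records))
    return path
```

For example, `write_synthetic_file(fixtures_dir, "sample.nmss", ["user=alice", "password=FAKE-not-a-secret"])` produces a fixture whose contents are known exactly, which makes the later assertions straightforward.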
Format your recommendation:

```markdown
## Sample File for {file_type} Module

### Source: {source_description}
- **URL/Location:** {url_or_path}
- **File:** {filename}
- **Size:** {size}
- **Why suitable:** {reasons}

### Alternative sources if needed:
1. {alt_source_1}
2. {alt_source_2}

### Synthetic generation (if no public samples):
{description of how to create test file}

**Do you approve this sample file source, or do you have an alternative?**
```

**STOP:** Present sample file options and get explicit user approval before proceeding to Step 5. Prefer `request_user_input` when available.
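## Step 5: Choose a Detection Strategy

Determine how `should_process()` will identify target files. Run the `file` command on the sample, then, based on the analysis, select a detection pattern (see the Detection Pattern column of the reference module table).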
If the file has distinctive binary signatures, use a YARA rule:

```yara
rule {file_type}_file {
    meta:
        description = "Detects {file_type} files"
    strings:
        $header = { XX XX XX XX }  // Magic bytes
    condition:
        $header at 0
}
```

## Step 6: Implement the Module

Create the module structure:
```bash
mkdir -p libs/file_enrichment_modules/file_enrichment_modules/{module_name}
```

Use this template, adapting it based on the reference module that matches your pattern:
```python
# file_enrichment_modules/{module_name}/analyzer.py
from common.logger import get_logger
from common.models import EnrichmentResult, FileObject, Finding, FindingCategory, FindingOrigin, Transform
from common.state_helpers import get_file_enriched_async
from common.storage import StorageS3
from file_enrichment_modules.module_loader import EnrichmentModule

logger = get_logger(__name__)


class {ModuleName}Analyzer(EnrichmentModule):
    name: str = "{module_name}_analyzer"
    dependencies: list[str] = []

    def __init__(self):
        self.storage = StorageS3()
        self.asyncpg_pool = None
        self.workflows = ["default"]

    async def should_process(self, object_id: str, file_path: str | None = None) -> bool:
        """Determine if this module should process the file."""
        file_enriched = await get_file_enriched_async(object_id, self.asyncpg_pool)
        # TODO: Implement detection logic
        return False

    def _analyze_file(self, file_path: str, file_enriched) -> EnrichmentResult | None:
        """Analyze the file and extract data."""
        result = EnrichmentResult(module_name=self.name, dependencies=self.dependencies)
        try:
            # TODO: Implement parsing logic
            # Create findings if relevant data found
            # Create transforms for derived files
            return result
        except Exception:
            logger.exception(message=f"Error analyzing {file_enriched.file_name}")
            return None

    async def process(self, object_id: str, file_path: str | None = None) -> EnrichmentResult | None:
        """Process the file."""
        try:
            file_enriched = await get_file_enriched_async(object_id, self.asyncpg_pool)
            if file_path:
                # Use the local path when one is provided
                return self._analyze_file(file_path, file_enriched)
            else:
                # Otherwise download the object from storage to a temporary file
                with self.storage.download(object_id) as temp_file:
                    return self._analyze_file(temp_file.name, file_enriched)
        except Exception:
            logger.exception(message="Error in process()")
            return None


def create_enrichment_module() -> EnrichmentModule:
    return {ModuleName}Analyzer()
```

Declare the module's dependencies in `pyproject.toml`:

```toml
[project]
name = "{module_name}"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = [
    "{library_name}>=X.Y.Z",
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
```

Create `rules.yar` with detection rules.
## Step 7: Write and Run Tests

Create and run tests using the test harness:

```python
# tests/test_{module_name}.py
import pytest

from tests.harness import ModuleTestHarness, FileEnrichedFactory
from file_enrichment_modules.{module_name}.analyzer import {ModuleName}Analyzer


class Test{ModuleName}Analyzer:
    """Tests for {ModuleName}Analyzer."""

    @pytest.mark.asyncio
    async def test_should_process_target_file(self):
        """Test that should_process returns True for target files."""
        harness = ModuleTestHarness()
        harness.register_file(
            object_id="test-uuid",
            local_path="/path/to/sample/file",
            file_enriched=FileEnrichedFactory.create(
                object_id="test-uuid",
                file_name="sample.ext",
                magic_type="expected magic type",
                # ... other fields
            ),
        )
        async with harness.create_module({ModuleName}Analyzer) as module:
            result = await module.should_process("test-uuid")
            assert result is True

    @pytest.mark.asyncio
    async def test_should_not_process_unrelated_file(self):
        """Test that should_process returns False for unrelated files."""
        harness = ModuleTestHarness()
        harness.register_file(
            object_id="test-uuid",
            local_path="/path/to/unrelated/file",
            file_enriched=FileEnrichedFactory.create_plaintext_file(
                object_id="test-uuid",
                file_name="readme.txt",
            ),
        )
        async with harness.create_module({ModuleName}Analyzer) as module:
            result = await module.should_process("test-uuid")
            assert result is False

    @pytest.mark.asyncio
    async def test_process_extracts_expected_data(self):
        """Test that process extracts the expected data."""
        harness = ModuleTestHarness()
        harness.register_file(
            object_id="test-uuid",
            local_path="/path/to/sample/file",
            file_enriched=FileEnrichedFactory.create(...),
        )
        async with harness.create_module({ModuleName}Analyzer) as module:
            result = await module.process("test-uuid")
            assert result is not None
            assert result.module_name == "{module_name}_analyzer"
            # Assert on findings
            # Assert on transforms
            # Assert on results
```
Run the tests:

```bash
cd libs/file_enrichment_modules
uv run pytest tests/test_{module_name}.py -v
```

Confirm that:

- `should_process()` returns True for target files
- `should_process()` returns False for non-target files
- `process()` extracts expected data

## Step 8: Integration Testing

This step is REQUIRED. You MUST execute the E2E integration test, not just print instructions.
Before proceeding, ask the user to confirm their Nemesis instance is ready:

```markdown
## Integration Testing Ready Check

The module implementation and unit tests are complete. Now we need to run end-to-end integration testing against a live Nemesis instance.

**Please confirm:**
1. Is Nemesis dev environment running? (Start with: `./tools/nemesis-ctl.sh start dev`)
2. What is the Nemesis host? (default: `localhost:7443`)

Once confirmed, I will:
1. Verify the Nemesis instance is healthy
2. Submit a test file to the running instance
3. Wait for enrichment processing to complete
4. Query the database to verify results
5. Report the E2E test outcome

**Reply with the host (or press enter for localhost:7443) to proceed with integration testing.**
```

**STOP:** Confirm Nemesis is running and capture the target host before proceeding with E2E testing. Prefer `request_user_input` when available.
Once the user confirms Nemesis is running, execute these steps IN ORDER:
Run a health check against the provided host:

```bash
curl -k -s "https://{host}/api/health" | head -20
```

Any response, even `401 Unauthorized` (returned when authentication is required), indicates Nemesis is running. If the connection is refused, ask the user to start Nemesis.
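To script the same check in Python, the standard library is sufficient. This sketch mirrors `curl -k`: certificate verification is disabled on the assumption that the dev instance uses a self-signed certificate. Following the rule above, any HTTP status, including 401, counts as "running". `nemesis_is_up` is a hypothetical helper name.

```python
import ssl
import urllib.error
import urllib.request


def nemesis_is_up(host: str = "localhost:7443", timeout: float = 5.0) -> bool:
    """Return True if https://{host}/api/health answers with any HTTP status."""
    # Equivalent of `curl -k`: skip certificate checks for the dev instance.
    context = ssl.create_default_context()
    context.check_hostname = False  # must be disabled before CERT_NONE
    context.verify_mode = ssl.CERT_NONE
    url = f"https://{host}/api/health"
    try:
        with urllib.request.urlopen(url, timeout=timeout, context=context):
            return True
    except urllib.error.HTTPError:
        # A 401 (or any other HTTP error status) still means a server answered.
        return True
    except (urllib.error.URLError, OSError):
        # Connection refused, DNS failure, or timeout: nothing reachable there.
        return False
```

`HTTPError` is a subclass of `URLError`, so it has to be caught first.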
Verify the new module appears in the `file-enrichment` container logs:

```bash
docker compose logs file-enrichment 2>&1 | grep -i "{module_name}" | tail -10
```

Look for successful module loading. If it is not found, check for import errors.
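To support the "No errors in logs" item of the final report, the captured log text can also be scanned in Python. This sketch assumes only two things: relevant lines mention the module name, and failures surface as tracebacks, `ImportError`/`ModuleNotFoundError`, or an uppercase `ERROR` level. The real Nemesis log format may differ, so treat `ERROR_PATTERN`, `LogCheck`, and `scan_logs` as hypothetical starting points.

```python
import re
from dataclasses import dataclass, field

# Assumed failure markers; adjust to the real file-enrichment log format.
ERROR_PATTERN = re.compile(r"Traceback|ImportError|ModuleNotFoundError|\bERROR\b")


@dataclass
class LogCheck:
    module_lines: list[str] = field(default_factory=list)
    error_lines: list[str] = field(default_factory=list)

    @property
    def loaded(self) -> bool:
        # Heuristic: the module name appears somewhere in the logs.
        return bool(self.module_lines)

    @property
    def clean(self) -> bool:
        return not self.error_lines


def scan_logs(log_text: str, module_name: str) -> LogCheck:
    """Split captured `docker compose logs` output into module lines and error lines."""
    check = LogCheck()
    needle = module_name.lower()
    for line in log_text.splitlines():
        if needle in line.lower():
            check.module_lines.append(line)
        # Errors are collected from every line: tracebacks often omit the module name.
        if ERROR_PATTERN.search(line):
            check.error_lines.append(line)
    return check
```

The log text can come from `subprocess.run(["docker", "compose", "logs", "file-enrichment"], capture_output=True, text=True)`, passing `result.stdout + result.stderr`.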
Submit the test fixture file created during standalone testing:

```bash
./tools/submit.sh {path_to_test_fixture_file} \
  -h {host} \
  -u n -p n \
  -j test-project \
  --debug
```

Capture the `object_id` from the submission output; you will need it to verify results.
Wait for enrichment to complete. First, get the `object_id` for the submitted file:

```bash
# Wait a few seconds for processing
sleep 10

# Get the object_id for the submitted file
# IMPORTANT: Database is 'enrichment', user is 'nemesis', use container name 'nemesis-postgres-1'
docker exec nemesis-postgres-1 psql -U nemesis -d enrichment -c \
  "SELECT object_id, file_name FROM files_enriched WHERE file_name = '{submitted_filename}' ORDER BY timestamp DESC LIMIT 1;"
```

Query the database to confirm the module produced the expected output.
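Before running the verification queries, note that a fixed `sleep 10` can be too short for large files or a busy instance. The sketch below retries a query until it returns rows or a timeout expires. It reuses this skill's connection values and adds psql's `-t` (tuples only) and `-A` (unaligned) flags, so an empty result comes back as an empty string. `run_psql` and `wait_for_rows` are hypothetical names. The query runner is injectable, so the polling logic can be tested without Docker. The queries are plain interpolated strings, which is acceptable only for trusted local dev values.

```python
import subprocess
import time
from collections.abc import Callable

# Connection values documented in this skill.
PSQL_BASE = ["docker", "exec", "nemesis-postgres-1", "psql", "-U", "nemesis", "-d", "enrichment"]


def run_psql(query: str) -> str:
    """Run a query via docker exec; -t -A print bare rows with no headers or footer."""
    result = subprocess.run(
        [*PSQL_BASE, "-t", "-A", "-c", query],
        capture_output=True, text=True, check=True,
    )
    return result.stdout.strip()


def wait_for_rows(
    query: str,
    run: Callable[[str], str] = run_psql,
    timeout: float = 120.0,
    interval: float = 5.0,
) -> str:
    """Re-run `query` until it returns at least one row or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        output = run(query)
        if output:
            return output
        if time.monotonic() >= deadline:
            raise TimeoutError(f"no rows after {timeout}s for: {query}")
        time.sleep(interval)
```

For example, pass the `files_enriched` lookup shown above (with the real file name) to `wait_for_rows`, then reuse it for the `enrichments` and `findings` checks.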
**IMPORTANT: Database connection details:**

- Container: `nemesis-postgres-1`
- Database: `enrichment` (NOT `nemesis`)
- User: `nemesis` (NOT `postgres`)

```bash
# Check enrichment record exists for the module
docker exec nemesis-postgres-1 psql -U nemesis -d enrichment -c \
  "SELECT module_name, created_at FROM enrichments WHERE object_id = '{object_id}' ORDER BY created_at DESC;"

# Check findings were created (if applicable)
# Note: Use 'finding_id' not 'id', and 'finding_name' not 'value'
docker exec nemesis-postgres-1 psql -U nemesis -d enrichment -c \
  "SELECT finding_id, category, severity, finding_name, origin_name FROM findings WHERE origin_name = '{module_name}_analyzer' ORDER BY created_at DESC LIMIT 10;"
```

Schema reference:
- `enrichments` table columns: `enrichment_id`, `object_id`, `module_name`, `result_data`, `created_at`, `updated_at`
- `findings` table columns: `finding_id`, `finding_name`, `category`, `severity`, `object_id`, `origin_type`, `origin_name`, `raw_data`, `data`, `created_at`, `updated_at`, `triage_id`

After executing the above steps, report the E2E test outcome to the user:
```markdown
## E2E Integration Test Results

### Status: {PASS | FAIL}

### Verification Steps:
- [ ] Nemesis health check: {PASS/FAIL}
- [ ] Module loaded in file-enrichment: {PASS/FAIL}
- [ ] File submission successful: {PASS/FAIL}
- [ ] Enrichment record created: {PASS/FAIL}
- [ ] Findings created (if applicable): {PASS/FAIL - count: N}
- [ ] No errors in logs: {PASS/FAIL}

### Details:
{Summary of what was found, any errors encountered}

### Object ID: {object_id}
```

If any step fails, provide troubleshooting guidance and offer to re-run after the user fixes the issue.
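When the checks are scripted, generating the report from their results keeps the overall status consistent with the individual items. The following is a sketch: `render_report` is a hypothetical helper, step names are passed in by the caller (ideally matching the template's checklist), and passing steps get a ticked box so failures stand out.

```python
def render_report(steps: dict[str, bool], object_id: str, details: str = "") -> str:
    """Render the E2E report; the overall status is PASS only if every step passed."""
    status = "PASS" if steps and all(steps.values()) else "FAIL"
    lines = [
        "## E2E Integration Test Results",
        f"### Status: {status}",
        "### Verification Steps:",
    ]
    for name, passed in steps.items():
        box = "x" if passed else " "
        lines.append(f"- [{box}] {name}: {'PASS' if passed else 'FAIL'}")
    lines += ["### Details:", details or "(none)", f"### Object ID: {object_id}"]
    return "\n".join(lines)
```

An empty `steps` dict reports FAIL rather than PASS, so a run that never executed any checks cannot look successful.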
## Completion Checklist

Before considering the module complete, ALL items must be checked:

**IMPORTANT:** Do NOT mark the module as complete until Step 8 E2E integration testing has been executed and passed.
## Troubleshooting

The most common issue is using the wrong connection parameters. Use these exact values:

```bash
# Correct connection command
docker exec nemesis-postgres-1 psql -U nemesis -d enrichment -c "YOUR_QUERY"

# Common mistakes:
# - Using $(docker compose ps -q postgres) instead of nemesis-postgres-1
# - Using -d nemesis instead of -d enrichment
# - Using -U postgres instead of -U nemesis
```

To list available databases:

```bash
docker exec nemesis-postgres-1 psql -U nemesis -l
```

To check table schemas:

```bash
docker exec nemesis-postgres-1 psql -U nemesis -d enrichment -c "\d enrichments"
docker exec nemesis-postgres-1 psql -U nemesis -d enrichment -c "\d findings"
```

- [ ] `create_enrichment_module()` function exists