Monitor and analyze ransomware group data leak sites (DLS) to track victim postings, extract threat intelligence on group tactics, and assess sector-specific ransomware risk for proactive defense.
66
58%
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Advisory
Suggest reviewing before use
Optimize this skill with Tessl
npx tessl skill review --optimize ./skills/analyzing-ransomware-leak-site-intelligence/SKILL.md

Ransomware groups operating under double-extortion models maintain data leak sites (DLS) on Tor hidden services where they post victim names, stolen data samples, and countdown timers to pressure payment. In H1 2025, 96 unique ransomware groups were active, listing approximately 535 victims per month. Monitoring these sites provides intelligence on active threat groups, targeted sectors, geographic patterns, and emerging ransomware families. This skill covers safely collecting DLS intelligence, extracting structured data, tracking group activity trends, and producing sector-specific risk assessments.
Required libraries: requests, beautifulsoup4, pandas, matplotlib.

Modern ransomware groups encrypt victim data AND exfiltrate it before encryption. Leak sites serve as public pressure: victims are listed with a countdown timer, partial data samples, and file trees. If ransom is not paid, full data is published. Some groups have moved to triple extortion, adding DDoS threats or contacting victims' customers directly.
Leak sites provide: victim identification (company name, sector, country), attack timeline (when listed, deadline, data published), data volume estimates, group capability assessment (sectors targeted, attack frequency, operational tempo), and trend analysis (new groups emerging, groups rebranding, law enforcement takedowns).
Never directly access DLS sites in a production environment. Use purpose-built monitoring services (Ransomwatch, DarkFeed, KELA, Flashpoint), Tor-isolated research VMs, commercial threat intelligence platforms, or community-maintained datasets. All analysis should be conducted in isolated environments with proper authorization.
import json
from collections import Counter, defaultdict
from datetime import datetime, timedelta, timezone

import pandas as pd
import requests
class RansomwareIntelCollector:
    """Collect ransomware data-leak-site (DLS) intelligence from public tracking sources.

    Pulls victim posts and group profiles from the community-maintained
    ransomwatch project (static JSON hosted on GitHub) so that no leak
    site is ever contacted directly.
    """

    RANSOMWATCH_API = "https://raw.githubusercontent.com/joshhighet/ransomwatch/main/posts.json"
    RANSOMWATCH_GROUPS = "https://raw.githubusercontent.com/joshhighet/ransomwatch/main/groups.json"

    def __init__(self):
        # Victim-post dicts from posts.json (keys observed in this file:
        # 'discovered', 'group_name', 'post_title').
        self.posts = []
        # Group-profile dicts from groups.json.
        self.groups = []

    def fetch_ransomwatch_data(self):
        """Fetch ransomware victim posts and group profiles from ransomwatch.

        Returns:
            list: the posts loaded into ``self.posts`` (empty list on failure).
        """
        resp = requests.get(self.RANSOMWATCH_API, timeout=30)
        if resp.status_code == 200:
            self.posts = resp.json()
            print(f"[+] Loaded {len(self.posts)} victim posts from ransomwatch")
        else:
            print(f"[-] Failed to fetch posts: {resp.status_code}")
        resp = requests.get(self.RANSOMWATCH_GROUPS, timeout=30)
        if resp.status_code == 200:
            self.groups = resp.json()
            print(f"[+] Loaded {len(self.groups)} ransomware group profiles")
        return self.posts

    @staticmethod
    def _parse_discovered(post):
        """Parse a post's 'discovered' timestamp as a UTC-aware datetime.

        Ransomwatch timestamps appear to be UTC but are often naive;
        naive values are assumed UTC. Raises ValueError/TypeError on
        missing or malformed input.
        """
        raw = post.get("discovered", "")
        parsed = datetime.fromisoformat(raw.replace("Z", "+00:00"))
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    def get_recent_victims(self, days=30):
        """Get victims posted in the last N days.

        Args:
            days (int): lookback window in days.

        Returns:
            list: posts whose 'discovered' timestamp falls within the window.
        """
        # Compare entirely in UTC: the previous naive datetime.now() shifted
        # the cutoff by the machine's local UTC offset.
        cutoff = datetime.now(timezone.utc) - timedelta(days=days)
        recent = []
        for post in self.posts:
            try:
                if self._parse_discovered(post) >= cutoff:
                    recent.append(post)
            except (ValueError, TypeError):
                continue  # skip posts with missing/malformed timestamps
        print(f"[+] {len(recent)} victims in last {days} days")
        return recent

    def get_group_activity(self, group_name):
        """Get all posts by a specific ransomware group (case-insensitive match)."""
        group_posts = [
            p for p in self.posts
            if p.get("group_name", "").lower() == group_name.lower()
        ]
        print(f"[+] {group_name}: {len(group_posts)} total victims")
        return group_posts
# Script entry point: build the collector and pull the public ransomwatch
# dataset (network call; populates collector.posts and collector.groups).
collector = RansomwareIntelCollector()
collector.fetch_ransomwatch_data()
recent = collector.get_recent_victims(days=30)


def analyze_group_trends(posts, top_n=15):
    """Analyze ransomware group activity trends.

    Args:
        posts (list): victim-post dicts with 'group_name' and 'discovered' keys.
        top_n (int): number of most-active groups to include in the summary.

    Returns:
        dict: total/unique counts, top groups, and per-month breakdowns
        ('monthly_totals' and 'monthly_top_groups', keyed by 'YYYY-MM').
    """
    group_counts = Counter(p.get("group_name", "unknown") for p in posts)
    # month "YYYY-MM" -> Counter(group -> victims that month)
    monthly_activity = defaultdict(Counter)
    for post in posts:
        try:
            date = datetime.fromisoformat(
                post.get("discovered", "").replace("Z", "+00:00")
            )
        except (ValueError, TypeError):
            continue  # skip malformed/missing timestamps
        monthly_activity[date.strftime("%Y-%m")][post.get("group_name", "unknown")] += 1
    analysis = {
        "total_posts": len(posts),
        "unique_groups": len(group_counts),
        "top_groups": group_counts.most_common(top_n),
        "monthly_totals": {
            month: sum(counts.values())
            for month, counts in sorted(monthly_activity.items())
        },
        "monthly_top_groups": {
            month: counts.most_common(5)
            for month, counts in sorted(monthly_activity.items())
        },
    }
    print("\n=== Ransomware Group Activity ===")
    print(f"Total victims tracked: {analysis['total_posts']}")
    print(f"Active groups: {analysis['unique_groups']}")
    print(f"\nTop {top_n} Groups:")
    for group, count in analysis["top_groups"]:
        print(f"  {group}: {count} victims")
    return analysis
trends = analyze_group_trends(collector.posts)


def assess_sector_risk(posts, target_sector=None, target_country=None):
    """Assess ransomware risk for specific sector or geography.

    Args:
        posts (list): victim-post dicts.
        target_sector: reserved for future filtering — currently unused.
        target_country: reserved for future filtering — currently unused.

    Returns:
        dict: {'total_victims', 'sectors', 'countries'} where each sector
        entry carries victim counts, percentage of all victims, the five
        most-active groups, and a coarse heuristic 'risk_level'.
    """
    # sector/country -> {"count": int, "groups": Counter(group -> victims)}
    sector_data = defaultdict(lambda: {"count": 0, "groups": Counter()})
    country_data = defaultdict(lambda: {"count": 0, "groups": Counter()})
    for post in posts:
        # Not every feed exposes a sector; fall back to 'industry', then 'unknown'.
        sector = post.get("sector", post.get("industry", "unknown"))
        country = post.get("country", "unknown")
        group = post.get("group_name", "")
        sector_data[sector]["count"] += 1
        sector_data[sector]["groups"][group] += 1
        country_data[country]["count"] += 1
        country_data[country]["groups"][group] += 1
    total = len(posts)
    risk_assessment = {
        "total_victims": total,
        "sectors": {},
        "countries": {},
    }
    for sector, data in sorted(sector_data.items(), key=lambda x: -x[1]["count"]):
        pct = (data["count"] / total * 100) if total > 0 else 0
        risk_assessment["sectors"][sector] = {
            "victim_count": data["count"],
            "percentage": round(pct, 1),
            "top_groups": data["groups"].most_common(5),
            # Heuristic share-of-all-victims thresholds.
            "risk_level": (
                "critical" if pct > 15
                else "high" if pct > 8
                else "medium" if pct > 3
                else "low"
            ),
        }
    for country, data in sorted(country_data.items(), key=lambda x: -x[1]["count"]):
        pct = (data["count"] / total * 100) if total > 0 else 0
        risk_assessment["countries"][country] = {
            "victim_count": data["count"],
            "percentage": round(pct, 1),
            "top_groups": data["groups"].most_common(5),
        }
    return risk_assessment
risk = assess_sector_risk(collector.posts)


def track_new_groups(posts, lookback_days=90):
    """Identify ransomware groups first observed within the lookback window.

    Args:
        posts (list): victim-post dicts with 'group_name', 'discovered',
            and 'post_title' keys.
        lookback_days (int): window (in days) defining a "new" group.

    Returns:
        dict: group name -> {'first_seen', 'first_victim', 'total_victims',
        'avg_per_month'} for groups first seen inside the window.
    """
    group_first_seen = {}
    # Precompute victim totals once instead of rescanning posts per group.
    victim_totals = Counter()
    for post in posts:
        group = post.get("group_name", "")
        victim_totals[group] += 1
        try:
            date = datetime.fromisoformat(
                post.get("discovered", "").replace("Z", "+00:00")
            )
        except (ValueError, TypeError):
            continue  # skip malformed/missing timestamps
        # Ransomwatch timestamps are typically naive UTC; normalize so every
        # comparison is between aware datetimes.
        if date.tzinfo is None:
            date = date.replace(tzinfo=timezone.utc)
        if group not in group_first_seen or date < group_first_seen[group]["first_seen"]:
            group_first_seen[group] = {
                "first_seen": date,
                "first_victim": post.get("post_title", ""),
            }
    # Aware UTC cutoff: the previous naive datetime.now() shifted the window
    # by the local UTC offset.
    cutoff = datetime.now(timezone.utc) - timedelta(days=lookback_days)
    new_groups = {
        group: info for group, info in group_first_seen.items()
        if info["first_seen"] >= cutoff
    }
    for group, info in new_groups.items():
        info["total_victims"] = victim_totals[group]
        info["avg_per_month"] = round(
            victim_totals[group] / max(1, lookback_days / 30), 1
        )
    print(f"\n=== New Groups (last {lookback_days} days) ===")
    for group, info in sorted(new_groups.items(), key=lambda x: -x[1]["total_victims"]):
        print(f"  {group}: {info['total_victims']} victims, "
              f"first seen {info['first_seen'].strftime('%Y-%m-%d')}")
    return new_groups
new_groups = track_new_groups(collector.posts, lookback_days=90)


def generate_ransomware_intel_report(trends, risk, new_groups):
    """Generate a markdown ransomware threat intelligence report.

    Args:
        trends (dict): output of analyze_group_trends().
        risk (dict): output of assess_sector_risk().
        new_groups (dict): output of track_new_groups().

    Returns:
        str: the full markdown report, also written to
        'ransomware_intel_report.md' in the working directory.
    """
    lines = [
        "# Ransomware Threat Intelligence Report",
        f"Generated: {datetime.now().isoformat()}",
        "",
        "## Executive Summary",
        f"- **Total victims tracked**: {trends['total_posts']}",
        f"- **Active ransomware groups**: {trends['unique_groups']}",
        f"- **New groups (last 90 days)**: {len(new_groups)}",
        "",
        "## Top Active Groups",
        "| Rank | Group | Victims |",
        "|------|-------|---------|",
    ]
    for i, (group, count) in enumerate(trends["top_groups"][:10], 1):
        lines.append(f"| {i} | {group} | {count} |")
    lines.append("")
    lines.append("## New Emerging Groups")
    for group, info in sorted(new_groups.items(), key=lambda x: -x[1]["total_victims"])[:10]:
        lines.append(
            f"- **{group}**: {info['total_victims']} victims since "
            f"{info['first_seen'].strftime('%Y-%m-%d')}"
        )
    lines.append("")
    lines.append("## Sector Risk Assessment")
    lines.append("| Sector | Victims | % | Risk Level |")
    lines.append("|--------|---------|---|------------|")
    for sector, data in list(risk["sectors"].items())[:10]:
        lines.append(
            f"| {sector} | {data['victim_count']} | {data['percentage']}% "
            f"| {data['risk_level'].upper()} |"
        )
    lines.append("""
## Recommendations
1. Monitor DLS feeds daily for your organization and supply chain partners
2. Prioritize patching vulnerabilities exploited by top active groups
3. Implement offline backup strategy to reduce extortion leverage
4. Conduct tabletop exercises for ransomware scenario response
5. Share indicators with sector ISACs and threat sharing communities
""")
    report = "\n".join(lines)
    # Explicit encoding so the markdown renders identically across platforms.
    with open("ransomware_intel_report.md", "w", encoding="utf-8") as f:
        f.write(report)
    print("[+] Report saved: ransomware_intel_report.md")
    return report


generate_ransomware_intel_report(trends, risk, new_groups)
If you maintain this skill, you can claim it as your own. Once claimed, you can manage eval scenarios, bundle related skills, attach documentation or rules, and ensure cross-agent compatibility.