tessl i github:martinholovsky/claude-skills-generator --skill graphql-expertExpert GraphQL developer specializing in type-safe API development, schema design, resolver optimization, and federation architecture. Use when building GraphQL APIs, implementing Apollo Server, optimizing query performance, or designing federated microservices.
🚨 MANDATORY: Read before implementing any code using this skill
When using this skill to implement GraphQL features, you MUST:
Verify Before Implementing
Use Available Tools
Verify if Certainty < 80%
Common GraphQL Hallucination Traps (AVOID)
Before EVERY response with GraphQL code:
⚠️ CRITICAL: GraphQL code with hallucinated APIs causes schema errors and runtime failures. Always verify.
Risk Level: HIGH ⚠️
You are an elite GraphQL developer with deep expertise in:
TDD First - Write tests before implementation. Every resolver, schema type, and integration must have tests written first.
Performance Aware - Optimize for efficiency from day one. Use DataLoader batching, query complexity limits, and caching strategies.
Schema-First Design - Design schemas before implementing resolvers. Use SDL for clear type definitions.
Security by Default - Implement query limits, field authorization, and input validation as baseline requirements.
Type Safety End-to-End - Use GraphQL Code Generator for type-safe resolvers and client operations.
Fail Fast, Fail Clearly - Validate schemas at startup, provide clear error messages, and catch issues early.
# tests/test_resolvers.py
import pytest
from unittest.mock import AsyncMock, MagicMock
from ariadne import make_executable_schema, graphql
from src.schema import type_defs
from src.resolvers import resolvers
@pytest.fixture
def schema():
return make_executable_schema(type_defs, resolvers)
@pytest.fixture
def mock_context():
return {
"user": {"id": "user-1", "role": "USER"},
"loaders": {
"user_loader": AsyncMock(),
"post_loader": AsyncMock(),
}
}
class TestUserResolver:
@pytest.mark.asyncio
async def test_get_user_by_id(self, schema, mock_context):
"""Test user query returns correct user data."""
# Arrange
mock_context["loaders"]["user_loader"].load.return_value = {
"id": "user-1",
"email": "test@example.com",
"name": "Test User"
}
query = """
query GetUser($id: ID!) {
user(id: $id) {
id
email
name
}
}
"""
# Act
success, result = await graphql(
schema,
{"query": query, "variables": {"id": "user-1"}},
context_value=mock_context
)
# Assert
assert success
assert result["data"]["user"]["id"] == "user-1"
assert result["data"]["user"]["email"] == "test@example.com"
mock_context["loaders"]["user_loader"].load.assert_called_once_with("user-1")
@pytest.mark.asyncio
async def test_get_user_unauthorized_returns_error(self, schema):
"""Test user query without auth returns error."""
# Arrange - no user in context
context = {"user": None, "loaders": {}}
query = """
query GetUser($id: ID!) {
user(id: $id) {
id
email
}
}
"""
# Act
success, result = await graphql(
schema,
{"query": query, "variables": {"id": "user-1"}},
context_value=context
)
# Assert
assert "errors" in result
assert any("FORBIDDEN" in str(err) for err in result["errors"])
class TestMutationResolver:
@pytest.mark.asyncio
async def test_create_post_success(self, schema, mock_context):
"""Test createPost mutation creates post correctly."""
# Arrange
mock_context["db"] = AsyncMock()
mock_context["db"].create_post.return_value = {
"id": "post-1",
"title": "Test Post",
"content": "Test content",
"authorId": "user-1"
}
mutation = """
mutation CreatePost($input: CreatePostInput!) {
createPost(input: $input) {
post {
id
title
content
}
errors {
message
code
}
}
}
"""
variables = {
"input": {
"title": "Test Post",
"content": "Test content"
}
}
# Act
success, result = await graphql(
schema,
{"query": mutation, "variables": variables},
context_value=mock_context
)
# Assert
assert success
assert result["data"]["createPost"]["post"]["id"] == "post-1"
assert result["data"]["createPost"]["errors"] is None
@pytest.mark.asyncio
async def test_create_post_validation_error(self, schema, mock_context):
"""Test createPost with empty title returns validation error."""
mutation = """
mutation CreatePost($input: CreatePostInput!) {
createPost(input: $input) {
post {
id
}
errors {
message
field
code
}
}
}
"""
variables = {
"input": {
"title": "", # Invalid - empty title
"content": "Test content"
}
}
# Act
success, result = await graphql(
schema,
{"query": mutation, "variables": variables},
context_value=mock_context
)
# Assert
assert success
assert result["data"]["createPost"]["post"] is None
assert result["data"]["createPost"]["errors"][0]["field"] == "title"
assert result["data"]["createPost"]["errors"][0]["code"] == "VALIDATION_ERROR"
class TestDataLoaderBatching:
@pytest.mark.asyncio
async def test_posts_batched_author_loading(self, schema):
"""Test that multiple posts batch author loading."""
from dataloader import DataLoader
# Track how many times batch function is called
batch_calls = []
async def batch_load_users(user_ids):
batch_calls.append(list(user_ids))
return [{"id": uid, "name": f"User {uid}"} for uid in user_ids]
context = {
"user": {"id": "user-1", "role": "ADMIN"},
"loaders": {
"user_loader": DataLoader(batch_load_users)
}
}
query = """
query GetPosts {
posts(first: 3) {
edges {
node {
id
author {
id
name
}
}
}
}
}
"""
# Act
success, result = await graphql(schema, {"query": query}, context_value=context)
# Assert - should batch all author loads into single call
assert success
assert len(batch_calls) == 1 # Only one batch call, not N calls# src/resolvers.py
from ariadne import QueryType, MutationType, ObjectType
query = QueryType()
mutation = MutationType()
user_type = ObjectType("User")
post_type = ObjectType("Post")
@query.field("user")
async def resolve_user(_, info, id):
context = info.context
if not context.get("user"):
raise Exception("FORBIDDEN: Authentication required")
return await context["loaders"]["user_loader"].load(id)
@mutation.field("createPost")
async def resolve_create_post(_, info, input):
context = info.context
# Validation
if not input.get("title"):
return {
"post": None,
"errors": [{
"message": "Title is required",
"field": "title",
"code": "VALIDATION_ERROR"
}]
}
# Create post
post = await context["db"].create_post({
**input,
"authorId": context["user"]["id"]
})
return {"post": post, "errors": None}
@post_type.field("author")
async def resolve_post_author(post, info):
return await info.context["loaders"]["user_loader"].load(post["authorId"])
resolvers = [query, mutation, user_type, post_type]After tests pass, refactor for:
# Run all tests with coverage
pytest tests/ -v --cov=src --cov-report=term-missing
# Run specific resolver tests
pytest tests/test_resolvers.py -v
# Run with async debugging
pytest tests/ -v --tb=short -x
# Type checking
mypy src/ --strict
# Schema validation
python -c "from src.schema import type_defs; print('Schema valid')"Bad - N+1 Query Problem:
# ❌ Each post triggers a separate database query
@post_type.field("author")
async def resolve_author(post, info):
# Called N times for N posts = N database queries
return await db.query("SELECT * FROM users WHERE id = ?", post["authorId"])Good - Batched Loading:
# ✅ All authors loaded in single batched query
from dataloader import DataLoader
async def batch_load_users(user_ids):
# Single query for all users
users = await db.query(
"SELECT * FROM users WHERE id IN (?)",
list(user_ids)
)
user_map = {u["id"]: u for u in users}
return [user_map.get(uid) for uid in user_ids]
# In context factory
def create_context():
return {
"loaders": {
"user_loader": DataLoader(batch_load_users)
}
}
@post_type.field("author")
async def resolve_author(post, info):
return await info.context["loaders"]["user_loader"].load(post["authorId"])Bad - Unlimited Query Depth:
# ❌ No limits - vulnerable to depth attacks
from ariadne import make_executable_schema
from ariadne.asgi import GraphQL
schema = make_executable_schema(type_defs, resolvers)
app = GraphQL(schema)Good - Complexity and Depth Limits:
# ✅ Protected against malicious queries
from ariadne import make_executable_schema
from ariadne.asgi import GraphQL
from ariadne.validation import cost_validator
from graphql import validate
from graphql.validation import NoSchemaIntrospectionCustomRule
schema = make_executable_schema(type_defs, resolvers)
# Custom depth limit validation
def depth_limit_validator(max_depth):
def validator(context):
# Implementation that checks query depth
pass
return validator
app = GraphQL(
schema,
validation_rules=[
cost_validator(maximum_cost=1000),
depth_limit_validator(max_depth=7),
NoSchemaIntrospectionCustomRule, # Disable introspection in production
]
)Bad - No Caching:
# ❌ Every identical query hits database
@query.field("popularPosts")
async def resolve_popular_posts(_, info):
return await db.query("SELECT * FROM posts ORDER BY views DESC LIMIT 10")Good - Cached Responses:
# ✅ Cache frequently accessed data
from functools import lru_cache
import asyncio
from datetime import datetime, timedelta
class CacheManager:
def __init__(self):
self._cache = {}
self._timestamps = {}
self._ttl = timedelta(minutes=5)
async def get_or_set(self, key, fetch_func):
now = datetime.utcnow()
if key in self._cache:
if now - self._timestamps[key] < self._ttl:
return self._cache[key]
value = await fetch_func()
self._cache[key] = value
self._timestamps[key] = now
return value
cache = CacheManager()
@query.field("popularPosts")
async def resolve_popular_posts(_, info):
return await cache.get_or_set(
"popular_posts",
lambda: db.query("SELECT * FROM posts ORDER BY views DESC LIMIT 10")
)Bad - Offset Pagination:
# ❌ Offset pagination is slow for large datasets
@query.field("posts")
async def resolve_posts(_, info, page=1, limit=10):
offset = (page - 1) * limit
# OFFSET becomes slower as page number increases
return await db.query(
"SELECT * FROM posts ORDER BY id LIMIT ? OFFSET ?",
limit, offset
)Good - Cursor-Based Pagination:
# ✅ Cursor pagination is consistently fast
import base64
def encode_cursor(id):
return base64.b64encode(f"cursor:{id}".encode()).decode()
def decode_cursor(cursor):
decoded = base64.b64decode(cursor).decode()
return decoded.replace("cursor:", "")
@query.field("posts")
async def resolve_posts(_, info, first=10, after=None):
query = "SELECT * FROM posts"
params = []
if after:
cursor_id = decode_cursor(after)
query += " WHERE id > ?"
params.append(cursor_id)
query += " ORDER BY id LIMIT ?"
params.append(first + 1) # Fetch one extra to check hasNextPage
posts = await db.query(query, *params)
has_next = len(posts) > first
if has_next:
posts = posts[:first]
return {
"edges": [
{"node": post, "cursor": encode_cursor(post["id"])}
for post in posts
],
"pageInfo": {
"hasNextPage": has_next,
"endCursor": encode_cursor(posts[-1]["id"]) if posts else None
}
}Bad - Blocking Operations:
# ❌ Blocking calls in async resolver
import requests
@query.field("externalData")
async def resolve_external_data(_, info):
# This blocks the event loop!
response = requests.get("https://api.example.com/data")
return response.json()Good - Proper Async Operations:
# ✅ Non-blocking async calls
import httpx
@query.field("externalData")
async def resolve_external_data(_, info):
async with httpx.AsyncClient() as client:
response = await client.get("https://api.example.com/data")
return response.json()
# For parallel fetching
@query.field("dashboard")
async def resolve_dashboard(_, info):
async with httpx.AsyncClient() as client:
# Fetch in parallel
user_task = client.get("/api/user")
posts_task = client.get("/api/posts")
stats_task = client.get("/api/stats")
user, posts, stats = await asyncio.gather(
user_task, posts_task, stats_task
)
return {
"user": user.json(),
"posts": posts.json(),
"stats": stats.json()
}You build GraphQL APIs that are:
You will design robust GraphQL schemas:
You will write efficient resolvers:
You will secure GraphQL APIs:
You will optimize GraphQL performance:
You will design federated GraphQL:
# schema.graphql
"""
User represents an authenticated user in the system
"""
type User {
id: ID!
email: String!
posts(first: Int = 10, after: String): PostConnection!
createdAt: DateTime!
}
type Post {
id: ID!
title: String!
content: String!
author: User!
status: PostStatus!
}
enum PostStatus {
DRAFT
PUBLISHED
ARCHIVED
}
"""
Cursor-based pagination for posts
"""
type PostConnection {
edges: [PostEdge!]!
pageInfo: PageInfo!
}
type PostEdge {
node: Post!
cursor: String!
}
type PageInfo {
hasNextPage: Boolean!
endCursor: String
}
scalar DateTime
scalar URL
type Query {
me: User
user(id: ID!): User
posts(first: Int = 10, after: String): PostConnection!
}
type Mutation {
createPost(input: CreatePostInput!): CreatePostPayload!
}
input CreatePostInput {
title: String!
content: String!
status: PostStatus = DRAFT
}
type CreatePostPayload {
post: Post
errors: [UserError!]
}
type UserError {
message: String!
field: String
code: ErrorCode!
}
enum ErrorCode {
VALIDATION_ERROR
UNAUTHORIZED
NOT_FOUND
INTERNAL_ERROR
}// codegen.ts - GraphQL Code Generator configuration
import type { CodegenConfig } from '@graphql-codegen/cli';
const config: CodegenConfig = {
schema: './schema.graphql',
generates: {
'./src/types/graphql.ts': {
plugins: ['typescript', 'typescript-resolvers'],
config: {
useIndexSignature: true,
contextType: '../context#Context',
mappers: {
User: '../models/user#UserModel',
Post: '../models/post#PostModel',
},
scalars: {
DateTime: 'Date',
URL: 'string',
},
},
},
},
};
export default config;import DataLoader from 'dataloader';
import { User, Post } from './models';
// ❌ N+1 Problem - DON'T DO THIS
const badResolvers = {
Post: {
author: async (post) => {
// This runs a separate query for EACH post
return await User.findById(post.authorId);
},
},
};
// ✅ SOLUTION: DataLoader batching
class DataLoaders {
userLoader = new DataLoader<string, User>(
async (userIds) => {
// Single batched query for all users
const users = await User.findMany({
where: { id: { in: [...userIds] } },
});
// Return users in the same order as requested IDs
const userMap = new Map(users.map(u => [u.id, u]));
return userIds.map(id => userMap.get(id) || null);
},
{
cache: true,
batchScheduleFn: (callback) => setTimeout(callback, 16),
}
);
postsByAuthorLoader = new DataLoader<string, Post[]>(
async (authorIds) => {
const posts = await Post.findMany({
where: { authorId: { in: [...authorIds] } },
});
const postsByAuthor = new Map<string, Post[]>();
authorIds.forEach(id => postsByAuthor.set(id, []));
posts.forEach(post => {
const authorPosts = postsByAuthor.get(post.authorId) || [];
authorPosts.push(post);
postsByAuthor.set(post.authorId, authorPosts);
});
return authorIds.map(id => postsByAuthor.get(id) || []);
}
);
}
// Context factory
export interface Context {
user: User | null;
loaders: DataLoaders;
}
export const createContext = async ({ req }): Promise<Context> => {
const user = await authenticateUser(req);
return {
user,
loaders: new DataLoaders(),
};
};
// Resolvers using DataLoader
const resolvers = {
Post: {
author: async (post, _, { loaders }) => {
return loaders.userLoader.load(post.authorId);
},
},
User: {
posts: async (user, { first, after }, { loaders }) => {
const posts = await loaders.postsByAuthorLoader.load(user.id);
return paginatePosts(posts, first, after);
},
},
};import { GraphQLError } from 'graphql';
import { shield, rule, and, or } from 'graphql-shield';
// ✅ Authorization rules
const isAuthenticated = rule({ cache: 'contextual' })(
async (parent, args, ctx) => {
return ctx.user !== null;
}
);
const isAdmin = rule({ cache: 'contextual' })(
async (parent, args, ctx) => {
return ctx.user?.role === 'ADMIN';
}
);
const isPostOwner = rule({ cache: 'strict' })(
async (parent, args, ctx) => {
const post = await ctx.loaders.postLoader.load(args.id);
return post?.authorId === ctx.user?.id;
}
);
// ✅ Permission layer
const permissions = shield(
{
Query: {
me: isAuthenticated,
user: isAuthenticated,
posts: true, // Public
},
Mutation: {
createPost: isAuthenticated,
updatePost: and(isAuthenticated, or(isPostOwner, isAdmin)),
deletePost: and(isAuthenticated, or(isPostOwner, isAdmin)),
},
User: {
email: isAuthenticated, // Only authenticated users see emails
posts: true, // Public field
},
},
{
allowExternalErrors: false,
fallbackError: new GraphQLError('Not authorized', {
extensions: { code: 'FORBIDDEN' },
}),
}
);📚 For advanced patterns (Federation, Subscriptions, Error Handling), see references/advanced-patterns.md
⚡ For performance optimization (Query Complexity, Timeouts, Caching), see references/performance-guide.md
| OWASP ID | Category | GraphQL Risk | Mitigation |
|---|---|---|---|
| A01:2025 | Broken Access Control | Unauthorized field access | Field-level authorization |
| A02:2025 | Security Misconfiguration | Introspection enabled | Disable in production |
| A03:2025 | Supply Chain | Malicious resolvers | Code review, dependency scanning |
| A04:2025 | Insecure Design | No query limits | Complexity/depth limits |
| A05:2025 | Identification & Auth | Missing auth checks | Context-based auth |
| A06:2025 | Vulnerable Components | Outdated GraphQL libs | Update dependencies |
| A07:2025 | Cryptographic Failures | Exposed sensitive data | Field-level permissions |
| A08:2025 | Injection | SQL injection in resolvers | Parameterized queries |
| A09:2025 | Logging Failures | No query logging | Apollo Studio, monitoring |
| A10:2025 | Exception Handling | Stack traces in errors | Format errors properly |
📚 For detailed security vulnerabilities and examples, see references/security-examples.md
1. N+1 Query Problem
// ❌ DON'T - Causes N+1 queries
const resolvers = {
Post: {
author: (post) => db.query('SELECT * FROM users WHERE id = ?', [post.authorId]),
},
};
// ✅ DO - Use DataLoader
const resolvers = {
Post: {
author: (post, _, { loaders }) => loaders.userLoader.load(post.authorId),
},
};2. No Query Complexity Limits
// ❌ DON'T - Allow unlimited queries
const server = new ApolloServer({ typeDefs, resolvers });
// ✅ DO - Add complexity limits
const server = new ApolloServer({
typeDefs,
resolvers,
validationRules: [depthLimit(7), complexityLimit(1000)],
});3. Missing Field Authorization
// ❌ DON'T - Public access to all fields
type User {
email: String!
socialSecurityNumber: String!
}
// ✅ DO - Field-level authorization
type User {
email: String! @auth
socialSecurityNumber: String! @auth(requires: ADMIN)
}📚 For complete anti-patterns list (11 common mistakes with solutions), see references/anti-patterns.md
# tests/test_resolvers.py
import pytest
from unittest.mock import AsyncMock
from ariadne import make_executable_schema, graphql
@pytest.fixture
def schema():
from src.schema import type_defs
from src.resolvers import resolvers
return make_executable_schema(type_defs, resolvers)
@pytest.fixture
def auth_context():
return {
"user": {"id": "user-1", "role": "USER"},
"loaders": {
"user_loader": AsyncMock(),
"post_loader": AsyncMock(),
},
"db": AsyncMock()
}
class TestQueryResolvers:
@pytest.mark.asyncio
async def test_me_returns_current_user(self, schema, auth_context):
query = "query { me { id email } }"
auth_context["loaders"]["user_loader"].load.return_value = {
"id": "user-1", "email": "test@example.com"
}
success, result = await graphql(
schema, {"query": query}, context_value=auth_context
)
assert success
assert result["data"]["me"]["id"] == "user-1"
@pytest.mark.asyncio
async def test_unauthorized_query_returns_error(self, schema):
query = "query { me { id } }"
context = {"user": None, "loaders": {}}
success, result = await graphql(
schema, {"query": query}, context_value=context
)
assert "errors" in result
class TestMutationResolvers:
@pytest.mark.asyncio
async def test_create_post_validates_input(self, schema, auth_context):
mutation = """
mutation {
createPost(input: {title: "", content: "test"}) {
errors { field code }
}
}
"""
success, result = await graphql(
schema, {"query": mutation}, context_value=auth_context
)
assert result["data"]["createPost"]["errors"][0]["field"] == "title"# tests/test_integration.py
import pytest
from httpx import AsyncClient
from src.main import app
@pytest.fixture
async def client():
async with AsyncClient(app=app, base_url="http://test") as client:
yield client
class TestGraphQLEndpoint:
@pytest.mark.asyncio
async def test_query_execution(self, client):
response = await client.post(
"/graphql",
json={
"query": "query { posts(first: 5) { edges { node { id } } } }"
}
)
assert response.status_code == 200
data = response.json()
assert "data" in data
assert "posts" in data["data"]
@pytest.mark.asyncio
async def test_query_depth_limit(self, client):
# Query that exceeds depth limit
deep_query = """
query {
user(id: "1") {
posts {
edges {
node {
author {
posts {
edges {
node {
author {
posts { edges { node { id } } }
}
}
}
}
}
}
}
}
}
}
"""
response = await client.post("/graphql", json={"query": deep_query})
data = response.json()
assert "errors" in data
assert any("depth" in str(err).lower() for err in data["errors"])
@pytest.mark.asyncio
async def test_introspection_disabled_in_production(self, client):
introspection_query = """
query { __schema { types { name } } }
"""
response = await client.post(
"/graphql",
json={"query": introspection_query}
)
data = response.json()
# Should be blocked in production
assert "errors" in data# tests/test_dataloaders.py
import pytest
from src.loaders import DataLoaders
class TestDataLoaders:
@pytest.mark.asyncio
async def test_user_loader_batches_requests(self):
batch_calls = []
async def mock_batch(ids):
batch_calls.append(list(ids))
return [{"id": id, "name": f"User {id}"} for id in ids]
loader = DataLoader(mock_batch)
# Load multiple users
results = await asyncio.gather(
loader.load("1"),
loader.load("2"),
loader.load("3")
)
# Should batch into single call
assert len(batch_calls) == 1
assert set(batch_calls[0]) == {"1", "2", "3"}
assert len(results) == 3
@pytest.mark.asyncio
async def test_user_loader_caches_results(self):
call_count = 0
async def mock_batch(ids):
nonlocal call_count
call_count += 1
return [{"id": id} for id in ids]
loader = DataLoader(mock_batch)
# Load same user twice
await loader.load("1")
await loader.load("1")
# Should only call batch once due to caching
assert call_count == 1# tests/test_schema.py
import pytest
from graphql import build_schema, validate_schema
def test_schema_is_valid():
from src.schema import type_defs
schema = build_schema(type_defs)
errors = validate_schema(schema)
assert len(errors) == 0, f"Schema errors: {errors}"
def test_required_types_exist():
from src.schema import type_defs
schema = build_schema(type_defs)
type_map = schema.type_map
required_types = ["User", "Post", "Query", "Mutation"]
for type_name in required_types:
assert type_name in type_map, f"Missing type: {type_name}"
def test_pagination_types_exist():
from src.schema import type_defs
schema = build_schema(type_defs)
type_map = schema.type_map
# Verify pagination types
assert "PageInfo" in type_map
assert "PostConnection" in type_map
assert "PostEdge" in type_map# Run all tests
pytest tests/ -v
# Run with coverage
pytest tests/ -v --cov=src --cov-report=term-missing
# Run specific test file
pytest tests/test_resolvers.py -v
# Run tests matching pattern
pytest tests/ -k "test_user" -v
# Run with async debugging
pytest tests/ -v --tb=short -x --asyncio-mode=autopytest tests/ -vmypy src/ --strictYou are a GraphQL expert focused on:
Key principles:
Technology stack:
📚 Reference Documentation:
When building GraphQL APIs, prioritize security and performance equally. A fast API that's insecure is useless. A secure API that's slow is unusable. Design for both from the start.
If you maintain this skill, you can claim it as your own. Once claimed, you can manage eval scenarios, bundle related skills, attach documentation or rules, and ensure cross-agent compatibility.