Comprehensive developer toolkit providing reusable skills for Java/Spring Boot, TypeScript/NestJS/React/Next.js, Python, PHP, AWS CloudFormation, AI/RAG, DevOps, and more.
82
82%
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Risky
Do not use without reviewing
This document provides comprehensive implementation guidance for building effective chunking systems.
Document Processor
├── Ingestion Layer
│ ├── Document Type Detection
│ ├── Format Parsing (PDF, HTML, Markdown, etc.)
│ └── Content Extraction
├── Analysis Layer
│ ├── Structure Analysis
│ ├── Content Type Identification
│ └── Complexity Assessment
├── Strategy Selection Layer
│ ├── Rule-based Selection
│ ├── ML-based Prediction
│ └── Adaptive Configuration
├── Chunking Layer
│ ├── Strategy Implementation
│ ├── Parameter Optimization
│ └── Quality Validation
└── Output Layer
    ├── Chunk Metadata Generation
    ├── Embedding Integration
    └── Storage Preparation
from dataclasses import dataclass
from typing import List, Dict, Any
import re
@dataclass
class DocumentAnalysis:
    """Measurements describing one document, used to drive chunking decisions."""

    # Label for the kind of document (for example 'markdown' or 'html').
    doc_type: str
    # 0-1, higher means more structured
    structure_score: float
    # 0-1, higher means more complex
    complexity_score: float
    # Names of the content kinds found in the document.
    content_types: List[str]
    language: str
    estimated_tokens: int
    has_multimodal: bool
class DocumentAnalyzer:
    """Profile a document with cheap text heuristics before chunking.

    Every measurement is a regex or substring heuristic over the raw text;
    no parsing library or language model is involved.
    """

    def __init__(self):
        # Regexes per document type. Each pattern is applied to one stripped
        # line at a time by calculate_structure_score().
        self.structure_patterns = {
            'markdown': [r'^#+\s', r'^\*\*.*\*\*$', r'^\* ', r'^\d+\. '],
            'html': [r'<h[1-6]>', r'<p>', r'<div>', r'<table>'],
            'latex': [r'\\section', r'\\subsection', r'\\begin\{', r'\\end\{'],
            'academic': [r'^\d+\.', r'^\d+\.\d+', r'^[A-Z]\.', r'^Figure \d+']
        }

    def analyze(self, content: str) -> "DocumentAnalysis":
        """Run every heuristic on ``content`` and bundle the results.

        Args:
            content: Full text of the document.

        Returns:
            A ``DocumentAnalysis`` populated from the individual helpers.
        """
        doc_type = self.detect_document_type(content)
        return DocumentAnalysis(
            doc_type=doc_type,
            structure_score=self.calculate_structure_score(content, doc_type),
            complexity_score=self.calculate_complexity_score(content),
            content_types=self.identify_content_types(content),
            language=self.detect_language(content),
            estimated_tokens=self.estimate_tokens(content),
            has_multimodal=self.detect_multimodal_content(content)
        )

    def detect_document_type(self, content: str) -> str:
        """Classify ``content`` by marker substrings; the first match wins.

        Returns one of 'html', 'markdown', 'latex', 'academic', 'code' or
        'plain'.
        """
        content_lower = content.lower()

        if '<html' in content_lower or '<body' in content_lower:
            return 'html'
        # A document containing '##' necessarily contains '#'.
        if '##' in content:
            return 'markdown'
        if '\\documentclass' in content_lower or '\\begin{' in content_lower:
            return 'latex'
        if any(keyword in content_lower for keyword in ['abstract', 'introduction', 'conclusion', 'references']):
            return 'academic'
        if 'def ' in content or 'class ' in content or 'function ' in content_lower:
            return 'code'
        return 'plain'

    def calculate_structure_score(self, content: str, doc_type: str) -> float:
        """Return the fraction of lines matching a structure pattern.

        Document types without registered patterns get a neutral 0.5.
        """
        patterns = self.structure_patterns.get(doc_type, [])
        if not patterns:
            return 0.5  # Default for unstructured content

        lines = content.split('\n')
        structured_lines = 0
        for line in lines:
            stripped = line.strip()
            # A line counts once, however many patterns it matches.
            if any(re.search(pattern, stripped) for pattern in patterns):
                structured_lines += 1

        return min(structured_lines / max(len(lines), 1), 1.0)

    def calculate_complexity_score(self, content: str) -> float:
        """Combine sentence length, vocabulary richness and nesting into 0-1."""
        # Factors that increase complexity
        avg_sentence_length = self.calculate_avg_sentence_length(content)
        vocabulary_richness = self.calculate_vocabulary_richness(content)
        nested_structure = self.detect_nested_structure(content)

        # Normalize and combine; sentence length saturates at 30 words.
        complexity = (
            min(avg_sentence_length / 30, 1.0) * 0.3 +
            vocabulary_richness * 0.4 +
            nested_structure * 0.3
        )
        return min(complexity, 1.0)

    def identify_content_types(self, content: str) -> List[str]:
        """List the content kinds detected in ``content`` (``['text']`` if none)."""
        types = []

        if '```' in content or 'def ' in content or 'function ' in content.lower():
            types.append('code')
        if '|' in content and '\n' in content:
            types.append('tables')
        if re.search(r'\!\[.*\]\(.*\)', content):
            types.append('images')
        if re.search(r'http[s]?://', content):
            types.append('links')
        if re.search(r'\d+\.\d+', content) or re.search(r'\$\d', content):
            types.append('numbers')

        return types if types else ['text']

    def detect_language(self, content: str) -> str:
        """Guess the language from the Unicode blocks present in ``content``.

        Returns 'chinese', 'arabic', 'russian', or 'english' as the default
        assumption when none of those scripts is found.
        """
        # Simple language detection - can be enhanced with proper language detection libraries
        if re.search(r'[\u4e00-\u9fff]', content):
            return 'chinese'
        # The leading backslash matters: without it the class is the literal
        # characters 'u', '0', '6' plus a range starting at '0', which matches
        # ordinary ASCII text.
        if re.search(r'[\u0600-\u06ff]', content):
            return 'arabic'
        if re.search(r'[\u0400-\u04ff]', content):
            return 'russian'
        return 'english'  # Default assumption

    def estimate_tokens(self, content: str) -> int:
        """Estimate the token count as 1.3 tokens per whitespace-separated word."""
        # Rough estimation - actual tokenization varies by model
        word_count = len(content.split())
        return int(word_count * 1.3)  # Average tokens per word

    def detect_multimodal_content(self, content: str) -> bool:
        """Return True if ``content`` has image syntax or embed-style HTML tags."""
        multimodal_indicators = [
            r'\!\[.*\]\(.*\)',  # Images
            r'<iframe',         # Embedded content
            r'<object',         # Embedded objects
            r'<embed',          # Embedded media
        ]
        return any(re.search(pattern, content) for pattern in multimodal_indicators)

    def calculate_avg_sentence_length(self, content: str) -> float:
        """Return the mean word count of the fragments between '.', '!' and '?'."""
        sentences = [s.strip() for s in re.split(r'[.!?]+', content) if s.strip()]
        if not sentences:
            return 0.0
        return sum(len(s.split()) for s in sentences) / len(sentences)

    def calculate_vocabulary_richness(self, content: str) -> float:
        """Return unique words divided by total words (case-insensitive)."""
        words = content.lower().split()
        if not words:
            return 0.0
        return len(set(words)) / len(words)

    def detect_nested_structure(self, content: str) -> float:
        """Return the fraction of lines that are non-blank and start with a space."""
        # Detect nested lists, indented content, etc.
        lines = content.split('\n')
        indented_lines = sum(
            1 for line in lines if line.strip() and line.startswith(' ')
        )
        return indented_lines / max(len(lines), 1)


from abc import ABC, abstractmethod
from typing import Dict, Any
class ChunkingStrategy(ABC):
    """Abstract interface that every chunking strategy implements."""

    @abstractmethod
    def chunk(self, content: str, analysis: DocumentAnalysis) -> List[Dict[str, Any]]:
        """Split ``content`` into chunk dictionaries, guided by ``analysis``."""
class StrategySelector:
    """Pick a chunking strategy for a document from its analysis."""

    def __init__(self):
        # One ready-made instance per strategy name.
        self.strategies = {
            'fixed_size': FixedSizeStrategy(),
            'recursive': RecursiveStrategy(),
            'structure_aware': StructureAwareStrategy(),
            'semantic': SemanticStrategy(),
            'adaptive': AdaptiveStrategy()
        }

    def select_strategy(self, analysis: DocumentAnalysis) -> str:
        """Return the name of the strategy to use; the first matching rule wins."""
        structure = analysis.structure_score
        complexity = analysis.complexity_score

        # Rule-based selection logic
        if structure > 0.8 and analysis.doc_type in ['markdown', 'html', 'latex']:
            return 'structure_aware'
        if complexity > 0.7 and analysis.estimated_tokens < 10000:
            return 'semantic'
        if analysis.doc_type == 'code':
            return 'structure_aware'
        if structure < 0.3:
            return 'fixed_size'
        if complexity > 0.5:
            return 'recursive'
        return 'adaptive'

    def get_strategy(self, analysis: DocumentAnalysis) -> ChunkingStrategy:
        """Return the strategy instance registered under the selected name."""
        return self.strategies[self.select_strategy(analysis)]
# Example strategy implementations
class FixedSizeStrategy(ChunkingStrategy):
    """Fixed-size chunking whose chunk size follows document complexity."""

    def __init__(self, default_size=512, default_overlap=50):
        self.default_size = default_size
        # NOTE(review): stored but not read anywhere in this class; chunk()
        # always derives the overlap from the chunk size.
        self.default_overlap = default_overlap

    def chunk(self, content: str, analysis: DocumentAnalysis) -> List[Dict[str, Any]]:
        """Choose a chunk size from ``analysis`` and delegate the splitting."""
        complexity = analysis.complexity_score

        # Adjust parameters based on analysis
        chunk_size = (
            1024 if complexity > 0.7
            else 256 if complexity < 0.3
            else self.default_size
        )
        ten_percent_overlap = int(chunk_size * 0.1)

        return self._fixed_size_chunk(content, chunk_size, ten_percent_overlap)

    def _fixed_size_chunk(self, content: str, chunk_size: int, overlap: int) -> List[Dict[str, Any]]:
        """Placeholder for the splitter; currently returns ``None``."""
        # Implementation using RecursiveCharacterTextSplitter or custom logic
        return None
class AdaptiveStrategy(ChunkingStrategy):
    """Choose structured or unstructured chunking based on the structure score."""

    def chunk(self, content: str, analysis: DocumentAnalysis) -> List[Dict[str, Any]]:
        """Chunk ``content`` with the helper matching its structure score.

        Args:
            content: Full text of the document.
            analysis: Analysis of ``content``; only ``structure_score`` is read
                here.

        Returns:
            Whatever ``_merge_chunks`` returns for the produced chunks.
        """
        # Both lists start empty because only one branch below runs; leaving
        # the other name unassigned raised UnboundLocalError at the merge step.
        structured_chunks: List[Dict[str, Any]] = []
        unstructured_chunks: List[Dict[str, Any]] = []

        # Combine multiple strategies based on content characteristics
        if analysis.structure_score > 0.6:
            # Use structure-aware for structured parts
            structured_chunks = self._chunk_structured_parts(content, analysis) or []
        else:
            # Use fixed-size for unstructured parts
            unstructured_chunks = self._chunk_unstructured_parts(content, analysis) or []

        # Merge and optimize. `or []` above keeps the concatenation valid while
        # the helpers are still placeholders that return None.
        return self._merge_chunks(structured_chunks + unstructured_chunks)

    def _chunk_structured_parts(self, content: str, analysis: DocumentAnalysis) -> List[Dict[str, Any]]:
        """Placeholder for structured-content chunking; currently returns ``None``."""
        # Implementation for structured content
        pass

    def _chunk_unstructured_parts(self, content: str, analysis: DocumentAnalysis) -> List[Dict[str, Any]]:
        """Placeholder for unstructured-content chunking; currently returns ``None``."""
        # Implementation for unstructured content
        pass

    def _merge_chunks(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Placeholder for merging and optimizing chunks; currently returns ``None``."""
        # Implementation for merging and optimizing chunks
        pass


from typing import List, Dict, Any
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
class ChunkQualityAssessor:
    """Score a list of chunks on several heuristic quality metrics.

    Each chunk is a dict with its text under the ``'content'`` key. Every
    metric is meant to lie in the 0-1 range so the weighted overall score
    does too.
    """

    def __init__(self):
        # Weights sum to 1.0, so 'overall' is a weighted average.
        self.quality_weights = {
            'coherence': 0.3,
            'completeness': 0.25,
            'size_appropriateness': 0.2,
            'semantic_similarity': 0.15,
            'boundary_quality': 0.1
        }

    def assess_chunks(self, chunks: List[Dict[str, Any]], analysis: "DocumentAnalysis") -> Dict[str, float]:
        """Compute every metric plus the weighted ``'overall'`` score.

        Args:
            chunks: Chunk dicts, each with a ``'content'`` string.
            analysis: Document analysis; only ``doc_type`` is read.

        Returns:
            Mapping of metric name to score, including ``'overall'``.
        """
        scores = {
            # Coherence: Do chunks make sense on their own?
            'coherence': self._assess_coherence(chunks),
            # Completeness: Do chunks preserve important information?
            'completeness': self._assess_completeness(chunks, analysis),
            # Size appropriateness: Are chunks within optimal size range?
            'size_appropriateness': self._assess_size(chunks),
            # Semantic similarity: Are chunks thematically consistent?
            'semantic_similarity': self._assess_semantic_consistency(chunks),
            # Boundary quality: Are chunk boundaries placed well?
            'boundary_quality': self._assess_boundary_quality(chunks),
        }

        # Calculate overall quality score (before 'overall' joins the dict).
        scores['overall'] = sum(
            score * self.quality_weights[metric]
            for metric, score in scores.items()
        )
        return scores

    @staticmethod
    def _mean(values: List[float]) -> float:
        """Return the mean as a plain float, or 0.0 for an empty list.

        ``np.mean([])`` would return NaN and emit a RuntimeWarning.
        """
        return float(np.mean(values)) if values else 0.0

    def _assess_coherence(self, chunks: List[Dict[str, Any]]) -> float:
        """Average, over chunks, the share of non-blank sentence fragments."""
        # Simple heuristic-based coherence assessment
        coherence_scores = []
        for chunk in chunks:
            # Fragments between runs of '.', '!' and '?'.
            sentences = re.split(r'[.!?]+', chunk['content'])
            complete_sentences = sum(1 for s in sentences if s.strip())
            coherence_scores.append(complete_sentences / max(len(sentences), 1))
        return self._mean(coherence_scores)

    def _assess_completeness(self, chunks: List[Dict[str, Any]], analysis: "DocumentAnalysis") -> float:
        """Dispatch to the structure check for markdown/html, else the content check."""
        # Check if important structural elements are preserved
        if analysis.doc_type in ['markdown', 'html']:
            return self._assess_structure_preservation(chunks, analysis)
        return self._assess_content_preservation(chunks)

    def _assess_structure_preservation(self, chunks: List[Dict[str, Any]], analysis: "DocumentAnalysis") -> float:
        """Return headings plus list items per chunk, capped at 1.0."""
        # Check if headings, lists, and other structural elements are preserved
        preserved_elements = 0
        for chunk in chunks:
            content = chunk['content']
            # Count preserved structural elements
            preserved_elements += len(re.findall(r'^#+\s', content, re.MULTILINE))
            preserved_elements += len(re.findall(r'^\s*[-*+]\s', content, re.MULTILINE))

        # The denominator is the chunk count (simplified), so the raw ratio
        # can exceed 1; cap it to keep the weighted overall score bounded.
        return min(preserved_elements / max(len(chunks), 1), 1.0)

    def _assess_content_preservation(self, chunks: List[Dict[str, Any]]) -> float:
        """Return a fixed placeholder score."""
        # A real check would need comparison with the original content.
        return 0.8  # Placeholder

    def _assess_size(self, chunks: List[Dict[str, Any]]) -> float:
        """Average how well each chunk fits the 100-1000 token window."""
        optimal_min = 100   # tokens
        optimal_max = 1000  # tokens

        size_scores = []
        for chunk in chunks:
            token_count = self._estimate_tokens(chunk['content'])
            if optimal_min <= token_count <= optimal_max:
                score = 1.0
            elif token_count < optimal_min:
                score = token_count / optimal_min
            else:
                # Linear penalty reaching 0 at twice the optimal maximum.
                score = max(0, 1 - (token_count - optimal_max) / optimal_max)
            size_scores.append(score)
        return self._mean(size_scores)

    def _assess_semantic_consistency(self, chunks: List[Dict[str, Any]]) -> float:
        """Return a fixed placeholder score."""
        # This would require embedding models for actual implementation
        return 0.7

    def _assess_boundary_quality(self, chunks: List[Dict[str, Any]]) -> float:
        """Average 1.0 for chunks ending on '.', '!', '?', '>' or '}', else 0.5."""
        # Check if boundaries don't split important content
        boundary_scores = [
            1.0 if chunk['content'].strip().endswith(('.', '!', '?', '>', '}')) else 0.5
            for chunk in chunks
        ]
        return self._mean(boundary_scores)

    def _estimate_tokens(self, content: str) -> int:
        """Estimate tokens as 4/3 of the whitespace-separated word count."""
        # Simple token estimation
        return len(content.split()) * 4 // 3  # Rough approximation


import logging
from typing import Optional, List
from dataclasses import dataclass
@dataclass
class ChunkingError:
    """Record of one chunking failure and the suggested way to recover."""

    # Short category label for the failure.
    error_type: str
    # Human-readable description of the failure.
    message: str
    # Position of the affected chunk, when one is known.
    chunk_index: Optional[int] = None
    # Name of the suggested recovery step, when there is one.
    recovery_action: Optional[str] = None
class ChunkingErrorHandler:
    """Classify chunking exceptions and turn them into ``ChunkingError`` records."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        # Category name -> handler; unknown categories fall back to
        # _handle_generic_error in handle_error().
        self.error_handlers = {
            'empty_content': self._handle_empty_content,
            'oversized_chunk': self._handle_oversized_chunk,
            'encoding_error': self._handle_encoding_error,
            'memory_error': self._handle_memory_error,
            'structure_parsing_error': self._handle_structure_parsing_error
        }

    def handle_error(self, error: Exception, context: Dict[str, Any]) -> "ChunkingError":
        """Classify ``error``, log it, and return the matching ``ChunkingError``.

        Args:
            error: The exception raised during chunking.
            context: Extra details about the failure; only ``'chunk_index'``
                is read, and only for oversized chunks.
        """
        error_type = self._classify_error(error)
        handler = self.error_handlers.get(error_type, self._handle_generic_error)
        return handler(error, context)

    def _classify_error(self, error: Exception) -> str:
        """Map an exception to a category name.

        Exception types are checked before message text. ``UnicodeError``
        subclasses ``ValueError``, so it must be tested before the
        "ValueError mentioning 'empty'" rule, or an encoding failure whose
        message contains "empty" is reported as empty content.
        """
        if isinstance(error, MemoryError):
            return 'memory_error'
        if isinstance(error, UnicodeError):
            return 'encoding_error'

        message = str(error).lower()
        if isinstance(error, ValueError) and 'empty' in message:
            return 'empty_content'
        if 'too large' in message:
            return 'oversized_chunk'
        if 'parsing' in message:
            return 'structure_parsing_error'
        return 'generic_error'

    def _handle_empty_content(self, error: Exception, context: Dict[str, Any]) -> "ChunkingError":
        """Log a warning and suggest skipping the empty content."""
        self.logger.warning(f"Empty content encountered: {error}")
        return ChunkingError(
            error_type='empty_content',
            message=str(error),
            recovery_action='skip_empty_content'
        )

    def _handle_oversized_chunk(self, error: Exception, context: Dict[str, Any]) -> "ChunkingError":
        """Log a warning and suggest a smaller chunk size for the given chunk."""
        self.logger.warning(f"Oversized chunk detected: {error}")
        return ChunkingError(
            error_type='oversized_chunk',
            message=str(error),
            # Tolerate a missing context instead of failing inside the handler.
            chunk_index=(context or {}).get('chunk_index'),
            recovery_action='reduce_chunk_size'
        )

    def _handle_encoding_error(self, error: Exception, context: Dict[str, Any]) -> "ChunkingError":
        """Log an error and suggest retrying with a fallback encoding."""
        self.logger.error(f"Encoding error: {error}")
        return ChunkingError(
            error_type='encoding_error',
            message=str(error),
            recovery_action='fallback_encoding'
        )

    def _handle_memory_error(self, error: Exception, context: Dict[str, Any]) -> "ChunkingError":
        """Log an error and suggest processing in batches."""
        self.logger.error(f"Memory error during chunking: {error}")
        return ChunkingError(
            error_type='memory_error',
            message=str(error),
            recovery_action='process_in_batches'
        )

    def _handle_structure_parsing_error(self, error: Exception, context: Dict[str, Any]) -> "ChunkingError":
        """Log a warning and suggest falling back to fixed-size chunking."""
        self.logger.warning(f"Structure parsing failed: {error}")
        return ChunkingError(
            error_type='structure_parsing_error',
            message=str(error),
            recovery_action='fallback_to_fixed_size'
        )

    def _handle_generic_error(self, error: Exception, context: Dict[str, Any]) -> "ChunkingError":
        """Log an error and suggest skipping the item and continuing."""
        self.logger.error(f"Unexpected error during chunking: {error}")
        return ChunkingError(
            error_type='generic_error',
            message=str(error),
            recovery_action='skip_and_continue'
        )


import hashlib
import pickle
from functools import lru_cache
from typing import Dict, Any, Optional
import redis
import json
class ChunkingCache:
    """Two-level cache for chunking results: in-process dict plus optional Redis.

    NOTE(review): the local dict is never evicted, so it grows with every
    distinct (content, strategy, params) combination until
    ``clear_local_cache`` is called.
    """

    # Number of keys requested per SCAN page and removed per DELETE call.
    _SCAN_BATCH = 500

    def __init__(self, redis_url: Optional[str] = None, ttl_seconds: int = 3600):
        """Create the cache.

        Args:
            redis_url: Redis connection URL; when omitted only the local
                dict is used.
            ttl_seconds: Expiry applied to entries written to Redis
                (defaults to one hour).
        """
        self.redis_client = redis.from_url(redis_url) if redis_url else None
        self.ttl_seconds = ttl_seconds
        self.local_cache = {}

    def _generate_cache_key(self, content: str, strategy: str, params: Dict[str, Any]) -> str:
        """Build a key from the strategy name and MD5 digests of content and params."""
        # MD5 serves only as a fingerprint here, not as a security measure.
        content_hash = hashlib.md5(content.encode()).hexdigest()
        # sort_keys makes the key independent of dict insertion order.
        params_str = json.dumps(params, sort_keys=True)
        params_hash = hashlib.md5(params_str.encode()).hexdigest()
        return f"chunking:{strategy}:{content_hash}:{params_hash}"

    def get(self, content: str, strategy: str, params: Dict[str, Any]) -> Optional[List[Dict[str, Any]]]:
        """Return cached chunks, or ``None`` on a miss or a Redis failure."""
        cache_key = self._generate_cache_key(content, strategy, params)

        # Try local cache first
        if cache_key in self.local_cache:
            return self.local_cache[cache_key]

        # Try Redis cache
        if self.redis_client:
            try:
                cached_data = self.redis_client.get(cache_key)
                if cached_data is not None:
                    # SECURITY: pickle.loads runs arbitrary code from its
                    # input. Only safe while every writer to this Redis
                    # instance is trusted.
                    chunks = pickle.loads(cached_data)
                    self.local_cache[cache_key] = chunks  # Cache locally too
                    return chunks
            except Exception as e:
                # Best effort: a broken cache must not break chunking.
                logging.warning(f"Redis cache error: {e}")

        return None

    def set(self, content: str, strategy: str, params: Dict[str, Any], chunks: List[Dict[str, Any]]) -> None:
        """Store ``chunks`` locally and, when configured, in Redis with the TTL."""
        cache_key = self._generate_cache_key(content, strategy, params)

        # Store in local cache
        self.local_cache[cache_key] = chunks

        # Store in Redis cache
        if self.redis_client:
            try:
                self.redis_client.setex(cache_key, self.ttl_seconds, pickle.dumps(chunks))
            except Exception as e:
                logging.warning(f"Redis cache set error: {e}")

    def clear_local_cache(self):
        """Drop every entry from the in-process cache."""
        self.local_cache.clear()

    def clear_redis_cache(self):
        """Delete every ``chunking:*`` key from Redis, if Redis is configured."""
        if not self.redis_client:
            return

        # SCAN walks the keyspace incrementally; KEYS would block the server
        # for the whole scan on a large database.
        pending = []
        for key in self.redis_client.scan_iter(match="chunking:*", count=self._SCAN_BATCH):
            pending.append(key)
            if len(pending) >= self._SCAN_BATCH:
                self.redis_client.delete(*pending)
                pending = []
        if pending:
            self.redis_client.delete(*pending)


import asyncio
import concurrent.futures
from typing import List, Callable, Any
class BatchChunkingProcessor:
    """Run a chunking function over many documents with bounded concurrency."""

    def __init__(self, max_workers: int = 4, batch_size: int = 10):
        """Store the concurrency limits.

        Args:
            max_workers: Thread-pool size, and the number of documents the
                async variant chunks concurrently.
            batch_size: Number of documents submitted to the pool at a time.
        """
        self.max_workers = max_workers
        self.batch_size = batch_size

    def process_documents_batch(self, documents: List[str],
                                chunking_function: Callable[[str], List[Dict[str, Any]]]) -> List[List[Dict[str, Any]]]:
        """Process multiple documents in parallel.

        Args:
            documents: Texts to chunk.
            chunking_function: Callable applied to each document.

        Returns:
            One chunk list per document, in the same order as ``documents``.
            A document whose chunking raised is logged and yields ``[]``.
        """
        results = []

        # One pool serves every batch; it is shut down on leaving the block.
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Process in batches to avoid memory issues
            for start in range(0, len(documents), self.batch_size):
                batch = documents[start:start + self.batch_size]
                futures = [executor.submit(chunking_function, doc) for doc in batch]

                # Collect in submission order so results[i] belongs to
                # documents[i]; as_completed() would yield them in whatever
                # order the workers happened to finish.
                for future in futures:
                    try:
                        results.append(future.result())
                    except Exception as e:
                        logging.error(f"Error processing document: {e}")
                        results.append([])  # Empty result for failed processing

        return results

    async def process_documents_async(self, documents: List[str],
                                      chunking_function: Callable[[str], List[Dict[str, Any]]]) -> List[List[Dict[str, Any]]]:
        """Process documents asynchronously.

        Results follow the order of ``documents``. Because the tasks are
        gathered with ``return_exceptions=True``, a failed document
        contributes its exception object to the result list, not ``[]``.
        """
        semaphore = asyncio.Semaphore(self.max_workers)
        # Inside a coroutine the running loop is the one to use.
        loop = asyncio.get_running_loop()

        async def process_single_document(doc: str) -> List[Dict[str, Any]]:
            async with semaphore:
                # Run the synchronous chunking function in the default executor
                return await loop.run_in_executor(None, chunking_function, doc)

        tasks = [process_single_document(doc) for doc in documents]
        return await asyncio.gather(*tasks, return_exceptions=True)


import time
from dataclasses import dataclass
from typing import Dict, Any, List
from collections import defaultdict
@dataclass
class ChunkingMetrics:
    """Aggregated figures summarising recorded chunking activity."""

    total_documents: int
    total_chunks: int
    avg_chunk_size: float
    processing_time: float
    memory_usage: float
    error_count: int
    # Strategy name -> number of times it was used.
    strategy_distribution: Dict[str, int]
class MetricsCollector:
    """Accumulate per-run chunking measurements and summarise them."""

    def __init__(self):
        # List-valued series keyed by metric name. The 'strategy' key is the
        # exception: it holds a dict of usage counts.
        self.metrics = defaultdict(list)
        self.start_time = None

    def start_timing(self):
        """Mark the start of a timed section."""
        self.start_time = time.time()

    def end_timing(self) -> float:
        """Record and return the seconds since ``start_timing``.

        Returns 0.0 without recording anything if timing was not started.
        """
        # Compare with None: a start time of 0.0 is still a valid start.
        if self.start_time is None:
            return 0.0
        duration = time.time() - self.start_time
        self.metrics['processing_time'].append(duration)
        self.start_time = None
        return duration

    def record_chunk_count(self, count: int):
        """Record the number of chunks produced for one document."""
        self.metrics['chunk_count'].append(count)

    def record_chunk_size(self, size: int):
        """Record the size of one chunk."""
        self.metrics['chunk_size'].append(size)

    def record_strategy_usage(self, strategy: str):
        """Increment the usage count for ``strategy``."""
        # setdefault stores a dict explicitly; plain indexing would let the
        # defaultdict create a list, which has no .get().
        counts = self.metrics.setdefault('strategy', {})
        counts[strategy] = counts.get(strategy, 0) + 1

    def record_error(self, error_type: str):
        """Record one error occurrence by its type label."""
        self.metrics['errors'].append(error_type)

    def record_memory_usage(self, memory_mb: float):
        """Record one memory-usage sample."""
        self.metrics['memory_usage'].append(memory_mb)

    def get_summary(self) -> "ChunkingMetrics":
        """Aggregate everything recorded so far into a ``ChunkingMetrics``.

        The document count is the number of completed timing sections, and
        averages over empty series are reported as 0.
        """
        # .get() reads without creating keys, so summarising never plants an
        # empty list under 'strategy'.
        processing_times = self.metrics.get('processing_time', [])
        chunk_sizes = self.metrics.get('chunk_size', [])
        memory_samples = self.metrics.get('memory_usage', [])

        return ChunkingMetrics(
            total_documents=len(processing_times),
            total_chunks=sum(self.metrics.get('chunk_count', [])),
            avg_chunk_size=sum(chunk_sizes) / max(len(chunk_sizes), 1),
            processing_time=sum(processing_times),
            memory_usage=sum(memory_samples) / max(len(memory_samples), 1),
            error_count=len(self.metrics.get('errors', [])),
            strategy_distribution=dict(self.metrics.get('strategy', {}))
        )

    def reset(self):
        """Discard all recorded metrics and any timing in progress."""
        self.metrics.clear()
        self.start_time = None


# This implementation guide provides a comprehensive foundation for building robust, scalable chunking systems that can handle various document types and use cases while maintaining high quality and performance.
plugins
developer-kit-ai
skills
chunking-strategy
prompt-engineering
developer-kit-aws
skills
aws
aws-cli-beast
aws-cost-optimization
aws-drawio-architecture-diagrams
aws-sam-bootstrap
aws-cloudformation
aws-cloudformation-auto-scaling
references
aws-cloudformation-bedrock
references
aws-cloudformation-cloudfront
references
aws-cloudformation-cloudwatch
references
aws-cloudformation-dynamodb
references
aws-cloudformation-ec2
aws-cloudformation-ecs
references
aws-cloudformation-elasticache
aws-cloudformation-iam
references
aws-cloudformation-lambda
references
aws-cloudformation-rds
aws-cloudformation-s3
references
aws-cloudformation-security
references
aws-cloudformation-task-ecs-deploy-gh
aws-cloudformation-vpc
developer-kit-core
skills
developer-kit-java
skills
aws-lambda-java-integration
aws-rds-spring-boot-integration
aws-sdk-java-v2-bedrock
aws-sdk-java-v2-core
aws-sdk-java-v2-dynamodb
aws-sdk-java-v2-kms
aws-sdk-java-v2-lambda
aws-sdk-java-v2-messaging
aws-sdk-java-v2-rds
aws-sdk-java-v2-s3
aws-sdk-java-v2-secrets-manager
graalvm-native-image
langchain4j
langchain4j-mcp-server-patterns
langchain4j-ai-services-patterns
references
langchain4j-mcp-server-patterns
references
langchain4j-rag-implementation-patterns
references
langchain4j-spring-boot-integration
langchain4j-testing-strategies
langchain4j-tool-function-calling-patterns
langchain4j-vector-stores-configuration
references
qdrant
references
spring-ai-mcp-server-patterns
references
spring-boot-actuator
spring-boot-cache
spring-boot-crud-patterns
spring-boot-dependency-injection
spring-boot-event-driven-patterns
spring-boot-openapi-documentation
spring-boot-project-creator
spring-boot-resilience4j
spring-boot-rest-api-standards
spring-boot-saga-pattern
spring-boot-security-jwt
assets
references
scripts
spring-boot-test-patterns
spring-data-jpa
references
spring-data-neo4j
references
unit-test-application-events
unit-test-bean-validation
unit-test-boundary-conditions
unit-test-caching
unit-test-config-properties
unit-test-controller-layer
unit-test-exception-handler
unit-test-json-serialization
unit-test-mapper-converter
unit-test-parameterized
unit-test-scheduled-async
unit-test-service-layer
unit-test-utility-methods
unit-test-wiremock-rest-api
developer-kit-php
skills
aws-lambda-php-integration
developer-kit-python
skills
aws-lambda-python-integration
developer-kit-tools
developer-kit-typescript
skills
aws-lambda-typescript-integration
better-auth
drizzle-orm-patterns
dynamodb-toolbox-patterns
references
nestjs
nestjs-best-practices
nestjs-code-review
nestjs-drizzle-crud-generator
scripts
nextjs-app-router
nextjs-authentication
nextjs-code-review
nextjs-data-fetching
references
nextjs-deployment
nextjs-performance
nx-monorepo
react-code-review
react-patterns
references
shadcn-ui
tailwind-css-patterns
references
tailwind-design-system
references
turborepo-monorepo
typescript-docs
typescript-security-review
zod-validation-utilities