Complete Implementation Guidelines

This document provides comprehensive implementation guidance for building effective chunking systems.

System Architecture

Core Components

Document Processor
├── Ingestion Layer
│   ├── Document Type Detection
│   ├── Format Parsing (PDF, HTML, Markdown, etc.)
│   └── Content Extraction
├── Analysis Layer
│   ├── Structure Analysis
│   ├── Content Type Identification
│   └── Complexity Assessment
├── Strategy Selection Layer
│   ├── Rule-based Selection
│   ├── ML-based Prediction
│   └── Adaptive Configuration
├── Chunking Layer
│   ├── Strategy Implementation
│   ├── Parameter Optimization
│   └── Quality Validation
└── Output Layer
    ├── Chunk Metadata Generation
    ├── Embedding Integration
    └── Storage Preparation
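
The layers above can be composed into a single orchestrator. The sketch below is illustrative only: ChunkingPipeline is a hypothetical wrapper class, and it simply wires together the DocumentAnalyzer, StrategySelector, and ChunkQualityAssessor classes defined later in this guide.

class ChunkingPipeline:
    """Hypothetical orchestrator wiring the layers together (illustrative sketch)."""

    def __init__(self):
        self.analyzer = DocumentAnalyzer()       # Analysis Layer
        self.selector = StrategySelector()       # Strategy Selection Layer
        self.assessor = ChunkQualityAssessor()   # Quality Validation

    def process(self, content: str) -> dict:
        analysis = self.analyzer.analyze(content)                # structure/complexity analysis
        strategy = self.selector.get_strategy(analysis)          # pick a chunking strategy
        chunks = strategy.chunk(content, analysis)               # produce chunks
        quality = self.assessor.assess_chunks(chunks, analysis)  # validate chunk quality
        return {'chunks': chunks, 'analysis': analysis, 'quality': quality}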

Pre-processing Pipeline

Document Analysis Framework

from dataclasses import dataclass
from typing import List, Dict, Any
import re

@dataclass
class DocumentAnalysis:
    doc_type: str
    structure_score: float  # 0-1, higher means more structured
    complexity_score: float  # 0-1, higher means more complex
    content_types: List[str]
    language: str
    estimated_tokens: int
    has_multimodal: bool

class DocumentAnalyzer:
    def __init__(self):
        self.structure_patterns = {
            'markdown': [r'^#+\s', r'^\*\*.*\*\*$', r'^\* ', r'^\d+\. '],
            'html': [r'<h[1-6]>', r'<p>', r'<div>', r'<table>'],
            'latex': [r'\\section', r'\\subsection', r'\\begin\{', r'\\end\{'],
            'academic': [r'^\d+\.', r'^\d+\.\d+', r'^[A-Z]\.', r'^Figure \d+']
        }

    def analyze(self, content: str) -> DocumentAnalysis:
        doc_type = self.detect_document_type(content)
        structure_score = self.calculate_structure_score(content, doc_type)
        complexity_score = self.calculate_complexity_score(content)
        content_types = self.identify_content_types(content)
        language = self.detect_language(content)
        estimated_tokens = self.estimate_tokens(content)
        has_multimodal = self.detect_multimodal_content(content)

        return DocumentAnalysis(
            doc_type=doc_type,
            structure_score=structure_score,
            complexity_score=complexity_score,
            content_types=content_types,
            language=language,
            estimated_tokens=estimated_tokens,
            has_multimodal=has_multimodal
        )

    def detect_document_type(self, content: str) -> str:
        content_lower = content.lower()

        if '<html' in content_lower or '<body' in content_lower:
            return 'html'
        elif re.search(r'^#{1,6}\s', content, re.MULTILINE):
            return 'markdown'
        elif '\\documentclass' in content_lower or '\\begin{' in content_lower:
            return 'latex'
        elif any(keyword in content_lower for keyword in ['abstract', 'introduction', 'conclusion', 'references']):
            return 'academic'
        elif 'def ' in content or 'class ' in content or 'function ' in content_lower:
            return 'code'
        else:
            return 'plain'

    def calculate_structure_score(self, content: str, doc_type: str) -> float:
        patterns = self.structure_patterns.get(doc_type, [])
        if not patterns:
            return 0.5  # Default for unstructured content

        line_count = len(content.split('\n'))
        structured_lines = 0

        for line in content.split('\n'):
            for pattern in patterns:
                if re.search(pattern, line.strip()):
                    structured_lines += 1
                    break

        return min(structured_lines / max(line_count, 1), 1.0)

    def calculate_complexity_score(self, content: str) -> float:
        # Factors that increase complexity
        avg_sentence_length = self.calculate_avg_sentence_length(content)
        vocabulary_richness = self.calculate_vocabulary_richness(content)
        nested_structure = self.detect_nested_structure(content)

        # Normalize and combine
        complexity = (
            min(avg_sentence_length / 30, 1.0) * 0.3 +
            vocabulary_richness * 0.4 +
            nested_structure * 0.3
        )

        return min(complexity, 1.0)

    def identify_content_types(self, content: str) -> List[str]:
        types = []

        if '```' in content or 'def ' in content or 'function ' in content.lower():
            types.append('code')
        if '|' in content and '\n' in content:
            types.append('tables')
        if re.search(r'\!\[.*\]\(.*\)', content):
            types.append('images')
        if re.search(r'http[s]?://', content):
            types.append('links')
        if re.search(r'\d+\.\d+', content) or re.search(r'\$\d', content):
            types.append('numbers')

        return types if types else ['text']

    def detect_language(self, content: str) -> str:
        # Simple language detection - can be enhanced with proper language detection libraries
        if re.search(r'[\u4e00-\u9fff]', content):
            return 'chinese'
        elif re.search(r'[\u0600-\u06ff]', content):
            return 'arabic'
        elif re.search(r'[\u0400-\u04ff]', content):
            return 'russian'
        else:
            return 'english'  # Default assumption

    def estimate_tokens(self, content: str) -> int:
        # Rough estimation - actual tokenization varies by model
        word_count = len(content.split())
        return int(word_count * 1.3)  # Average tokens per word

    def detect_multimodal_content(self, content: str) -> bool:
        multimodal_indicators = [
            r'\!\[.*\]\(.*\)',  # Images
            r'<iframe',        # Embedded content
            r'<object',        # Embedded objects
            r'<embed',         # Embedded media
        ]

        return any(re.search(pattern, content) for pattern in multimodal_indicators)

    def calculate_avg_sentence_length(self, content: str) -> float:
        sentences = re.split(r'[.!?]+', content)
        sentences = [s.strip() for s in sentences if s.strip()]
        if not sentences:
            return 0
        return sum(len(s.split()) for s in sentences) / len(sentences)

    def calculate_vocabulary_richness(self, content: str) -> float:
        words = content.lower().split()
        if not words:
            return 0
        unique_words = set(words)
        return len(unique_words) / len(words)

    def detect_nested_structure(self, content: str) -> float:
        # Detect nested lists, indented content, etc.
        lines = content.split('\n')
        indented_lines = 0

        for line in lines:
            if line.strip() and line.startswith(' '):
                indented_lines += 1

        return indented_lines / max(len(lines), 1)
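
As an illustration, running the analyzer on a small markdown snippet looks like this (the printed values depend on the heuristics above and are indicative only):

analyzer = DocumentAnalyzer()
sample = "# Title\n\nSome introductory text.\n\n## Details\n\n* point one\n* point two\n"
analysis = analyzer.analyze(sample)

print(analysis.doc_type)          # 'markdown'
print(analysis.structure_score)   # fraction of lines matching markdown patterns
print(analysis.estimated_tokens)  # rough word-count-based estimate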

Strategy Selection Engine

from abc import ABC, abstractmethod
from typing import Dict, Any, List

class ChunkingStrategy(ABC):
    @abstractmethod
    def chunk(self, content: str, analysis: DocumentAnalysis) -> List[Dict[str, Any]]:
        pass

class StrategySelector:
    def __init__(self):
        self.strategies = {
            'fixed_size': FixedSizeStrategy(),
            'recursive': RecursiveStrategy(),
            'structure_aware': StructureAwareStrategy(),
            'semantic': SemanticStrategy(),
            'adaptive': AdaptiveStrategy()
        }

    def select_strategy(self, analysis: DocumentAnalysis) -> str:
        # Rule-based selection logic
        if analysis.structure_score > 0.8 and analysis.doc_type in ['markdown', 'html', 'latex']:
            return 'structure_aware'
        elif analysis.complexity_score > 0.7 and analysis.estimated_tokens < 10000:
            return 'semantic'
        elif analysis.doc_type == 'code':
            return 'structure_aware'
        elif analysis.structure_score < 0.3:
            return 'fixed_size'
        elif analysis.complexity_score > 0.5:
            return 'recursive'
        else:
            return 'adaptive'

    def get_strategy(self, analysis: DocumentAnalysis) -> ChunkingStrategy:
        strategy_name = self.select_strategy(analysis)
        return self.strategies[strategy_name]

# Example strategy implementations
class FixedSizeStrategy(ChunkingStrategy):
    def __init__(self, default_size=512, default_overlap=50):
        self.default_size = default_size
        self.default_overlap = default_overlap

    def chunk(self, content: str, analysis: DocumentAnalysis) -> List[Dict[str, Any]]:
        # Adjust parameters based on analysis
        if analysis.complexity_score > 0.7:
            chunk_size = 1024
        elif analysis.complexity_score < 0.3:
            chunk_size = 256
        else:
            chunk_size = self.default_size

        overlap = int(chunk_size * 0.1)  # 10% overlap

        # Implementation here...
        return self._fixed_size_chunk(content, chunk_size, overlap)

    def _fixed_size_chunk(self, content: str, chunk_size: int, overlap: int) -> List[Dict[str, Any]]:
        # Placeholder: implement with RecursiveCharacterTextSplitter or the
        # word-based sketch shown after this code block
        pass

class AdaptiveStrategy(ChunkingStrategy):
    def chunk(self, content: str, analysis: DocumentAnalysis) -> List[Dict[str, Any]]:
        # Combine multiple strategies based on content characteristics
        structured_chunks: List[Dict[str, Any]] = []
        unstructured_chunks: List[Dict[str, Any]] = []

        if analysis.structure_score > 0.6:
            # Use structure-aware chunking for structured parts
            structured_chunks = self._chunk_structured_parts(content, analysis)
        else:
            # Use fixed-size chunking for unstructured parts
            unstructured_chunks = self._chunk_unstructured_parts(content, analysis)

        # Merge and optimize
        return self._merge_chunks(structured_chunks + unstructured_chunks)

    def _chunk_structured_parts(self, content: str, analysis: DocumentAnalysis) -> List[Dict[str, Any]]:
        # Implementation for structured content
        pass

    def _chunk_unstructured_parts(self, content: str, analysis: DocumentAnalysis) -> List[Dict[str, Any]]:
        # Implementation for unstructured content
        pass

    def _merge_chunks(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        # Implementation for merging and optimizing chunks
        pass
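
The _fixed_size_chunk placeholder above can be filled in many ways. One minimal, dependency-free sketch splits on whitespace words with overlap; it is a stand-in for the RecursiveCharacterTextSplitter mentioned in the comment, not a production tokenizer-aware splitter.

def simple_fixed_size_chunk(content: str, chunk_size: int, overlap: int) -> List[Dict[str, Any]]:
    """Split content into word-based chunks of roughly chunk_size words with overlap.

    Illustrative sketch only; real systems would typically use a tokenizer
    or a library splitter instead of whitespace words.
    """
    words = content.split()
    chunks: List[Dict[str, Any]] = []
    step = max(chunk_size - overlap, 1)

    for start in range(0, len(words), step):
        window = words[start:start + chunk_size]
        if not window:
            break
        chunks.append({
            'content': ' '.join(window),
            'metadata': {'start_word': start, 'num_words': len(window)}
        })
        if start + chunk_size >= len(words):
            break  # the tail has been consumed; avoid a redundant trailing chunk

    return chunks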

Quality Assurance Framework

Chunk Quality Metrics

from typing import List, Dict, Any
import re
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity  # needed once semantic similarity is implemented

class ChunkQualityAssessor:
    def __init__(self):
        self.quality_weights = {
            'coherence': 0.3,
            'completeness': 0.25,
            'size_appropriateness': 0.2,
            'semantic_similarity': 0.15,
            'boundary_quality': 0.1
        }

    def assess_chunks(self, chunks: List[Dict[str, Any]], analysis: DocumentAnalysis) -> Dict[str, float]:
        scores = {}

        # Coherence: Do chunks make sense on their own?
        scores['coherence'] = self._assess_coherence(chunks)

        # Completeness: Do chunks preserve important information?
        scores['completeness'] = self._assess_completeness(chunks, analysis)

        # Size appropriateness: Are chunks within optimal size range?
        scores['size_appropriateness'] = self._assess_size(chunks)

        # Semantic similarity: Are chunks thematically consistent?
        scores['semantic_similarity'] = self._assess_semantic_consistency(chunks)

        # Boundary quality: Are chunk boundaries placed well?
        scores['boundary_quality'] = self._assess_boundary_quality(chunks)

        # Calculate overall quality score
        overall_score = sum(
            score * self.quality_weights[metric]
            for metric, score in scores.items()
        )

        scores['overall'] = overall_score
        return scores

    def _assess_coherence(self, chunks: List[Dict[str, Any]]) -> float:
        # Simple heuristic-based coherence assessment
        coherence_scores = []

        for chunk in chunks:
            content = chunk['content']

            # Check for complete sentences
            sentences = re.split(r'[.!?]+', content)
            complete_sentences = sum(1 for s in sentences if s.strip())
            coherence = complete_sentences / max(len(sentences), 1)

            coherence_scores.append(coherence)

        return np.mean(coherence_scores)

    def _assess_completeness(self, chunks: List[Dict[str, Any]], analysis: DocumentAnalysis) -> float:
        # Check if important structural elements are preserved
        if analysis.doc_type in ['markdown', 'html']:
            return self._assess_structure_preservation(chunks, analysis)
        else:
            return self._assess_content_preservation(chunks)

    def _assess_structure_preservation(self, chunks: List[Dict[str, Any]], analysis: DocumentAnalysis) -> float:
        # Check if headings, lists, and other structural elements are preserved
        preserved_elements = 0
        total_elements = 0

        for chunk in chunks:
            content = chunk['content']

            # Count preserved structural elements
            headings = len(re.findall(r'^#+\s', content, re.MULTILINE))
            lists = len(re.findall(r'^\s*[-*+]\s', content, re.MULTILINE))

            preserved_elements += headings + lists
            total_elements += 1  # Simplified count

        return preserved_elements / max(total_elements, 1)

    def _assess_content_preservation(self, chunks: List[Dict[str, Any]]) -> float:
        # Placeholder: a real implementation would compare the concatenated
        # chunk text against the original document to measure information loss
        return 0.8

    def _assess_size(self, chunks: List[Dict[str, Any]]) -> float:
        optimal_min = 100  # tokens
        optimal_max = 1000  # tokens

        size_scores = []
        for chunk in chunks:
            token_count = self._estimate_tokens(chunk['content'])
            if optimal_min <= token_count <= optimal_max:
                score = 1.0
            elif token_count < optimal_min:
                score = token_count / optimal_min
            else:
                score = max(0, 1 - (token_count - optimal_max) / optimal_max)

            size_scores.append(score)

        return np.mean(size_scores)

    def _assess_semantic_consistency(self, chunks: List[Dict[str, Any]]) -> float:
        # This would require embedding models for actual implementation
        # Placeholder implementation
        return 0.7

    def _assess_boundary_quality(self, chunks: List[Dict[str, Any]]) -> float:
        # Check if boundaries don't split important content
        boundary_scores = []

        for i, chunk in enumerate(chunks):
            content = chunk['content']

            # Check for incomplete sentences at boundaries
            if not content.strip().endswith(('.', '!', '?', '>', '}')):
                boundary_scores.append(0.5)
            else:
                boundary_scores.append(1.0)

        return np.mean(boundary_scores)

    def _estimate_tokens(self, content: str) -> int:
        # Simple token estimation
        return len(content.split()) * 4 // 3  # Rough approximation
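
A short usage sketch, assuming chunks are dictionaries with a 'content' key as elsewhere in this guide and reusing the DocumentAnalyzer from the pre-processing section:

assessor = ChunkQualityAssessor()
chunks = [
    {'content': 'Chunking splits documents into retrievable units. It keeps context intact.'},
    {'content': 'Overlap between chunks helps preserve continuity across boundaries.'}
]
analysis = DocumentAnalyzer().analyze(' '.join(c['content'] for c in chunks))

scores = assessor.assess_chunks(chunks, analysis)
print(scores['overall'])  # weighted combination of the five metric scores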

Error Handling and Edge Cases

Robust Error Handling

import logging
from typing import Optional, Dict, Any
from dataclasses import dataclass

@dataclass
class ChunkingError:
    error_type: str
    message: str
    chunk_index: Optional[int] = None
    recovery_action: Optional[str] = None

class ChunkingErrorHandler:
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.error_handlers = {
            'empty_content': self._handle_empty_content,
            'oversized_chunk': self._handle_oversized_chunk,
            'encoding_error': self._handle_encoding_error,
            'memory_error': self._handle_memory_error,
            'structure_parsing_error': self._handle_structure_parsing_error
        }

    def handle_error(self, error: Exception, context: Dict[str, Any]) -> ChunkingError:
        error_type = self._classify_error(error)
        handler = self.error_handlers.get(error_type, self._handle_generic_error)
        return handler(error, context)

    def _classify_error(self, error: Exception) -> str:
        if isinstance(error, ValueError) and 'empty' in str(error).lower():
            return 'empty_content'
        elif isinstance(error, MemoryError):
            return 'memory_error'
        elif isinstance(error, UnicodeError):
            return 'encoding_error'
        elif 'too large' in str(error).lower():
            return 'oversized_chunk'
        elif 'parsing' in str(error).lower():
            return 'structure_parsing_error'
        else:
            return 'generic_error'

    def _handle_empty_content(self, error: Exception, context: Dict[str, Any]) -> ChunkingError:
        self.logger.warning(f"Empty content encountered: {error}")
        return ChunkingError(
            error_type='empty_content',
            message=str(error),
            recovery_action='skip_empty_content'
        )

    def _handle_oversized_chunk(self, error: Exception, context: Dict[str, Any]) -> ChunkingError:
        self.logger.warning(f"Oversized chunk detected: {error}")
        return ChunkingError(
            error_type='oversized_chunk',
            message=str(error),
            chunk_index=context.get('chunk_index'),
            recovery_action='reduce_chunk_size'
        )

    def _handle_encoding_error(self, error: Exception, context: Dict[str, Any]) -> ChunkingError:
        self.logger.error(f"Encoding error: {error}")
        return ChunkingError(
            error_type='encoding_error',
            message=str(error),
            recovery_action='fallback_encoding'
        )

    def _handle_memory_error(self, error: Exception, context: Dict[str, Any]) -> ChunkingError:
        self.logger.error(f"Memory error during chunking: {error}")
        return ChunkingError(
            error_type='memory_error',
            message=str(error),
            recovery_action='process_in_batches'
        )

    def _handle_structure_parsing_error(self, error: Exception, context: Dict[str, Any]) -> ChunkingError:
        self.logger.warning(f"Structure parsing failed: {error}")
        return ChunkingError(
            error_type='structure_parsing_error',
            message=str(error),
            recovery_action='fallback_to_fixed_size'
        )

    def _handle_generic_error(self, error: Exception, context: Dict[str, Any]) -> ChunkingError:
        self.logger.error(f"Unexpected error during chunking: {error}")
        return ChunkingError(
            error_type='generic_error',
            message=str(error),
            recovery_action='skip_and_continue'
        )
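
In practice the handler wraps the chunking call and maps recovery actions onto fallbacks. A minimal sketch, assuming the strategy classes defined earlier (chunk_with_recovery itself is a hypothetical helper, and only one recovery action is handled explicitly here):

def chunk_with_recovery(content: str, strategy: ChunkingStrategy,
                        analysis: DocumentAnalysis) -> List[Dict[str, Any]]:
    handler = ChunkingErrorHandler()
    try:
        return strategy.chunk(content, analysis)
    except Exception as exc:
        chunking_error = handler.handle_error(exc, {'chunk_index': None})
        if chunking_error.recovery_action == 'fallback_to_fixed_size':
            # Structure parsing failed: retry with a simple fixed-size strategy
            return FixedSizeStrategy().chunk(content, analysis)
        # For other recovery actions, skip this document and continue
        return []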

Performance Optimization

Caching and Memoization

import hashlib
import json
import logging
import pickle
import redis
from typing import Dict, Any, List, Optional

class ChunkingCache:
    def __init__(self, redis_url: Optional[str] = None):
        if redis_url:
            self.redis_client = redis.from_url(redis_url)
        else:
            self.redis_client = None
        self.local_cache = {}

    def _generate_cache_key(self, content: str, strategy: str, params: Dict[str, Any]) -> str:
        content_hash = hashlib.md5(content.encode()).hexdigest()
        params_str = json.dumps(params, sort_keys=True)
        params_hash = hashlib.md5(params_str.encode()).hexdigest()
        return f"chunking:{strategy}:{content_hash}:{params_hash}"

    def get(self, content: str, strategy: str, params: Dict[str, Any]) -> Optional[List[Dict[str, Any]]]:
        cache_key = self._generate_cache_key(content, strategy, params)

        # Try local cache first
        if cache_key in self.local_cache:
            return self.local_cache[cache_key]

        # Try Redis cache
        if self.redis_client:
            try:
                cached_data = self.redis_client.get(cache_key)
                if cached_data:
                    chunks = pickle.loads(cached_data)
                    self.local_cache[cache_key] = chunks  # Cache locally too
                    return chunks
            except Exception as e:
                logging.warning(f"Redis cache error: {e}")

        return None

    def set(self, content: str, strategy: str, params: Dict[str, Any], chunks: List[Dict[str, Any]]) -> None:
        cache_key = self._generate_cache_key(content, strategy, params)

        # Store in local cache
        self.local_cache[cache_key] = chunks

        # Store in Redis cache
        if self.redis_client:
            try:
                cached_data = pickle.dumps(chunks)
                self.redis_client.setex(cache_key, 3600, cached_data)  # 1 hour TTL
            except Exception as e:
                logging.warning(f"Redis cache set error: {e}")

    def clear_local_cache(self):
        self.local_cache.clear()

    def clear_redis_cache(self):
        if self.redis_client:
            pattern = "chunking:*"
            keys = self.redis_client.keys(pattern)
            if keys:
                self.redis_client.delete(*keys)
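
A typical check-then-compute pattern around a chunking call might look like the following. Here document_text, fixed_size_strategy, and analysis are illustrative placeholders; the cache works purely in memory when no Redis URL is given.

cache = ChunkingCache(redis_url=None)  # in-memory only; pass a redis:// URL to enable Redis
params = {'chunk_size': 512, 'overlap': 50}

cached = cache.get(document_text, 'fixed_size', params)
if cached is not None:
    chunks = cached  # reuse previously computed chunks
else:
    chunks = fixed_size_strategy.chunk(document_text, analysis)  # any chunking call
    cache.set(document_text, 'fixed_size', params, chunks)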

Batch Processing

import asyncio
import concurrent.futures
import logging
from typing import List, Dict, Callable, Any

class BatchChunkingProcessor:
    def __init__(self, max_workers: int = 4, batch_size: int = 10):
        self.max_workers = max_workers
        self.batch_size = batch_size

    def process_documents_batch(self, documents: List[str],
                               chunking_function: Callable[[str], List[Dict[str, Any]]]) -> List[List[Dict[str, Any]]]:
        """Process multiple documents in parallel"""
        results = []

        # Process in batches to avoid memory issues
        for i in range(0, len(documents), self.batch_size):
            batch = documents[i:i + self.batch_size]

            with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                future_to_doc = {
                    executor.submit(chunking_function, doc): doc
                    for doc in batch
                }

                batch_results = []
                for future in concurrent.futures.as_completed(future_to_doc):
                    try:
                        chunks = future.result()
                        batch_results.append(chunks)
                    except Exception as e:
                        logging.error(f"Error processing document: {e}")
                        batch_results.append([])  # Empty result for failed processing

                results.extend(batch_results)

        return results

    async def process_documents_async(self, documents: List[str],
                                     chunking_function: Callable[[str], List[Dict[str, Any]]]) -> List[List[Dict[str, Any]]]:
        """Process documents asynchronously"""
        semaphore = asyncio.Semaphore(self.max_workers)

        async def process_single_document(doc: str) -> List[Dict[str, Any]]:
            async with semaphore:
                # Run the synchronous chunking function in the default executor
                loop = asyncio.get_running_loop()
                return await loop.run_in_executor(None, chunking_function, doc)

        tasks = [process_single_document(doc) for doc in documents]
        return await asyncio.gather(*tasks, return_exceptions=True)
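
Used with any chunking callable, the processor looks roughly like this; documents and chunk_document are illustrative stand-ins, and in a real pipeline chunk_document would delegate to the selected strategy.

processor = BatchChunkingProcessor(max_workers=4, batch_size=10)

def chunk_document(doc: str) -> List[Dict[str, Any]]:
    # Stand-in: a real implementation would call the selected strategy's chunk()
    return [{'content': doc[:1000]}]

documents = ["First document text...", "Second document text..."]
all_chunks = processor.process_documents_batch(documents, chunk_document)
print(len(all_chunks))  # one list of chunks per input document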

Monitoring and Observability

Metrics Collection

import time
from dataclasses import dataclass
from typing import Dict, Any, List
from collections import defaultdict

@dataclass
class ChunkingMetrics:
    total_documents: int
    total_chunks: int
    avg_chunk_size: float
    processing_time: float
    memory_usage: float
    error_count: int
    strategy_distribution: Dict[str, int]

class MetricsCollector:
    def __init__(self):
        self.metrics = defaultdict(list)
        self.strategy_counts = defaultdict(int)
        self.start_time = None

    def start_timing(self):
        self.start_time = time.time()

    def end_timing(self) -> float:
        if self.start_time:
            duration = time.time() - self.start_time
            self.metrics['processing_time'].append(duration)
            self.start_time = None
            return duration
        return 0.0

    def record_chunk_count(self, count: int):
        self.metrics['chunk_count'].append(count)

    def record_chunk_size(self, size: int):
        self.metrics['chunk_size'].append(size)

    def record_strategy_usage(self, strategy: str):
        self.strategy_counts[strategy] += 1

    def record_error(self, error_type: str):
        self.metrics['errors'].append(error_type)

    def record_memory_usage(self, memory_mb: float):
        self.metrics['memory_usage'].append(memory_mb)

    def get_summary(self) -> ChunkingMetrics:
        return ChunkingMetrics(
            total_documents=len(self.metrics['processing_time']),
            total_chunks=sum(self.metrics['chunk_count']),
            avg_chunk_size=sum(self.metrics['chunk_size']) / max(len(self.metrics['chunk_size']), 1),
            processing_time=sum(self.metrics['processing_time']),
            memory_usage=sum(self.metrics['memory_usage']) / max(len(self.metrics['memory_usage']), 1),
            error_count=len(self.metrics['errors']),
            strategy_distribution=dict(self.strategy_counts)
        )

    def reset(self):
        self.metrics.clear()
        self.strategy_counts.clear()
        self.start_time = None
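
Wiring the collector around a chunking run might look like this; the chunks, sizes, and strategy name below are illustrative values rather than output from a real pipeline.

collector = MetricsCollector()

collector.start_timing()
chunks = [{'content': 'example chunk one'}, {'content': 'example chunk two'}]  # stand-in for a real run
collector.end_timing()

collector.record_chunk_count(len(chunks))
for chunk in chunks:
    collector.record_chunk_size(len(chunk['content'].split()))
collector.record_strategy_usage('fixed_size')

summary = collector.get_summary()
print(summary.total_chunks, summary.strategy_distribution)  # 2 {'fixed_size': 1}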

This implementation guide provides a comprehensive foundation for building robust, scalable chunking systems that can handle various document types and use cases while maintaining high quality and performance.
