Google Cloud Natural Language API client library providing sentiment analysis, entity recognition, text classification, and content moderation capabilities.
—
Detects and flags potentially harmful, inappropriate, or unsafe content in text, providing moderation categories and confidence scores for content filtering applications. Essential for maintaining safe online environments, protecting users from harmful content, and ensuring compliance with content policies.
Analyzes the provided text to detect potentially harmful or inappropriate content across multiple safety categories.
def moderate_text(
    self,
    request: Optional[Union[ModerateTextRequest, dict]] = None,
    *,
    document: Optional[Document] = None,
    retry: OptionalRetry = gapic_v1.method.DEFAULT,
    timeout: Union[float, object] = gapic_v1.method.DEFAULT,
    metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
) -> ModerateTextResponse:
    """Moderate text to detect potentially harmful or inappropriate content.

    Args:
        request: The request object containing the document to moderate;
            may be a ``ModerateTextRequest`` or an equivalent ``dict``.
        document: Input document for moderation (keyword-only convenience
            alternative to passing a full request).
        retry: Retry configuration for the request.
        timeout: Request timeout in seconds.
        metadata: Additional metadata to send with the request.

    Returns:
        ModerateTextResponse containing the moderation categories and
        their confidence scores.
    """


from google.cloud import language
# --- Example: one-off content moderation request ---

# Set up the API client.
client = language.LanguageServiceClient()

# Build the plain-text input document.
document = language.Document(
    content="This content contains inappropriate language and harmful statements.",
    type_=language.Document.Type.PLAIN_TEXT,
)

# Submit the document for moderation.
response = client.moderate_text(request={"document": document})

# Report every returned category with its confidence score.
print("Content Moderation Results:")
for category in response.moderation_categories:
    print(f"Category: {category.name}")
    print(f"Confidence: {category.confidence:.3f}")
    # Flag anything above the (adjustable) 0.5 confidence threshold.
    if category.confidence > 0.5:
        print(f"⚠️ Content flagged for: {category.name}")
    print()

# Overall safety assessment across all categories.
flagged_categories = [
    cat for cat in response.moderation_categories if cat.confidence > 0.5
]
if flagged_categories:
    print(f"Content FLAGGED - {len(flagged_categories)} safety issues detected")
else:
    print("Content appears safe")


class ModerateTextRequest:
    # Document to be analyzed for harmful content.
    document: Document


class ModerateTextResponse:
    # One entry per moderation category detected in the input.
    moderation_categories: MutableSequence[ClassificationCategory]

# The system detects various types of harmful content.
# Each category includes a confidence score from 0.0 to 1.0.
class ContentModerator:
    """Moderates text against per-category confidence thresholds.

    Wraps a LanguageServiceClient and converts raw moderation scores into
    violation/warning lists plus a recommended handling action.
    """

    def __init__(self, client, thresholds=None):
        # API client used for moderate_text calls.
        self.client = client
        # Per-category decision thresholds; categories not listed here
        # fall back to 0.5 in moderate_content().
        self.thresholds = thresholds or {
            'Toxic': 0.7,
            'Severe Toxicity': 0.5,
            'Identity Attack': 0.6,
            'Insult': 0.8,
            'Profanity': 0.9,
            'Threat': 0.3,
            'Sexually Explicit': 0.8,
            'Flirtation': 0.9
        }

    def moderate_content(self, text):
        """Moderate content with configurable thresholds.

        Returns a dict with 'violations' (categories at/above their
        threshold), 'warnings' (scores >= 0.3 but below threshold),
        'safe', and 'all_categories'.
        """
        doc = language.Document(
            content=text,
            type_=language.Document.Type.PLAIN_TEXT,
        )
        api_response = self.client.moderate_text(request={"document": doc})

        violations, warnings = [], []
        for cat in api_response.moderation_categories:
            name, score = cat.name, cat.confidence
            limit = self.thresholds.get(name, 0.5)
            if score >= limit:
                violations.append({
                    'category': name,
                    'confidence': score,
                    # 0.7+ is treated as high severity regardless of the
                    # category-specific threshold.
                    'severity': 'high' if score >= 0.7 else 'medium',
                    'threshold': limit,
                })
            elif score >= 0.3:
                # Below the action threshold but worth tracking.
                warnings.append({'category': name, 'confidence': score})

        return {
            'violations': violations,
            'warnings': warnings,
            'safe': not violations,
            'all_categories': api_response.moderation_categories,
        }

    def get_action_recommendation(self, moderation_result):
        """Get recommended action based on moderation results.

        Returns one of 'approve', 'block', 'review', or 'flag'.
        """
        found = moderation_result['violations']
        if not found:
            return 'approve'
        # Any high-severity hit or any Threat category blocks outright.
        has_severe = any(v['severity'] == 'high' for v in found)
        has_threat = any(v['category'] == 'Threat' for v in found)
        if has_severe or has_threat:
            return 'block'
        # Many simultaneous violations, or one very confident one,
        # warrants human review.
        if len(found) >= 3 or any(v['confidence'] >= 0.8 for v in found):
            return 'review'
        return 'flag'
# Usage: run the moderator over a set of sample messages.
moderator = ContentModerator(client)
test_texts = [
    "This is a normal, friendly message.",
    "You're such an idiot and I hate you!",
    "I'm going to hurt you if you don't stop.",
    "That's a really inappropriate and offensive comment."
]
for text in test_texts:
    result = moderator.moderate_content(text)
    action = moderator.get_action_recommendation(result)
    print(f"Text: {text[:50]}...")
    print(f"Action: {action}")
    print(f"Safe: {result['safe']}")
    if result['violations']:
        print("Violations:")
        for violation in result['violations']:
            print(f" - {violation['category']}: {violation['confidence']:.3f} ({violation['severity']})")
    if result['warnings']:
        print("Warnings:")
        for warning in result['warnings']:
            print(f" - {warning['category']}: {warning['confidence']:.3f}")
    print()


def moderate_content_batch(client, texts, batch_size=10):
    """Moderate multiple texts efficiently.

    Args:
        client: LanguageServiceClient used for the moderate_text calls.
        texts: Sequence of strings to moderate.
        batch_size: Number of texts grouped per local processing chunk.
            NOTE(review): the API is still called once per text; batching
            only chunks the local loop.

    Returns:
        list of per-text result dicts. On success a dict has 'text',
        'violations' (categories scoring > 0.5), 'max_confidence'
        (highest score across ALL categories), 'safe', and
        'all_categories'. On failure it has 'text', 'error', 'safe': None.
    """
    results = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        batch_results = []
        for text in batch:
            try:
                document = language.Document(
                    content=text,
                    type_=language.Document.Type.PLAIN_TEXT
                )
                response = client.moderate_text(
                    request={"document": document}
                )
                violations = []
                max_confidence = 0
                for category in response.moderation_categories:
                    # BUGFIX: track the maximum over every category, not
                    # only those above the violation threshold; previously
                    # max_confidence stayed 0 for borderline content
                    # (e.g. all scores at 0.49).
                    max_confidence = max(max_confidence, category.confidence)
                    if category.confidence > 0.5:
                        violations.append({
                            'category': category.name,
                            'confidence': category.confidence
                        })
                batch_results.append({
                    'text': text,
                    'violations': violations,
                    'max_confidence': max_confidence,
                    'safe': len(violations) == 0,
                    'all_categories': response.moderation_categories
                })
            except Exception as e:
                # Record the failure but keep processing the rest of the batch.
                batch_results.append({
                    'text': text,
                    'error': str(e),
                    'safe': None
                })
        results.extend(batch_results)
    return results
def generate_moderation_report(results):
    """Generate and print a summary report from batch moderation results.

    Args:
        results: list of per-text dicts as produced by
            moderate_content_batch (keys 'safe', optional 'violations',
            optional 'error').

    Returns:
        dict with 'total', 'safe', 'flagged', 'errors' counts and a
        'category_counts' mapping of violation category -> occurrences.
    """
    total_texts = len(results)
    safe_count = sum(1 for r in results if r.get('safe') is True)
    flagged_count = sum(1 for r in results if r.get('safe') is False)
    error_count = sum(1 for r in results if 'error' in r)

    # Tally how often each violation category appears across all results.
    category_counts = {}
    for result in results:
        for violation in result.get('violations', []):
            category = violation['category']
            category_counts[category] = category_counts.get(category, 0) + 1

    print(f"Moderation Report")
    print(f"================")
    print(f"Total texts processed: {total_texts}")
    if total_texts:
        # BUGFIX: guard the percentage math — an empty batch previously
        # raised ZeroDivisionError here.
        print(f"Safe content: {safe_count} ({safe_count/total_texts*100:.1f}%)")
        print(f"Flagged content: {flagged_count} ({flagged_count/total_texts*100:.1f}%)")
    print(f"Processing errors: {error_count}")
    print()

    if category_counts:
        print("Most common violations:")
        sorted_categories = sorted(category_counts.items(), key=lambda x: x[1], reverse=True)
        for category, count in sorted_categories[:5]:
            print(f" {category}: {count} ({count/total_texts*100:.1f}%)")

    return {
        'total': total_texts,
        'safe': safe_count,
        'flagged': flagged_count,
        'errors': error_count,
        'category_counts': category_counts
    }
# Usage: moderate a small batch of sample strings.
sample_texts = [
    "Welcome to our community! Please be respectful.",
    "This is completely inappropriate and offensive.",
    "Great post! Thanks for sharing this information.",
    "You're an absolute moron and should be banned.",
    "I love this product and would recommend it to others."
]
# Each sample is sent through the batch moderation helper above.
batch_results = moderate_content_batch(client, sample_texts)
report = generate_moderation_report(batch_results)


class RealTimeContentFilter:
    """Real-time message filter backed by the moderation API.

    Caches verdicts per normalized message text so repeated content is
    not re-submitted to the API.
    """

    def __init__(self, client, auto_block_threshold=0.8):
        # API client used for moderate_text calls.
        self.client = client
        # Confidence at or above which a message is blocked outright.
        self.auto_block_threshold = auto_block_threshold
        # Maps normalized message text -> cached filter result.
        self.cache = {}

    def filter_message(self, message, user_id=None):
        """Filter a message in real-time with caching.

        Returns a dict with 'action' ('block'/'review'/'allow'), 'reason',
        'confidence', 'violations', and 'user_id'. On API failure the
        message is allowed (fail open) and the dict carries 'error': True.
        """
        # BUGFIX: key the cache by the normalized text itself rather than
        # hash(text) — two different messages can share a hash value and
        # would then wrongly reuse each other's verdict.
        cache_key = message.strip().lower()
        if cache_key in self.cache:
            return self.cache[cache_key]
        document = language.Document(
            content=message,
            type_=language.Document.Type.PLAIN_TEXT
        )
        try:
            response = self.client.moderate_text(
                request={"document": document}
            )
            # Collect every category above a low tracking threshold.
            max_confidence = 0
            violations = []
            for category in response.moderation_categories:
                if category.confidence > 0.3:
                    violations.append({
                        'category': category.name,
                        'confidence': category.confidence
                    })
                    max_confidence = max(max_confidence, category.confidence)
            # Map the highest score onto an action.
            if max_confidence >= self.auto_block_threshold:
                action = 'block'
                reason = f"High confidence violation ({max_confidence:.3f})"
            elif max_confidence >= 0.5:
                action = 'review'
                reason = f"Moderate confidence violation ({max_confidence:.3f})"
            else:
                action = 'allow'
                reason = "Content appears safe"
            result = {
                'action': action,
                'reason': reason,
                'confidence': max_confidence,
                'violations': violations,
                'user_id': user_id,
                'timestamp': None  # Would be set in real implementation
            }
            # Cache the verdict for identical future messages.
            self.cache[cache_key] = result
            return result
        except Exception as e:
            # Fail open: allow the content but surface the error to the caller.
            return {
                'action': 'allow',
                'reason': f"Moderation error: {str(e)}",
                'confidence': 0,
                'violations': [],
                'user_id': user_id,
                'error': True
            }

    def get_filter_stats(self):
        """Get statistics about filtering actions taken so far.

        Returns an empty dict before anything has been cached.
        """
        if not self.cache:
            return {}
        actions = [result['action'] for result in self.cache.values()]
        stats = {
            'total_processed': len(actions),
            'blocked': actions.count('block'),
            'reviewed': actions.count('review'),
            'allowed': actions.count('allow')
        }
        stats['block_rate'] = stats['blocked'] / stats['total_processed'] * 100
        stats['review_rate'] = stats['reviewed'] / stats['total_processed'] * 100
        return stats
# Usage: push a handful of chat messages through the real-time filter.
filter_system = RealTimeContentFilter(client, auto_block_threshold=0.7)
messages = [
    ("Hello everyone!", "user1"),
    ("This is absolutely disgusting content.", "user2"),
    ("Thanks for the helpful information.", "user3"),
    ("You're all idiots and I hate this place.", "user4"),
    ("Looking forward to the next update!", "user5")
]
print("Real-time Content Filtering:")
for msg, uid in messages:
    verdict = filter_system.filter_message(msg, uid)
    print(f"User {uid}: {msg[:30]}...")
    print(f" Action: {verdict['action']} - {verdict['reason']}")
    if verdict['violations']:
        print(f" Violations: {len(verdict['violations'])}")
        # Only surface the two strongest violations per message.
        for hit in verdict['violations'][:2]:
            print(f" - {hit['category']}: {hit['confidence']:.3f}")
    print()
# Show filtering statistics
stats = filter_system.get_filter_stats()
print("Filtering Statistics:")
for key, value in stats.items():
    print(f" {key}: {value}")


class ModerationPipeline:
    """Queue-based moderation pipeline.

    Content is enqueued via add_content(), moderated in bulk by
    process_queue(), and the outcomes retained in processed_results.
    """

    def __init__(self, client):
        # API client used for moderate_text calls.
        self.client = client
        # Items waiting for (or finished with) moderation.
        self.processing_queue = []
        # Outcome dicts for everything processed so far.
        self.processed_results = []

    def add_content(self, content_id, text, metadata=None):
        """Add content to moderation queue."""
        entry = {
            'id': content_id,
            'text': text,
            'metadata': metadata or {},
            'status': 'queued'
        }
        self.processing_queue.append(entry)

    def process_queue(self):
        """Process all queued content; returns the count processed OK."""
        processed_count = 0
        for item in self.processing_queue:
            if item['status'] != 'queued':
                # Already handled on a previous run (or errored).
                continue
            try:
                doc = language.Document(
                    content=item['text'],
                    type_=language.Document.Type.PLAIN_TEXT
                )
                api_response = self.client.moderate_text(
                    request={"document": doc}
                )
                # Keep every category's score, not just high ones.
                scores = [
                    {'category': c.name, 'confidence': c.confidence}
                    for c in api_response.moderation_categories
                ]
                # Decide the outcome from the strongest signal.
                if any(s['confidence'] >= 0.7 for s in scores):
                    decision = 'reject'
                elif any(s['confidence'] >= 0.5 for s in scores):
                    decision = 'review'
                else:
                    decision = 'approve'
                item['status'] = 'processed'
                self.processed_results.append({
                    'id': item['id'],
                    'text': item['text'],
                    'metadata': item['metadata'],
                    'action': decision,
                    'violations': scores,
                    'processed': True,
                    'error': None
                })
                processed_count += 1
            except Exception as e:
                item['status'] = 'error'
                self.processed_results.append({
                    'id': item['id'],
                    'text': item['text'],
                    'metadata': item['metadata'],
                    'action': 'error',
                    'violations': [],
                    'processed': False,
                    'error': str(e)
                })
        return processed_count

    def get_results_by_action(self, action):
        """Get all results with a specific action."""
        return [r for r in self.processed_results if r['action'] == action]

    def export_review_queue(self):
        """Export items that need human review."""
        exported = []
        for item in self.get_results_by_action('review'):
            text = item['text']
            preview = text[:100] + "..." if len(text) > 100 else text
            exported.append({
                'content_id': item['id'],
                'text_preview': preview,
                'violations': item['violations'],
                'metadata': item['metadata']
            })
        return exported
# Usage: run a few user-generated samples through the pipeline.
pipeline = ModerationPipeline(client)
content_samples = [
    ("post_1", "This is a great article about technology trends."),
    ("comment_2", "Your opinion is completely wrong and stupid."),
    ("review_3", "The product works well and I'm satisfied."),
    ("message_4", "I'm going to report you for this behavior."),
    ("post_5", "Looking forward to the conference next week!")
]
for content_id, text in content_samples:
    pipeline.add_content(content_id, text, {'source': 'user_generated'})

# Process the queue and bucket the outcomes by action.
processed = pipeline.process_queue()
print(f"Processed {processed} items")
approved = pipeline.get_results_by_action('approve')
rejected = pipeline.get_results_by_action('reject')
review_needed = pipeline.get_results_by_action('review')
print(f"Approved: {len(approved)}")
print(f"Rejected: {len(rejected)}")
print(f"Needs Review: {len(review_needed)}")

# Export anything that needs a human decision.
if review_needed:
    review_queue = pipeline.export_review_queue()
    print("\nItems needing human review:")
    for item in review_queue:
        print(f"ID: {item['content_id']}")
        print(f"Text: {item['text_preview']}")
        print(f"Violations: {len(item['violations'])}")
print()

from google.api_core import exceptions

# --- Error handling around a moderate_text call ---
# BUGFIX: predefine response so the post-call check below cannot raise
# NameError/UnboundLocalError when the request itself failed.
response = None
try:
    response = client.moderate_text(
        request={"document": document},
        timeout=10.0
    )
except exceptions.InvalidArgument as e:
    print(f"Invalid document: {e}")
    # Common causes: empty document, unsupported content type
except exceptions.ResourceExhausted:
    print("API quota exceeded")
except exceptions.DeadlineExceeded:
    print("Request timed out")
except exceptions.GoogleAPIError as e:
    print(f"API error: {e}")

# Handle no moderation results — only inspect the response when the call
# actually succeeded.
if response is not None and not response.moderation_categories:
    print("No moderation categories returned - content may be too short")

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-google-cloud-language