Comprehensive Python SDK for AI application observability and experimentation with OpenTelemetry-based tracing, automatic instrumentation, and dataset management.
Specialized span types for different AI application components, each optimized for specific use cases with appropriate metadata and visualization in the Langfuse UI.
All observation types inherit common functionality from the base wrapper class.
class LangfuseObservationWrapper:
def end(self, *, end_time: int = None) -> "LangfuseObservationWrapper":
"""End the observation."""
def update(self, *, name: str = None, input: Any = None, output: Any = None,
metadata: Any = None, level: SpanLevel = None,
status_message: str = None, **kwargs) -> "LangfuseObservationWrapper":
"""Update observation attributes."""
def update_trace(self, *, name: str = None, user_id: str = None,
session_id: str = None, tags: List[str] = None,
**kwargs) -> "LangfuseObservationWrapper":
"""Update trace-level attributes."""
def score(self, *, name: str, value: Union[float, str],
data_type: ScoreDataType = None, comment: str = None) -> None:
"""Add score to this observation."""
def score_trace(self, *, name: str, value: Union[float, str],
data_type: ScoreDataType = None, comment: str = None) -> None:
"""Add score to the entire trace."""
# Attributes
trace_id: str
id: str

General-purpose span for tracing any operation. Use when no other specialized type fits your use case.
class LangfuseSpan(LangfuseObservationWrapper):
def start_span(self, name: str, *, input: Any = None, output: Any = None,
metadata: Any = None, **kwargs) -> "LangfuseSpan":
"""Create child span."""
def start_as_current_span(self, *, name: str, **kwargs) -> ContextManager["LangfuseSpan"]:
"""Create child span as context manager (deprecated)."""
def start_generation(self, *, name: str, **kwargs) -> "LangfuseGeneration":
"""Create child generation (deprecated)."""
def create_event(self, *, name: str, **kwargs) -> "LangfuseEvent":
"""Create event observation."""Usage Example:
# General operations
with langfuse.start_as_current_observation(name="data-processing", as_type="span") as span:
result = process_data()
span.update(output=result)

Specialized span for AI model generation operations with support for model metrics, token usage, and cost tracking.
class LangfuseGeneration(LangfuseObservationWrapper):
def update(self, *, completion_start_time: datetime = None, model: str = None,
model_parameters: Dict[str, Any] = None, usage_details: Dict[str, int] = None,
cost_details: Dict[str, float] = None, prompt: PromptClient = None,
**kwargs) -> "LangfuseGeneration":
"""Update generation with model-specific attributes.
Args:
completion_start_time: When model started generating response
model: Model name/identifier (e.g., "gpt-4", "claude-3")
model_parameters: Model parameters (temperature, max_tokens, etc.)
usage_details: Token usage (prompt_tokens, completion_tokens, etc.)
cost_details: Cost breakdown (input_cost, output_cost, total_cost)
prompt: Associated prompt template
"""Usage Example:
@observe(as_type="generation")
def call_openai(prompt):
response = openai.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}],
temperature=0.7
)
# Automatically captured by decorator or update manually:
# span.update(
# model="gpt-4",
# model_parameters={"temperature": 0.7},
# usage_details={
# "prompt_tokens": response.usage.prompt_tokens,
# "completion_tokens": response.usage.completion_tokens
# }
# )
return response.choices[0].message.content

Point-in-time event observations for discrete occurrences. Events are automatically ended and cannot be updated.
class LangfuseEvent(LangfuseObservationWrapper):
def update(self, **kwargs) -> "LangfuseEvent":
"""Update is not allowed for events. Logs warning and returns self."""Usage Example:
# Log discrete events
event = langfuse.create_event(
name="user-login",
input={"user_id": "123", "method": "oauth"},
metadata={"ip": "192.168.1.1"}
)

Observation for agent reasoning blocks that act on tools using LLM guidance. Use for autonomous agents and AI assistants.
class LangfuseAgent(LangfuseObservationWrapper):
"""Agent observation for reasoning blocks using LLM guidance."""Usage Example:
@observe(as_type="agent")
def autonomous_agent(task):
# Agent reasoning with tool usage
plan = create_plan(task)
for step in plan:
with langfuse.start_as_current_observation(name="tool-call", as_type="tool") as tool:
result = execute_tool(step)
tool.update(output=result)
return final_result

Observation for external tool calls such as API requests, database queries, or file operations.
class LangfuseTool(LangfuseObservationWrapper):
"""Tool observation for external tool calls."""Usage Example:
@observe(as_type="tool")
def call_weather_api(location):
response = requests.get(f"https://api.weather.com/v1/{location}")
return response.json()
@observe(as_type="tool")
def database_query(query):
with database.connection() as conn:
result = conn.execute(query)
return result.fetchall()

Observation for connecting LLM application steps, representing workflows or pipelines that pass context between stages.
class LangfuseChain(LangfuseObservationWrapper):
"""Chain observation for connecting application steps."""Usage Example:
@observe(as_type="chain")
def rag_pipeline(question):
# Multi-step RAG chain
with langfuse.start_as_current_observation(name="retrieve", as_type="retriever") as retriever:
documents = vector_search(question)
retriever.update(output=documents)
with langfuse.start_as_current_observation(name="generate", as_type="generation") as gen:
context = format_context(documents)
answer = llm.generate(f"Context: {context}\nQuestion: {question}")
gen.update(output=answer)
return answer

Observation for data retrieval operations such as vector database searches, document lookups, or knowledge base queries.
class LangfuseRetriever(LangfuseObservationWrapper):
"""Retriever observation for data retrieval operations."""Usage Example:
@observe(as_type="retriever")
def vector_search(query, top_k=5):
embedding = embed_query(query)
results = vector_db.search(embedding, top_k=top_k)
return [{"content": r.content, "score": r.score} for r in results]
@observe(as_type="retriever")
def knowledge_lookup(entity):
return knowledge_graph.get_facts(entity)

Specialized observation for embedding generation operations with support for model metrics like generation observations.
class LangfuseEmbedding(LangfuseObservationWrapper):
"""Embedding observation for embedding generation operations."""Usage Example:
@observe(as_type="embedding")
def generate_embeddings(texts):
response = openai.embeddings.create(
model="text-embedding-ada-002",
input=texts
)
return [embedding.embedding for embedding in response.data]

Observation for evaluation and assessment operations, measuring quality, correctness, or other metrics.
class LangfuseEvaluator(LangfuseObservationWrapper):
"""Evaluator observation for assessment operations."""Usage Example:
@observe(as_type="evaluator")
def relevance_evaluator(query, response):
# Evaluate response relevance
relevance_score = calculate_relevance(query, response)
return {"relevance": relevance_score, "threshold": 0.8}
@observe(as_type="evaluator")
def toxicity_checker(text):
toxicity_score = toxicity_model.predict(text)
return {"is_toxic": toxicity_score > 0.7, "score": toxicity_score}Observation for safety and security checks such as jailbreak prevention, content filtering, or policy enforcement.
class LangfuseGuardrail(LangfuseObservationWrapper):
"""Guardrail observation for safety/security checks."""Usage Example:
@observe(as_type="guardrail")
def content_filter(user_input):
# Check for inappropriate content
if contains_inappropriate_content(user_input):
return {"allowed": False, "reason": "inappropriate_content"}
return {"allowed": True}
@observe(as_type="guardrail")
def jailbreak_detector(prompt):
jailbreak_score = jailbreak_model.predict(prompt)
return {
"is_jailbreak": jailbreak_score > 0.8,
"score": jailbreak_score,
"blocked": jailbreak_score > 0.8
}@observe(as_type="chain")
def complete_workflow():
with langfuse.start_as_current_observation(name="safety-check", as_type="guardrail") as guard:
safety_result = check_safety()
guard.update(output=safety_result)
if safety_result["allowed"]:
with langfuse.start_as_current_observation(name="retrieve", as_type="retriever") as ret:
context = retrieve_context()
ret.update(output=context)
with langfuse.start_as_current_observation(name="generate", as_type="generation") as gen:
response = generate_response(context)
gen.update(output=response)
return response
else:
return "Request blocked by safety filter"@observe(as_type="tool")
def external_api_call():
try:
result = make_api_request()
return result
except APIError as e:
# Error automatically captured with ERROR level
raise

Install with Tessl CLI
npx tessl i tessl/pypi-langfuse