Process mining library for discovering, analyzing and visualizing business processes from event data
—
Comprehensive statistical analysis functions and advanced analytical operations for process behavior, performance metrics, model analysis, and process intelligence. PM4PY provides both descriptive statistics and advanced analytical capabilities.
Fundamental statistical functions for extracting basic information from event logs.
def get_start_activities(log, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name'):
"""
Get start activities and their frequencies across all cases.
Parameters:
- log (Union[EventLog, pd.DataFrame]): Event log data
- activity_key (str): Activity attribute name
- timestamp_key (str): Timestamp attribute name
- case_id_key (str): Case ID attribute name
Returns:
Dict[str, int]: Start activities with their frequencies
"""
def get_end_activities(log, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name'):
"""
Get end activities and their frequencies across all cases.
Parameters:
- log (Union[EventLog, pd.DataFrame]): Event log data
- activity_key (str): Activity attribute name
- timestamp_key (str): Timestamp attribute name
- case_id_key (str): Case ID attribute name
Returns:
Dict[str, int]: End activities with their frequencies
"""
def get_event_attributes(log):
"""
Get list of all event attribute names in the log.
Parameters:
- log (Union[EventLog, pd.DataFrame]): Event log data
Returns:
List[str]: List of event attribute names
"""
def get_event_attribute_values(log, attribute_key):
"""
Get all unique values for a specific event attribute.
Parameters:
- log (Union[EventLog, pd.DataFrame]): Event log data
- attribute_key (str): Attribute name to extract values for
Returns:
List[Any]: Unique values of the specified attribute
"""
def get_trace_attributes(log):
"""
Get list of all trace (case-level) attribute names.
Parameters:
- log (Union[EventLog, pd.DataFrame]): Event log data
Returns:
List[str]: List of trace attribute names
"""
def get_trace_attribute_values(log, attribute_key):
"""
Get all unique values for a specific trace attribute.
Parameters:
- log (Union[EventLog, pd.DataFrame]): Event log data
- attribute_key (str): Trace attribute name
Returns:
List[Any]: Unique values of the specified trace attribute
"""Analyze process variants (unique activity sequences) and their characteristics.
def get_variants(log, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name'):
"""
Get trace variants with their corresponding case IDs.
Parameters:
- log (Union[EventLog, pd.DataFrame]): Event log data
- activity_key (str): Activity attribute name
- timestamp_key (str): Timestamp attribute name
- case_id_key (str): Case ID attribute name
Returns:
Dict[Tuple[str, ...], List[str]]: Variants mapped to list of case IDs
"""
def get_variants_as_tuples(log, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name'):
"""
Get variants as tuples with their frequencies.
Parameters:
- log (Union[EventLog, pd.DataFrame]): Event log data
- activity_key (str): Activity attribute name
- timestamp_key (str): Timestamp attribute name
- case_id_key (str): Case ID attribute name
Returns:
Dict[Tuple[str, ...], int]: Variants with their frequencies
"""
def split_by_process_variant(log, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name'):
"""
Split log into separate logs by process variant.
Parameters:
- log (Union[EventLog, pd.DataFrame]): Event log data
- activity_key (str): Activity attribute name
- timestamp_key (str): Timestamp attribute name
- case_id_key (str): Case ID attribute name
Returns:
Dict[Tuple[str, ...], Union[EventLog, pd.DataFrame]]: Variants mapped to their sub-logs
"""Analyze temporal patterns including case durations, arrival rates, and performance metrics.
def get_case_arrival_average(log, timestamp_key='time:timestamp', case_id_key='case:concept:name'):
"""
Calculate the average case inter-arrival time (mean gap between consecutive case start times).
Parameters:
- log (Union[EventLog, pd.DataFrame]): Event log data
- timestamp_key (str): Timestamp attribute name
- case_id_key (str): Case ID attribute name
Returns:
float: Average time between two consecutive case arrivals, in seconds
"""
def get_all_case_durations(log, timestamp_key='time:timestamp', case_id_key='case:concept:name'):
"""
Get durations of all cases in the log.
Parameters:
- log (Union[EventLog, pd.DataFrame]): Event log data
- timestamp_key (str): Timestamp attribute name
- case_id_key (str): Case ID attribute name
Returns:
List[float]: List of case durations in seconds
"""
def get_case_duration(log, timestamp_key='time:timestamp', case_id_key='case:concept:name'):
"""
Calculate average case duration across all cases.
Parameters:
- log (Union[EventLog, pd.DataFrame]): Event log data
- timestamp_key (str): Timestamp attribute name
- case_id_key (str): Case ID attribute name
Returns:
float: Average case duration in seconds
"""
def get_cycle_time(log, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name'):
"""
Calculate cycle time of the process (end-to-end duration).
Parameters:
- log (Union[EventLog, pd.DataFrame]): Event log data
- activity_key (str): Activity attribute name
- timestamp_key (str): Timestamp attribute name
- case_id_key (str): Case ID attribute name
Returns:
float: Average cycle time in seconds
"""
def get_service_time(log, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name'):
"""
Calculate service time for each activity.
Parameters:
- log (Union[EventLog, pd.DataFrame]): Event log data
- activity_key (str): Activity attribute name
- timestamp_key (str): Timestamp attribute name
- case_id_key (str): Case ID attribute name
Returns:
Dict[str, float]: Service times per activity in seconds
"""
def get_variants_paths_duration(log, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name'):
"""
Get durations for each variant path.
Parameters:
- log (Union[EventLog, pd.DataFrame]): Event log data
- activity_key (str): Activity attribute name
- timestamp_key (str): Timestamp attribute name
- case_id_key (str): Case ID attribute name
Returns:
Dict[Tuple[str, ...], List[float]]: Durations per variant
"""Complex statistical analysis including loops, segments, and behavioral patterns.
def get_minimum_self_distances(log, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name'):
"""
Calculate minimum self-distances for activities (loop detection).
Parameters:
- log (Union[EventLog, pd.DataFrame]): Event log data
- activity_key (str): Activity attribute name
- timestamp_key (str): Timestamp attribute name
- case_id_key (str): Case ID attribute name
Returns:
Dict[str, int]: Minimum self-distances per activity
"""
def get_minimum_self_distance_witnesses(log, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name'):
"""
Get witness traces for minimum self-distances.
Parameters:
- log (Union[EventLog, pd.DataFrame]): Event log data
- activity_key (str): Activity attribute name
- timestamp_key (str): Timestamp attribute name
- case_id_key (str): Case ID attribute name
Returns:
Dict[str, List[str]]: Witness cases per activity
"""
def get_frequent_trace_segments(log, min_length=2, max_length=5, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name'):
"""
Extract frequent trace segments of specified lengths.
Parameters:
- log (Union[EventLog, pd.DataFrame]): Event log data
- min_length (int): Minimum segment length
- max_length (int): Maximum segment length
- activity_key (str): Activity attribute name
- timestamp_key (str): Timestamp attribute name
- case_id_key (str): Case ID attribute name
Returns:
Dict[Tuple[str, ...], int]: Frequent segments with frequencies
"""
def get_rework_cases_per_activity(log, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name'):
"""
Get count of cases with rework per activity.
Parameters:
- log (Union[EventLog, pd.DataFrame]): Event log data
- activity_key (str): Activity attribute name
- timestamp_key (str): Timestamp attribute name
- case_id_key (str): Case ID attribute name
Returns:
Dict[str, int]: Rework cases count per activity
"""
def get_case_overlap(log, timestamp_key='time:timestamp', case_id_key='case:concept:name'):
"""
Calculate case overlap measure (parallel case execution).
Parameters:
- log (Union[EventLog, pd.DataFrame]): Event log data
- timestamp_key (str): Timestamp attribute name
- case_id_key (str): Case ID attribute name
Returns:
float: Case overlap ratio
"""
def get_activity_position_summary(log, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name'):
"""
Get position summary statistics for each activity.
Parameters:
- log (Union[EventLog, pd.DataFrame]): Event log data
- activity_key (str): Activity attribute name
- timestamp_key (str): Timestamp attribute name
- case_id_key (str): Case ID attribute name
Returns:
Dict[str, Dict[str, Any]]: Position statistics per activity
"""Generate probabilistic representations of process behavior.
def get_stochastic_language(log, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name'):
"""
Generate stochastic language from log or model.
Creates probabilistic representation of process behavior.
Parameters:
- log (Union[EventLog, pd.DataFrame]): Event log data
- activity_key (str): Activity attribute name
- timestamp_key (str): Timestamp attribute name
- case_id_key (str): Case ID attribute name
Returns:
Dict[Tuple[str, ...], float]: Traces (as tuples of activity labels) mapped to their probabilities
"""Advanced analytical functions for process model evaluation and manipulation.
def check_soundness(petri_net, initial_marking, final_marking):
"""
Check if Petri net is sound (proper termination, no deadlocks).
Parameters:
- petri_net (PetriNet): Petri net model
- initial_marking (Marking): Initial marking
- final_marking (Marking): Final marking
Returns:
bool: True if the Petri net is sound
"""
def check_is_workflow_net(petri_net):
"""
Check if Petri net is a workflow net (single source, single sink).
Parameters:
- petri_net (PetriNet): Petri net model
Returns:
bool: True if it's a workflow net
"""
def simplicity_petri_net(petri_net):
"""
Calculate simplicity metric of Petri net.
Parameters:
- petri_net (PetriNet): Petri net model
Returns:
float: Simplicity value between 0 and 1
"""Mathematical analysis functions for process models and languages.
def compute_emd(language1, language2):
"""
Compute Earth Mover Distance between two stochastic languages.
Parameters:
- language1 (Dict): First stochastic language
- language2 (Dict): Second stochastic language
Returns:
float: Earth Mover Distance value
"""
def solve_marking_equation(petri_net, initial_marking, final_marking, cost_function=None):
"""
Solve marking equation for Petri net reachability.
Parameters:
- petri_net (PetriNet): Petri net model
- initial_marking (Marking): Initial marking
- final_marking (Marking): Target marking
- cost_function (Optional[Callable]): Cost function for optimization
Returns:
float: Solution cost or distance
"""
def solve_extended_marking_equation(petri_net, initial_marking, final_marking, **kwargs):
"""
Solve extended marking equation with additional constraints.
Parameters:
- petri_net (PetriNet): Petri net model
- initial_marking (Marking): Initial marking
- final_marking (Marking): Target marking
- **kwargs: Additional parameters and constraints
Returns:
Dict[str, Any]: Solution with detailed information
"""Calculate similarity between models, logs, and process representations.
def behavioral_similarity(model1, model2, **kwargs):
"""
Calculate behavioral similarity between two process models.
Parameters:
- model1 (Any): First process model
- model2 (Any): Second process model
- **kwargs: Similarity computation parameters
Returns:
float: Behavioral similarity score (0-1)
"""
def structural_similarity(model1, model2, **kwargs):
"""
Calculate structural similarity between two process models.
Parameters:
- model1 (Any): First process model
- model2 (Any): Second process model
- **kwargs: Similarity computation parameters
Returns:
float: Structural similarity score (0-1)
"""
def embeddings_similarity(log1, log2, **kwargs):
"""
Calculate embeddings-based similarity between event logs.
Parameters:
- log1 (Union[EventLog, pd.DataFrame]): First event log
- log2 (Union[EventLog, pd.DataFrame]): Second event log
- **kwargs: Embedding parameters
Returns:
float: Embeddings similarity score (0-1)
"""
def label_sets_similarity(model1, model2, **kwargs):
"""
Calculate label set similarity between models.
Parameters:
- model1 (Any): First process model
- model2 (Any): Second process model
- **kwargs: Similarity parameters
Returns:
float: Label set similarity score (0-1)
"""Utility functions for model manipulation and analysis.
def get_enabled_transitions(petri_net, marking):
"""
Get list of transitions enabled in specific marking.
Parameters:
- petri_net (PetriNet): Petri net model
- marking (Marking): Current marking
Returns:
List[PetriNet.Transition]: List of enabled transitions
"""
def get_activity_labels(model):
"""
Get set of activity labels from process model.
Parameters:
- model (Any): Process model (Petri net, process tree, etc.)
Returns:
Set[str]: Set of activity labels
"""
def replace_activity_labels(model, replacement_dict):
"""
Replace activity labels in process model.
Parameters:
- model (Any): Process model to modify
- replacement_dict (Dict[str, str]): Label replacement mapping
Returns:
Any: Modified process model
"""
def map_labels_from_second_model(model1, model2):
"""
Create label mapping between two models.
Parameters:
- model1 (Any): First process model
- model2 (Any): Second process model
Returns:
Dict[str, str]: Label mapping from model1 to model2
"""import pm4py
# Load event log
log = pm4py.read_xes('event_log.xes')
# Basic statistics
start_activities = pm4py.get_start_activities(log)
end_activities = pm4py.get_end_activities(log)
print("Start Activities:")
for activity, count in sorted(start_activities.items(), key=lambda x: x[1], reverse=True):
print(f" {activity}: {count}")
print("End Activities:")
for activity, count in sorted(end_activities.items(), key=lambda x: x[1], reverse=True):
print(f" {activity}: {count}")
# Attribute analysis
event_attributes = pm4py.get_event_attributes(log)
trace_attributes = pm4py.get_trace_attributes(log)
print(f"Event attributes: {event_attributes}")
print(f"Trace attributes: {trace_attributes}")import pm4py
# Get variants with frequencies
variants = pm4py.get_variants_as_tuples(log)
print(f"Total variants: {len(variants)}")
print("Top 10 variants:")
for variant, count in sorted(variants.items(), key=lambda x: x[1], reverse=True)[:10]:
print(f" {' -> '.join(variant)}: {count} cases")
# Split log by variants
variant_logs = pm4py.split_by_process_variant(log)
print("Variant analysis:")
for variant, sub_log in variant_logs.items():
case_count = len(sub_log)
avg_duration = pm4py.get_case_duration(sub_log)
print(f" Variant with {len(variant)} steps: {case_count} cases, avg duration: {avg_duration/3600:.1f} hours")import pm4py
# Case duration analysis
all_durations = pm4py.get_all_case_durations(log)
avg_duration = pm4py.get_case_duration(log)
cycle_time = pm4py.get_cycle_time(log)
print(f"Case Duration Statistics:")
print(f" Average: {avg_duration/3600:.1f} hours")
print(f" Cycle time: {cycle_time/3600:.1f} hours")
print(f" Min: {min(all_durations)/3600:.1f} hours")
print(f" Max: {max(all_durations)/3600:.1f} hours")
# Arrival analysis: get_case_arrival_average returns the mean time (in seconds) between consecutive case arrivals
arrival_avg = pm4py.get_case_arrival_average(log)
print(f" Avg inter-arrival time: {arrival_avg/60:.1f} minutes (~{3600/arrival_avg:.1f} cases/hour)")
# Service time analysis
service_times = pm4py.get_service_time(log)
print("Service Times:")
for activity, time in sorted(service_times.items(), key=lambda x: x[1], reverse=True):
print(f" {activity}: {time/60:.1f} minutes")import pm4py
# Loop analysis
self_distances = pm4py.get_minimum_self_distances(log)
witnesses = pm4py.get_minimum_self_distance_witnesses(log)
print("Loop Analysis:")
for activity, distance in self_distances.items():
if distance > 1: # Activity can loop
print(f" {activity}: min distance {distance} (witness cases: {len(witnesses[activity])})")
# Rework analysis
rework_cases = pm4py.get_rework_cases_per_activity(log)
total_cases = len(set(log['case:concept:name']) if isinstance(log, pd.DataFrame) else log)
print("Rework Analysis:")
for activity, rework_count in rework_cases.items():
rework_percentage = (rework_count / total_cases) * 100
print(f" {activity}: {rework_count} cases ({rework_percentage:.1f}%)")
# Case overlap analysis
overlap = pm4py.get_case_overlap(log)
print(f"Case Overlap: {overlap:.3f}")
# Activity position analysis
position_summary = pm4py.get_activity_position_summary(log)
print("Activity Position Summary:")
for activity, stats in position_summary.items():
print(f" {activity}:")
print(f" Avg position: {stats['mean_position']:.1f}")
print(f" Position range: {stats['min_position']} - {stats['max_position']}")import pm4py
# Find frequent trace segments
frequent_segments = pm4py.get_frequent_trace_segments(
log,
min_length=2,
max_length=4
)
print("Frequent Trace Segments:")
for segment, frequency in sorted(frequent_segments.items(), key=lambda x: x[1], reverse=True)[:20]:
print(f" {' -> '.join(segment)}: {frequency} occurrences")
# Variant duration analysis
variant_durations = pm4py.get_variants_paths_duration(log)
print("Variant Performance Analysis:")
for variant, durations in variant_durations.items():
if len(durations) >= 5: # Only variants with sufficient data
avg_duration = sum(durations) / len(durations)
print(f" {' -> '.join(variant[:3])}: {avg_duration/3600:.1f}h avg ({len(durations)} cases)")import pm4py
# Discover model
net, initial_marking, final_marking = pm4py.discover_petri_net_inductive(log)
# Check model properties
is_sound = pm4py.check_soundness(net, initial_marking, final_marking)
is_workflow = pm4py.check_is_workflow_net(net)
simplicity = pm4py.simplicity_petri_net(net)
print("Model Quality Assessment:")
print(f" Sound: {is_sound}")
print(f" Workflow net: {is_workflow}")
print(f" Simplicity: {simplicity:.3f}")
# Get model labels
activity_labels = pm4py.get_activity_labels(net)
print(f" Activities in model: {len(activity_labels)}")
print(f" Activity labels: {sorted(activity_labels)}")
# Check enabled transitions in initial marking
enabled = pm4py.get_enabled_transitions(net, initial_marking)
print(f" Initially enabled transitions: {len(enabled)}")import pm4py
# Generate stochastic language
stochastic_lang = pm4py.get_stochastic_language(log)
print("Stochastic Language Analysis:")
print(f" Unique traces: {len(stochastic_lang)}")
print(f" Most probable traces:")
# Show top traces by probability
sorted_traces = sorted(stochastic_lang.items(), key=lambda x: x[1], reverse=True)[:10]
for trace, prob in sorted_traces:
trace_str = ' -> '.join(trace[:5]) # Limit length for display
if len(trace) > 5:
trace_str += "..."
print(f" {trace_str}: {prob:.4f}")
# Calculate entropy
import math
entropy = -sum(p * math.log2(p) for p in stochastic_lang.values() if p > 0)
print(f" Process entropy: {entropy:.3f} bits")import pm4py
# Discover two different models
net1, im1, fm1 = pm4py.discover_petri_net_inductive(log)
net2, im2, fm2 = pm4py.discover_petri_net_heuristics(log)
# Calculate similarities
behavioral_sim = pm4py.behavioral_similarity(net1, net2)
structural_sim = pm4py.structural_similarity(net1, net2)
label_sim = pm4py.label_sets_similarity(net1, net2)
print("Model Similarity Analysis:")
print(f" Behavioral similarity: {behavioral_sim:.3f}")
print(f" Structural similarity: {structural_sim:.3f}")
print(f" Label set similarity: {label_sim:.3f}")
# Create label mapping
label_mapping = pm4py.map_labels_from_second_model(net1, net2)
print(f" Common labels: {len(label_mapping)}")
# Compare model languages
lang1 = pm4py.get_stochastic_language(log) # Would use model if available
lang2 = pm4py.get_stochastic_language(log) # Would use different model
# emd_distance = pm4py.compute_emd(lang1, lang2)
# print(f" Earth Mover Distance: {emd_distance:.3f}")import pm4py
def comprehensive_process_analysis(log):
"""Generate comprehensive process analysis report."""
print("=" * 60)
print("COMPREHENSIVE PROCESS ANALYSIS REPORT")
print("=" * 60)
# Basic statistics
total_cases = len(set(log['case:concept:name']) if isinstance(log, pd.DataFrame) else log)
total_events = len(log)
print(f"Dataset Overview:")
print(f" Cases: {total_cases:,}")
print(f" Events: {total_events:,}")
print(f" Events per case: {total_events/total_cases:.1f}")
# Temporal analysis
durations = pm4py.get_all_case_durations(log)
avg_duration = sum(durations) / len(durations)
print(f"\nTemporal Analysis:")
print(f" Average case duration: {avg_duration/3600:.1f} hours")
print(f" Shortest case: {min(durations)/60:.1f} minutes")
print(f" Longest case: {max(durations)/3600:.1f} hours")
# Variant analysis
variants = pm4py.get_variants_as_tuples(log)
variant_coverage = sum(sorted(variants.values(), reverse=True)[:10]) / total_cases
print(f"\nVariant Analysis:")
print(f" Total variants: {len(variants)}")
print(f" Top 10 variants cover: {variant_coverage:.1%} of cases")
# Behavioral patterns
rework = pm4py.get_rework_cases_per_activity(log)
# Note: summing per-activity counts tallies activity-level rework occurrences; a case that
# reworks several activities is counted once per activity, so the ratio can exceed 100%.
total_rework = sum(rework.values())
print(f"\nBehavioral Patterns:")
print(f" Rework occurrences (activity-level): {total_rework} ({total_rework/total_cases:.1%} of cases)")
# Process model quality
net, im, fm = pm4py.discover_petri_net_inductive(log)
fitness = pm4py.fitness_alignments(log, net, im, fm)
precision = pm4py.precision_alignments(log, net, im, fm)
print(f"\nProcess Model Quality:")
print(f" Fitness: {fitness['log_fitness']:.3f}")
print(f" Precision: {precision:.3f}")
print(f" Soundness: {pm4py.check_soundness(net, im, fm)}")
return {
'cases': total_cases,
'events': total_events,
'avg_duration': avg_duration,
'variants': len(variants),
'rework_rate': total_rework/total_cases,
'fitness': fitness['log_fitness'],
'precision': precision
}
# Run comprehensive analysis
analysis_results = comprehensive_process_analysis(log)

Install with the Tessl CLI:
npx tessl i tessl/pypi-pm4py