Process mining library for discovering, analyzing and visualizing business processes from event data
—
Utility functions for data manipulation, format conversion, and model transformation between different representations. PM4PY provides comprehensive tools for data preprocessing, serialization, and interoperability between various process mining formats and tools.
Core utilities for formatting and preparing data for process mining analysis.
def format_dataframe(df, case_id='case:concept:name', activity_key='concept:name', timestamp_key='time:timestamp', start_timestamp_key='start_timestamp', timest_format=None):
"""
Format DataFrame for process mining with proper column names and data types.
Parameters:
- df (pd.DataFrame): Input DataFrame to format
- case_id (str): Column name for case identifier
- activity_key (str): Column name for activity
- timestamp_key (str): Column name for timestamp
- start_timestamp_key (str): Column name for start timestamp (optional)
- timest_format (Optional[str]): Timestamp format string
Returns:
pd.DataFrame: Formatted DataFrame ready for process mining
"""
def get_properties(log, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name', resource_key='org:resource', group_key=None, start_timestamp_key=None, **kwargs):
"""
Retrieve properties from a log object.
Parameters:
- log (Union[EventLog, pd.DataFrame]): Event log to extract properties from
- activity_key (str): Column name for activity attribute
- timestamp_key (str): Column name for timestamp attribute
- case_id_key (str): Column name for case identifier attribute
- resource_key (str): Column name for resource attribute
- group_key (Optional[str]): Optional column name for group identifier
- start_timestamp_key (Optional[str]): Optional column name for start timestamp
- **kwargs: Additional keyword arguments
Returns:
dict: Dictionary of properties extracted from the log
"""Parse model representations from strings and serialize/deserialize PM4PY objects.
def parse_process_tree(process_tree_string):
"""
Parse process tree from string representation.
Parameters:
- process_tree_string (str): String representation of process tree
Returns:
ProcessTree: Parsed process tree object
"""
def parse_powl_model_string(powl_string):
"""
Parse POWL model from string representation.
Parameters:
- powl_string (str): String representation of POWL model
Returns:
POWL: Parsed POWL model object
"""
def parse_event_log_string(event_log_string):
"""
Parse event log from string representation.
Parameters:
- event_log_string (str): String representation of event log
Returns:
EventLog: Parsed event log object
"""
def serialize(obj, file_path):
"""
Serialize PM4PY object to file for persistence.
Parameters:
- obj (Any): PM4PY object to serialize
- file_path (str): Path to save serialized object
Returns:
None
"""
def deserialize(file_path):
"""
Deserialize PM4PY object from file.
Parameters:
- file_path (str): Path to serialized object file
Returns:
Any: Deserialized PM4PY object
"""Functions for modifying and manipulating event log data.
def set_classifier(log, classifier_key):
"""
Set event classifier for log (changes the activity attribute used).
Parameters:
- log (Union[EventLog, pd.DataFrame]): Event log to modify
- classifier_key (str): New classifier attribute name
Returns:
Union[EventLog, pd.DataFrame]: Log with updated classifier
"""
def project_on_event_attribute(log, attribute_key):
"""
Project log on specific event attribute (filter events by attribute presence).
Parameters:
- log (Union[EventLog, pd.DataFrame]): Event log data
- attribute_key (str): Attribute to project on
Returns:
Union[EventLog, pd.DataFrame]: Projected event log
"""
def sample_cases(log, n_cases):
"""
Sample n random cases from event log.
Parameters:
- log (Union[EventLog, pd.DataFrame]): Event log data
- n_cases (int): Number of cases to sample
Returns:
Union[EventLog, pd.DataFrame]: Sampled event log
"""
def sample_events(log, n_events):
"""
Sample n random events from event log.
Parameters:
- log (Union[EventLog, pd.DataFrame]): Event log data
- n_events (int): Number of events to sample
Returns:
Union[EventLog, pd.DataFrame]: Sampled event log
"""
def rebase(log, case_id_key='case:concept:name'):
"""
Rebase log timestamps to start from zero (normalize temporal data).
Parameters:
- log (Union[EventLog, pd.DataFrame]): Event log data
- case_id_key (str): Case ID attribute name
Returns:
Union[EventLog, pd.DataFrame]: Rebased event log
"""Convert between different event log representations and formats.
def convert_to_event_log(obj, case_id_key='case:concept:name', **kwargs):
"""
Convert DataFrame or EventStream to EventLog object.
Parameters:
- obj (Union[pd.DataFrame, EventStream]): Object to convert
- case_id_key (str): Case identifier column
- **kwargs: Additional conversion parameters
Returns:
EventLog: Converted event log object
"""
def convert_to_event_stream(obj, case_id_key='case:concept:name', **kwargs):
"""
Convert log or DataFrame to EventStream format.
Parameters:
- obj (Union[EventLog, pd.DataFrame]): Object to convert
- case_id_key (str): Case identifier column
- **kwargs: Additional conversion parameters
Returns:
EventStream: Converted event stream object
"""
def convert_to_dataframe(obj, case_id_key='case:concept:name', **kwargs):
"""
Convert log objects to pandas DataFrame.
Parameters:
- obj (Union[EventLog, EventStream]): Object to convert
- case_id_key (str): Case identifier column
- **kwargs: Additional conversion parameters
Returns:
pd.DataFrame: Converted DataFrame
"""Convert between different process model representations.
def convert_to_bpmn(*args, **kwargs):
"""
Convert various models (Petri net, process tree) to BPMN format.
Parameters:
- *args: Model objects to convert
- **kwargs: Conversion parameters
Returns:
BPMN: Converted BPMN model
"""
def convert_to_petri_net(*args, **kwargs):
"""
Convert various models (process tree, BPMN, DFG) to Petri net.
Parameters:
- *args: Model objects to convert
- **kwargs: Conversion parameters
Returns:
Tuple[PetriNet, Marking, Marking]: Converted Petri net with markings
"""
def convert_to_process_tree(*args, **kwargs):
"""
Convert various models (Petri net, BPMN) to process tree.
Parameters:
- *args: Model objects to convert
- **kwargs: Conversion parameters
Returns:
ProcessTree: Converted process tree
"""
def convert_to_reachability_graph(petri_net, initial_marking, **kwargs):
"""
Convert Petri net to reachability graph (state space exploration).
Parameters:
- petri_net (PetriNet): Petri net model
- initial_marking (Marking): Initial marking
- **kwargs: Conversion parameters
Returns:
TransitionSystem: Reachability graph as transition system
"""
def convert_to_powl(*args, **kwargs):
"""
Convert various models to POWL (Partially Ordered Workflow Language).
Parameters:
- *args: Model objects to convert
- **kwargs: Conversion parameters
Returns:
POWL: Converted POWL model
"""Convert process mining objects to NetworkX graphs for network analysis.
def convert_log_to_networkx(log, **kwargs):
"""
Convert event log to NetworkX directed graph.
Parameters:
- log (Union[EventLog, pd.DataFrame]): Event log data
- **kwargs: Graph construction parameters
Returns:
nx.DiGraph: NetworkX directed graph representation
"""
def convert_ocel_to_networkx(ocel, **kwargs):
"""
Convert Object-Centric Event Log to NetworkX graph.
Parameters:
- ocel (OCEL): Object-centric event log
- **kwargs: Graph construction parameters
Returns:
nx.DiGraph: NetworkX directed graph representation
"""
def convert_petri_net_to_networkx(petri_net, initial_marking, **kwargs):
"""
Convert Petri net to NetworkX graph representation.
Parameters:
- petri_net (PetriNet): Petri net model
- initial_marking (Marking): Initial marking
- **kwargs: Graph construction parameters
Returns:
nx.DiGraph: NetworkX directed graph representation
"""Specialized conversion functions for specific use cases.
def convert_log_to_ocel(log, **kwargs):
"""
Convert traditional event log to Object-Centric Event Log.
Parameters:
- log (Union[EventLog, pd.DataFrame]): Traditional event log
- **kwargs: OCEL conversion parameters
Returns:
OCEL: Converted object-centric event log
"""
def convert_log_to_time_intervals(log, **kwargs):
"""
Convert event log to time interval representation.
Parameters:
- log (Union[EventLog, pd.DataFrame]): Event log data
- **kwargs: Interval conversion parameters
Returns:
pd.DataFrame: Time intervals DataFrame
"""
def convert_petri_net_type(petri_net, target_type, **kwargs):
"""
Convert between different Petri net types and representations.
Parameters:
- petri_net (PetriNet): Source Petri net
- target_type (str): Target Petri net type
- **kwargs: Conversion parameters
Returns:
PetriNet: Converted Petri net
"""import pm4py
import pandas as pd
# Load raw data
raw_data = pd.read_csv('process_data.csv')
# Format for process mining
formatted_log = pm4py.format_dataframe(
raw_data,
case_id='CaseID',
activity_key='Activity',
timestamp_key='Timestamp'
)
print("Formatted log ready for process mining")
print(f"Cases: {formatted_log['case:concept:name'].nunique()}")
print(f"Events: {len(formatted_log)}")
print(f"Activities: {formatted_log['concept:name'].nunique()}")
# Rebase timestamps to start from zero
rebased_log = pm4py.rebase(formatted_log)
print("Timestamps rebased to start from zero")import pm4py
# Load full event log
log = pm4py.read_xes('large_event_log.xes')
print(f"Original log: {len(log)} cases")
# Sample subset for analysis
sample_log = pm4py.sample_cases(log, 1000)
print(f"Sampled log: {len(sample_log)} cases")
# Sample specific number of events
event_sample = pm4py.sample_events(log, 10000)
print(f"Event sample: {len(event_sample)} events")
# Change classifier
classified_log = pm4py.set_classifier(log, 'org:resource')
print("Changed classifier to resource")
# Project on specific attribute
projected_log = pm4py.project_on_event_attribute(log, 'lifecycle:transition')
print("Projected on lifecycle transition")import pm4py
# Discover process tree
log = pm4py.read_xes('event_log.xes')
tree = pm4py.discover_process_tree_inductive(log)
# Convert to different model formats
net, initial_marking, final_marking = pm4py.convert_to_petri_net(tree)
print("Converted process tree to Petri net")
bpmn_model = pm4py.convert_to_bpmn(tree)
print("Converted process tree to BPMN")
powl_model = pm4py.convert_to_powl(tree)
print("Converted process tree to POWL")
# Convert Petri net to reachability graph
reachability = pm4py.convert_to_reachability_graph(net, initial_marking)
print("Converted Petri net to reachability graph")import pm4py
# Convert between different log formats
log = pm4py.read_xes('event_log.xes')
# Convert to DataFrame
df = pm4py.convert_to_dataframe(log)
print(f"Converted to DataFrame: {len(df)} rows")
# Convert DataFrame back to EventLog
event_log = pm4py.convert_to_event_log(df)
print("Converted DataFrame back to EventLog")
# Convert to EventStream
event_stream = pm4py.convert_to_event_stream(log)
print("Converted to EventStream")
# Convert traditional log to OCEL
ocel = pm4py.convert_log_to_ocel(
log,
object_type_column='case:concept:name',
object_type_name='Case'
)
print("Converted traditional log to OCEL")import pm4py
import networkx as nx
# Convert log to NetworkX graph
log = pm4py.read_xes('event_log.xes')
G = pm4py.convert_log_to_networkx(log)
print(f"NetworkX graph: {G.number_of_nodes()} nodes, {G.number_of_edges()} edges")
# Analyze graph properties
print(f"Graph density: {nx.density(G):.3f}")
print(f"Strongly connected: {nx.is_strongly_connected(G)}")
# Convert Petri net to NetworkX
net, im, fm = pm4py.discover_petri_net_inductive(log)
net_graph = pm4py.convert_petri_net_to_networkx(net, im)
print(f"Petri net graph: {net_graph.number_of_nodes()} nodes")
# Convert OCEL to NetworkX
ocel = pm4py.read_ocel('ocel_data.csv')
ocel_graph = pm4py.convert_ocel_to_networkx(ocel)
print(f"OCEL graph: {ocel_graph.number_of_nodes()} nodes")import pm4py
# Parse models from string representations
tree_string = "->('A', +('B', 'C'), 'D')"
tree = pm4py.parse_process_tree(tree_string)
print("Parsed process tree from string")
# Serialize model for later use
pm4py.serialize(tree, 'process_tree.pkl')
print("Serialized process tree")
# Deserialize model
loaded_tree = pm4py.deserialize('process_tree.pkl')
print("Deserialized process tree")
# Parse event log from string
log_string = """
Case1: A, B, C
Case2: A, C, B
Case3: A, B, B, C
"""
parsed_log = pm4py.parse_event_log_string(log_string)
print("Parsed event log from string")import pm4py
# Convert log to time intervals
log = pm4py.read_xes('event_log.xes')
intervals = pm4py.convert_log_to_time_intervals(log)
print("Time Intervals Analysis:")
print(f"Intervals: {len(intervals)}")
print(intervals[['case_id', 'activity', 'start_time', 'end_time', 'duration']].head())
# Analyze interval patterns
avg_duration = intervals['duration'].mean()
max_duration = intervals['duration'].max()
print(f"Average interval duration: {avg_duration/60:.1f} minutes")
print(f"Maximum interval duration: {max_duration/3600:.1f} hours")import pm4py
import os
def convert_process_models(input_dir, output_dir):
    """Convert every process tree (.ptml) in a directory to PNML, BPMN, and pickle formats.

    Parameters:
    - input_dir (str): Directory containing .ptml process tree files
    - output_dir (str): Directory where converted models are written (created if missing)

    Returns:
    None
    """
    os.makedirs(output_dir, exist_ok=True)
    for filename in os.listdir(input_dir):
        if filename.endswith('.ptml'):
            filepath = os.path.join(input_dir, filename)
            base_name = filename[:-5]  # Remove .ptml extension
            # Read process tree
            tree = pm4py.read_ptml(filepath)
            # Convert to different formats
            net, im, fm = pm4py.convert_to_petri_net(tree)
            bpmn = pm4py.convert_to_bpmn(tree)
            # Save in multiple formats
            pm4py.write_pnml(net, im, fm, os.path.join(output_dir, f"{base_name}.pnml"))
            pm4py.write_bpmn(bpmn, os.path.join(output_dir, f"{base_name}.bpmn"))
            # Serialize for PM4PY
            pm4py.serialize(tree, os.path.join(output_dir, f"{base_name}.pkl"))
            # Bug fix: original printed the literal "(unknown)" — interpolate the actual filename
            print(f"Converted {filename} to multiple formats")
# Run batch conversion
convert_process_models('input_models/', 'output_models/')

import pm4py
import pandas as pd
def enhance_log_quality(raw_log):
    """Comprehensive data quality enhancement pipeline.

    Formats the input, removes incomplete cases and duration outliers, keeps
    the dominant variants, rebases timestamps, and down-samples very large logs.

    Parameters:
    - raw_log (Union[pd.DataFrame, EventLog]): Raw event data to clean

    Returns:
    Union[pd.DataFrame, EventLog]: Cleaned and normalized event log
    """
    print("Starting data quality enhancement...")
    # 1. DataFrames get the standard PM4PY column layout; other log types pass through
    log = pm4py.format_dataframe(raw_log) if isinstance(raw_log, pd.DataFrame) else raw_log
    print(f"Original: {len(log)} events")
    # 2. Drop incomplete cases (fewer than 2 events)
    log = pm4py.filter_case_size(log, min_size=2, max_size=float('inf'))
    print(f"After min size filter: {len(log)} events")
    # 3. Drop cases whose duration falls outside the 1.5*IQR fences
    duration_series = pd.Series(pm4py.get_all_case_durations(log))
    first_quartile = duration_series.quantile(0.25)
    third_quartile = duration_series.quantile(0.75)
    spread = third_quartile - first_quartile
    log = pm4py.filter_case_performance(
        log,
        first_quartile - 1.5 * spread,
        third_quartile + 1.5 * spread,
    )
    print(f"After duration outlier removal: {len(log)} events")
    # 4. Retain only the variants that together cover 80% of the cases
    log = pm4py.filter_variants_by_coverage_percentage(log, 0.8)
    print(f"After variant filtering: {len(log)} events")
    # 5. Normalize temporal data so the log starts at zero
    log = pm4py.rebase(log)
    print("Timestamps rebased")
    # 6. Down-sample oversized logs to at most 5000 cases
    if len(set(log['case:concept:name'])) > 5000:
        log = pm4py.sample_cases(log, 5000)
        print(f"Sampled to: {len(log)} events")
    return log
# Apply quality enhancement
raw_data = pd.read_csv('messy_process_data.csv')
clean_log = enhance_log_quality(raw_data)

import pm4py
def export_analysis_results(log, output_prefix='analysis'):
    """Export process mining analysis in multiple formats.

    Discovers a Petri net, process tree, and DFG from the log, then writes the
    log and every discovered model to disk under a common filename prefix.

    Parameters:
    - log (Union[EventLog, pd.DataFrame]): Event log to analyze and export
    - output_prefix (str): Filename prefix for every exported artifact

    Returns:
    None
    """
    # Discover the three model representations from the log
    net, im, fm = pm4py.discover_petri_net_inductive(log)
    tree = pm4py.discover_process_tree_inductive(log)
    dfg, start_acts, end_acts = pm4py.discover_dfg(log)
    # Persist the log itself as XES and CSV
    pm4py.write_xes(log, f'{output_prefix}_log.xes')
    pm4py.convert_to_dataframe(log).to_csv(f'{output_prefix}_log.csv', index=False)
    # Persist the discovered models in their native file formats
    pm4py.write_pnml(net, im, fm, f'{output_prefix}_petri_net.pnml')
    pm4py.write_ptml(tree, f'{output_prefix}_process_tree.ptml')
    pm4py.write_dfg(dfg, start_acts, end_acts, f'{output_prefix}_dfg.dfg')
    # Build a NetworkX view of the log (graph export left to the caller)
    G = pm4py.convert_log_to_networkx(log)
    # Could export with nx.write_gml(G, f'{output_prefix}_graph.gml')
    # Pickled copies for fast reload inside PM4PY
    pm4py.serialize(net, f'{output_prefix}_petri_net.pkl')
    pm4py.serialize(tree, f'{output_prefix}_process_tree.pkl')
    print(f"Analysis results exported with prefix: {output_prefix}")
# Export everything
export_analysis_results(log, 'my_process_analysis')

Install with Tessl CLI
npx tessl i tessl/pypi-pm4py