CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-pm4py

Process mining library for discovering, analyzing and visualizing business processes from event data

Pending
Overview
Eval results
Files

docs/utilities-conversion.md

Utilities and Conversion

Utility functions for data manipulation, format conversion, and model transformation between different representations. PM4PY provides comprehensive tools for data preprocessing, serialization, and interoperability between various process mining formats and tools.

Capabilities

Data Format Utilities

Core utilities for formatting and preparing data for process mining analysis.

def format_dataframe(df, case_id='case:concept:name', activity_key='concept:name', timestamp_key='time:timestamp', start_timestamp_key='start_timestamp', timest_format=None):
    """Prepare a raw DataFrame for process mining.

    Maps the given columns onto the standard PM4PY attribute names and
    ensures the data types are suitable for analysis.

    Args:
        df (pd.DataFrame): Raw event data to format.
        case_id (str): Name of the case-identifier column.
        activity_key (str): Name of the activity column.
        timestamp_key (str): Name of the timestamp column.
        start_timestamp_key (str): Name of the start-timestamp column (optional).
        timest_format (Optional[str]): Explicit timestamp format string, if needed.

    Returns:
        pd.DataFrame: DataFrame ready for process mining.
    """

def get_properties(log, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name', resource_key='org:resource', group_key=None, start_timestamp_key=None, **kwargs):
    """Extract the property dictionary of a log object.

    Args:
        log (Union[EventLog, pd.DataFrame]): Log to read properties from.
        activity_key (str): Activity attribute name.
        timestamp_key (str): Timestamp attribute name.
        case_id_key (str): Case-identifier attribute name.
        resource_key (str): Resource attribute name.
        group_key (Optional[str]): Group-identifier attribute name, if any.
        start_timestamp_key (Optional[str]): Start-timestamp attribute name, if any.
        **kwargs: Additional keyword arguments.

    Returns:
        dict: Properties extracted from the log.
    """

Model Parsing and Serialization

Parse model representations from strings and serialize/deserialize PM4PY objects.

def parse_process_tree(process_tree_string):
    """Build a process tree object from its string notation.

    Args:
        process_tree_string (str): Textual representation of the tree.

    Returns:
        ProcessTree: The parsed process tree.
    """

def parse_powl_model_string(powl_string):
    """Build a POWL model object from its string notation.

    Args:
        powl_string (str): Textual representation of the POWL model.

    Returns:
        POWL: The parsed POWL model.
    """

def parse_event_log_string(event_log_string):
    """Build an event log object from its string notation.

    Args:
        event_log_string (str): Textual representation of the event log.

    Returns:
        EventLog: The parsed event log.
    """

def serialize(obj, file_path):
    """Persist a PM4PY object to a file.

    Args:
        obj (Any): PM4PY object to write out.
        file_path (str): Destination path for the serialized data.

    Returns:
        None
    """

def deserialize(file_path):
    """Load a previously serialized PM4PY object from a file.

    Args:
        file_path (str): Path of the serialized-object file.

    Returns:
        Any: The deserialized PM4PY object.
    """

Log Manipulation Utilities

Functions for modifying and manipulating event log data.

def set_classifier(log, classifier_key):
    """Change the event classifier of a log (the attribute used as activity).

    Args:
        log (Union[EventLog, pd.DataFrame]): Log to modify.
        classifier_key (str): Attribute name of the new classifier.

    Returns:
        Union[EventLog, pd.DataFrame]: Log using the new classifier.
    """

def project_on_event_attribute(log, attribute_key):
    """Project a log onto a single event attribute.

    Keeps events based on the presence of the given attribute.

    Args:
        log (Union[EventLog, pd.DataFrame]): Event log data.
        attribute_key (str): Attribute to project on.

    Returns:
        Union[EventLog, pd.DataFrame]: The projected log.
    """

def sample_cases(log, n_cases):
    """Draw a random sample of cases from a log.

    Args:
        log (Union[EventLog, pd.DataFrame]): Event log data.
        n_cases (int): How many cases to keep.

    Returns:
        Union[EventLog, pd.DataFrame]: Log restricted to the sampled cases.
    """

def sample_events(log, n_events):
    """Draw a random sample of events from a log.

    Args:
        log (Union[EventLog, pd.DataFrame]): Event log data.
        n_events (int): How many events to keep.

    Returns:
        Union[EventLog, pd.DataFrame]: Log restricted to the sampled events.
    """

def rebase(log, case_id_key='case:concept:name'):
    """Normalize a log's timestamps so they start from zero.

    Args:
        log (Union[EventLog, pd.DataFrame]): Event log data.
        case_id_key (str): Case-identifier attribute name.

    Returns:
        Union[EventLog, pd.DataFrame]: Log with rebased timestamps.
    """

Log Format Conversions

Convert between different event log representations and formats.

def convert_to_event_log(obj, case_id_key='case:concept:name', **kwargs):
    """Turn a DataFrame or EventStream into an EventLog.

    Args:
        obj (Union[pd.DataFrame, EventStream]): Source object.
        case_id_key (str): Case-identifier column name.
        **kwargs: Additional conversion parameters.

    Returns:
        EventLog: The converted event log.
    """

def convert_to_event_stream(obj, case_id_key='case:concept:name', **kwargs):
    """Turn an EventLog or DataFrame into an EventStream.

    Args:
        obj (Union[EventLog, pd.DataFrame]): Source object.
        case_id_key (str): Case-identifier column name.
        **kwargs: Additional conversion parameters.

    Returns:
        EventStream: The converted event stream.
    """

def convert_to_dataframe(obj, case_id_key='case:concept:name', **kwargs):
    """Turn a log object into a pandas DataFrame.

    Args:
        obj (Union[EventLog, EventStream]): Source object.
        case_id_key (str): Case-identifier column name.
        **kwargs: Additional conversion parameters.

    Returns:
        pd.DataFrame: The converted DataFrame.
    """

Model Conversions

Convert between different process model representations.

def convert_to_bpmn(*args, **kwargs):
    """Translate a process model (e.g. Petri net or process tree) to BPMN.

    Args:
        *args: Model object(s) to convert.
        **kwargs: Conversion parameters.

    Returns:
        BPMN: The converted BPMN model.
    """

def convert_to_petri_net(*args, **kwargs):
    """Translate a process model (e.g. process tree, BPMN, DFG) to a Petri net.

    Args:
        *args: Model object(s) to convert.
        **kwargs: Conversion parameters.

    Returns:
        Tuple[PetriNet, Marking, Marking]: Petri net plus initial and final markings.
    """

def convert_to_process_tree(*args, **kwargs):
    """Translate a process model (e.g. Petri net or BPMN) to a process tree.

    Args:
        *args: Model object(s) to convert.
        **kwargs: Conversion parameters.

    Returns:
        ProcessTree: The converted process tree.
    """

def convert_to_reachability_graph(petri_net, initial_marking, **kwargs):
    """Explore a Petri net's state space and return its reachability graph.

    Args:
        petri_net (PetriNet): The Petri net model.
        initial_marking (Marking): Marking to start the exploration from.
        **kwargs: Conversion parameters.

    Returns:
        TransitionSystem: The reachability graph as a transition system.
    """

def convert_to_powl(*args, **kwargs):
    """Translate a process model to POWL (Partially Ordered Workflow Language).

    Args:
        *args: Model object(s) to convert.
        **kwargs: Conversion parameters.

    Returns:
        POWL: The converted POWL model.
    """

NetworkX Integration

Convert process mining objects to NetworkX graphs for network analysis.

def convert_log_to_networkx(log, **kwargs):
    """Build a NetworkX directed graph from an event log.

    Args:
        log (Union[EventLog, pd.DataFrame]): Event log data.
        **kwargs: Graph-construction parameters.

    Returns:
        nx.DiGraph: Directed-graph representation of the log.
    """

def convert_ocel_to_networkx(ocel, **kwargs):
    """Build a NetworkX directed graph from an object-centric event log.

    Args:
        ocel (OCEL): Object-centric event log.
        **kwargs: Graph-construction parameters.

    Returns:
        nx.DiGraph: Directed-graph representation of the OCEL.
    """

def convert_petri_net_to_networkx(petri_net, initial_marking, **kwargs):
    """Build a NetworkX directed graph from a Petri net.

    Args:
        petri_net (PetriNet): Petri net model.
        initial_marking (Marking): Initial marking of the net.
        **kwargs: Graph-construction parameters.

    Returns:
        nx.DiGraph: Directed-graph representation of the Petri net.
    """

Special Conversions

Specialized conversion functions for specific use cases.

def convert_log_to_ocel(log, **kwargs):
    """Turn a traditional event log into an object-centric event log.

    Args:
        log (Union[EventLog, pd.DataFrame]): Traditional event log.
        **kwargs: OCEL conversion parameters.

    Returns:
        OCEL: The converted object-centric event log.
    """

def convert_log_to_time_intervals(log, **kwargs):
    """Turn an event log into a time-interval representation.

    Args:
        log (Union[EventLog, pd.DataFrame]): Event log data.
        **kwargs: Interval conversion parameters.

    Returns:
        pd.DataFrame: DataFrame of time intervals.
    """

def convert_petri_net_type(petri_net, target_type, **kwargs):
    """Translate a Petri net between different net types/representations.

    Args:
        petri_net (PetriNet): Source Petri net.
        target_type (str): Name of the target Petri net type.
        **kwargs: Conversion parameters.

    Returns:
        PetriNet: The converted Petri net.
    """

Usage Examples

Data Formatting and Preparation

# Example: prepare a raw CSV export for process mining analysis.
import pm4py
import pandas as pd

# Load raw data
raw_data = pd.read_csv('process_data.csv')

# Format for process mining: map the CSV's CaseID/Activity/Timestamp columns
# onto the standard 'case:concept:name'/'concept:name'/'time:timestamp'
# attribute names used by the prints below.
formatted_log = pm4py.format_dataframe(
    raw_data,
    case_id='CaseID',
    activity_key='Activity',
    timestamp_key='Timestamp'
)

print("Formatted log ready for process mining")
print(f"Cases: {formatted_log['case:concept:name'].nunique()}")
print(f"Events: {len(formatted_log)}")
print(f"Activities: {formatted_log['concept:name'].nunique()}")

# Rebase timestamps to start from zero
rebased_log = pm4py.rebase(formatted_log)
print("Timestamps rebased to start from zero")

Log Sampling and Manipulation

# Example: down-sample and reshape a large event log before analysis.
import pm4py

# Load full event log
log = pm4py.read_xes('large_event_log.xes')
# NOTE(review): the print assumes len() of the loaded log counts cases — confirm
# against the return type of read_xes (EventLog vs DataFrame).
print(f"Original log: {len(log)} cases")

# Sample subset for analysis
sample_log = pm4py.sample_cases(log, 1000)
print(f"Sampled log: {len(sample_log)} cases")

# Sample specific number of events
event_sample = pm4py.sample_events(log, 10000)
print(f"Event sample: {len(event_sample)} events")

# Change classifier: use the resource attribute as the activity
classified_log = pm4py.set_classifier(log, 'org:resource')
print("Changed classifier to resource")

# Project on specific attribute
projected_log = pm4py.project_on_event_attribute(log, 'lifecycle:transition')
print("Projected on lifecycle transition")

Model Format Conversions

# Example: discover one model and fan it out to several model formats.
import pm4py

# Discover process tree
log = pm4py.read_xes('event_log.xes')
tree = pm4py.discover_process_tree_inductive(log)

# Convert to different model formats.
# convert_to_petri_net returns the net together with its initial/final markings.
net, initial_marking, final_marking = pm4py.convert_to_petri_net(tree)
print("Converted process tree to Petri net")

bpmn_model = pm4py.convert_to_bpmn(tree)
print("Converted process tree to BPMN")

powl_model = pm4py.convert_to_powl(tree)
print("Converted process tree to POWL")

# Convert Petri net to reachability graph (state-space exploration)
reachability = pm4py.convert_to_reachability_graph(net, initial_marking)
print("Converted Petri net to reachability graph")

Log Format Conversions

# Example: round-trip a log through the supported log representations.
import pm4py

# Convert between different log formats
log = pm4py.read_xes('event_log.xes')

# Convert to DataFrame
df = pm4py.convert_to_dataframe(log)
print(f"Converted to DataFrame: {len(df)} rows")

# Convert DataFrame back to EventLog
event_log = pm4py.convert_to_event_log(df)
print("Converted DataFrame back to EventLog")

# Convert to EventStream
event_stream = pm4py.convert_to_event_stream(log)
print("Converted to EventStream")

# Convert traditional log to OCEL.
# NOTE(review): the keyword names below (object_type_column/object_type_name)
# should be verified against the installed pm4py version's signature.
ocel = pm4py.convert_log_to_ocel(
    log,
    object_type_column='case:concept:name',
    object_type_name='Case'
)
print("Converted traditional log to OCEL")

NetworkX Integration

# Example: move process-mining objects into NetworkX for graph analysis.
import pm4py
import networkx as nx

# Convert log to NetworkX graph
log = pm4py.read_xes('event_log.xes')
G = pm4py.convert_log_to_networkx(log)

print(f"NetworkX graph: {G.number_of_nodes()} nodes, {G.number_of_edges()} edges")

# Analyze graph properties with standard NetworkX algorithms
print(f"Graph density: {nx.density(G):.3f}")
print(f"Strongly connected: {nx.is_strongly_connected(G)}")

# Convert Petri net to NetworkX
net, im, fm = pm4py.discover_petri_net_inductive(log)
net_graph = pm4py.convert_petri_net_to_networkx(net, im)
print(f"Petri net graph: {net_graph.number_of_nodes()} nodes")

# Convert OCEL to NetworkX
ocel = pm4py.read_ocel('ocel_data.csv')
ocel_graph = pm4py.convert_ocel_to_networkx(ocel)
print(f"OCEL graph: {ocel_graph.number_of_nodes()} nodes")

Model Parsing and Serialization

# Example: parse models from strings and persist/restore them with pickle files.
import pm4py

# Parse models from string representations
# ('->' is the sequence operator, '+' the parallel operator).
tree_string = "->('A', +('B', 'C'), 'D')"
tree = pm4py.parse_process_tree(tree_string)
print("Parsed process tree from string")

# Serialize model for later use
pm4py.serialize(tree, 'process_tree.pkl')
print("Serialized process tree")

# Deserialize model
loaded_tree = pm4py.deserialize('process_tree.pkl')
print("Deserialized process tree")

# Parse event log from string: one case per line, activities comma-separated
log_string = """
Case1: A, B, C
Case2: A, C, B
Case3: A, B, B, C
"""
parsed_log = pm4py.parse_event_log_string(log_string)
print("Parsed event log from string")

Time Interval Analysis

# Example: analyze activity durations via the time-interval representation.
import pm4py

# Convert log to time intervals
log = pm4py.read_xes('event_log.xes')
intervals = pm4py.convert_log_to_time_intervals(log)

print("Time Intervals Analysis:")
print(f"Intervals: {len(intervals)}")
# NOTE(review): column names below (case_id/activity/...) assume the interval
# DataFrame schema of the installed pm4py version — verify before reuse.
print(intervals[['case_id', 'activity', 'start_time', 'end_time', 'duration']].head())

# Analyze interval patterns (durations are assumed to be in seconds —
# hence the /60 and /3600 conversions below).
avg_duration = intervals['duration'].mean()
max_duration = intervals['duration'].max()
print(f"Average interval duration: {avg_duration/60:.1f} minutes")
print(f"Maximum interval duration: {max_duration/3600:.1f} hours")

Batch Conversion Pipeline

import pm4py
import os

def convert_process_models(input_dir, output_dir):
    """Convert all process trees in a directory to multiple formats.

    Every '.ptml' file in input_dir is read as a process tree and written to
    output_dir as a PNML Petri net, a BPMN model, and a serialized PM4PY object.

    Parameters:
    - input_dir (str): Directory containing .ptml process-tree files
    - output_dir (str): Directory for the converted models (created if missing)

    Returns:
    None
    """
    os.makedirs(output_dir, exist_ok=True)

    for filename in os.listdir(input_dir):
        # Only process-tree files are converted; everything else is skipped.
        if not filename.endswith('.ptml'):
            continue

        filepath = os.path.join(input_dir, filename)
        base_name = filename[:-5]  # Remove .ptml extension

        # Read process tree
        tree = pm4py.read_ptml(filepath)

        # Convert to different formats
        net, im, fm = pm4py.convert_to_petri_net(tree)
        bpmn = pm4py.convert_to_bpmn(tree)

        # Save in multiple formats
        pm4py.write_pnml(net, im, fm, os.path.join(output_dir, f"{base_name}.pnml"))
        pm4py.write_bpmn(bpmn, os.path.join(output_dir, f"{base_name}.bpmn"))

        # Serialize for PM4PY
        pm4py.serialize(tree, os.path.join(output_dir, f"{base_name}.pkl"))

        # BUG FIX: the original printed the literal text "(unknown)" instead of
        # interpolating the converted file's name.
        print(f"Converted {filename} to multiple formats")

# Run batch conversion over the example input/output directories
convert_process_models('input_models/', 'output_models/')

Data Quality Enhancement Pipeline

import pm4py
import pandas as pd

def enhance_log_quality(raw_log):
    """Comprehensive data quality enhancement pipeline.

    Takes a raw log (DataFrame or already-formatted log), then: formats it,
    drops incomplete cases, removes duration outliers (IQR rule), keeps the
    variants covering 80% of cases, rebases timestamps, and samples down to
    at most 5000 cases. Returns the cleaned log.
    """
    
    print("Starting data quality enhancement...")
    
    # 1. Format DataFrame properly (EventLog inputs are passed through as-is)
    if isinstance(raw_log, pd.DataFrame):
        log = pm4py.format_dataframe(raw_log)
    else:
        log = raw_log
    
    print(f"Original: {len(log)} events")
    
    # 2. Remove incomplete cases (less than 2 events)
    log = pm4py.filter_case_size(log, min_size=2, max_size=float('inf'))
    print(f"After min size filter: {len(log)} events")
    
    # 3. Remove extreme durations (outliers) using the 1.5*IQR fences
    durations = pm4py.get_all_case_durations(log)
    q1 = pd.Series(durations).quantile(0.25)
    q3 = pd.Series(durations).quantile(0.75)
    iqr = q3 - q1
    # NOTE(review): lower_bound can be negative when durations are skewed;
    # presumably filter_case_performance tolerates that — confirm.
    lower_bound = q1 - 1.5 * iqr
    upper_bound = q3 + 1.5 * iqr
    
    log = pm4py.filter_case_performance(log, lower_bound, upper_bound)
    print(f"After duration outlier removal: {len(log)} events")
    
    # 4. Keep only most frequent variants (80% coverage)
    log = pm4py.filter_variants_by_coverage_percentage(log, 0.8)
    print(f"After variant filtering: {len(log)} events")
    
    # 5. Rebase timestamps
    log = pm4py.rebase(log)
    print("Timestamps rebased")
    
    # 6. Sample if too large
    # NOTE(review): the column indexing below assumes log is a DataFrame at
    # this point; an EventLog input that skipped step 1 may not support it.
    if len(set(log['case:concept:name'])) > 5000:
        log = pm4py.sample_cases(log, 5000)
        print(f"Sampled to: {len(log)} events")
    
    return log

# Apply quality enhancement to an example raw CSV export
raw_data = pd.read_csv('messy_process_data.csv')
clean_log = enhance_log_quality(raw_data)

Multi-Format Export

import pm4py

def export_analysis_results(log, output_prefix='analysis'):
    """Export process mining analysis in multiple formats.

    Discovers a Petri net, a process tree and a DFG from the log, then writes
    the log (XES + CSV) and the models (PNML, PTML, DFG, pickle) to files
    named '<output_prefix>_*'.
    """
    
    # Discover models
    net, im, fm = pm4py.discover_petri_net_inductive(log)
    tree = pm4py.discover_process_tree_inductive(log)
    dfg, start_acts, end_acts = pm4py.discover_dfg(log)
    
    # Export logs (XES plus a flat CSV via the DataFrame conversion)
    pm4py.write_xes(log, f'{output_prefix}_log.xes')
    df = pm4py.convert_to_dataframe(log)
    df.to_csv(f'{output_prefix}_log.csv', index=False)
    
    # Export models
    pm4py.write_pnml(net, im, fm, f'{output_prefix}_petri_net.pnml')
    pm4py.write_ptml(tree, f'{output_prefix}_process_tree.ptml')
    pm4py.write_dfg(dfg, start_acts, end_acts, f'{output_prefix}_dfg.dfg')
    
    # Convert to NetworkX and export
    G = pm4py.convert_log_to_networkx(log)
    # Could export with nx.write_gml(G, f'{output_prefix}_graph.gml')
    
    # Serialize PM4PY objects for later reuse without re-discovery
    pm4py.serialize(net, f'{output_prefix}_petri_net.pkl')
    pm4py.serialize(tree, f'{output_prefix}_process_tree.pkl')
    
    print(f"Analysis results exported with prefix: {output_prefix}")

# Export everything (assumes `log` is already loaded in scope)
export_analysis_results(log, 'my_process_analysis')

Install with Tessl CLI

npx tessl i tessl/pypi-pm4py

docs

conformance-checking.md

filtering.md

index.md

ml-organizational.md

object-centric.md

process-discovery.md

reading-writing.md

statistics-analysis.md

utilities-conversion.md

visualization.md

tile.json