Process mining library for discovering, analyzing and visualizing business processes from event data.

Machine learning features for predictive process analytics and organizational mining for resource analysis and social network discovery. PM4PY provides tools for feature extraction, predictive modeling, and organizational pattern analysis.
Prepare event log data for machine learning applications, including train/test splits and prefix extraction.
def split_train_test(log, train_percentage: float = 0.8, case_id_key: str = 'case:concept:name'):
    """
    Split event log into training and test sets for machine learning.

    Parameters:
    - log (Union[EventLog, pd.DataFrame]): Event log data
    - train_percentage (float): Percentage of data for training (0.0-1.0)
    - case_id_key (str): Case ID attribute name

    Returns:
    Union[Tuple[EventLog, EventLog], Tuple[pd.DataFrame, pd.DataFrame]]: (train_log, test_log)
    """
def get_prefixes_from_log(log, length: int, case_id_key: str = 'case:concept:name'):
    """
    Extract trace prefixes of specified length for predictive modeling.

    Parameters:
    - log (Union[EventLog, pd.DataFrame]): Event log data
    - length (int): Length of prefixes to extract
    - case_id_key (str): Case ID attribute name

    Returns:
    Union[EventLog, pd.DataFrame]: Event log with prefixes
    """

# Extract features from event logs and OCEL for machine learning applications.
def extract_ocel_features(ocel, **kwargs):
    """
    Extract machine learning features from an Object-Centric Event Log.

    Parameters:
    - ocel (OCEL): Object-centric event log
    - **kwargs: Feature extraction parameters including:
        - feature_types: List of feature types to extract
        - aggregation_methods: Methods for aggregating object features
        - temporal_features: Whether to include temporal features

    Returns:
    pd.DataFrame: Feature matrix with one row per object or event
    """
def extract_features_dataframe(log, **kwargs):
    """
    Extract comprehensive features from a traditional event log.

    Parameters:
    - log (Union[EventLog, pd.DataFrame]): Event log data
    - **kwargs: Feature extraction parameters including:
        - case_features: Whether to extract case-level features
        - event_features: Whether to extract event-level features
        - temporal_features: Include temporal patterns
        - categorical_encoding: Method for encoding categorical variables

    Returns:
    pd.DataFrame: Feature matrix ready for machine learning
    """
def extract_temporal_features_dataframe(log, **kwargs):
    """
    Extract temporal features from an event log for time-aware modeling.

    Parameters:
    - log (Union[EventLog, pd.DataFrame]): Event log data
    - **kwargs: Temporal feature parameters including:
        - time_windows: Time windows for feature aggregation
        - cyclical_features: Include cyclical time features (hour, day, month)
        - duration_features: Include activity and case duration features

    Returns:
    pd.DataFrame: Temporal feature matrix
    """
def extract_outcome_enriched_dataframe(log, **kwargs):
    """
    Extract features enriched with outcome information for supervised learning.

    Parameters:
    - log (Union[EventLog, pd.DataFrame]): Event log data
    - **kwargs: Outcome enrichment parameters including:
        - outcome_definition: How to define positive/negative outcomes
        - prediction_horizon: Time horizon for outcome prediction
        - feature_encoding: Method for encoding features

    Returns:
    pd.DataFrame: Feature matrix with outcome labels
    """
def extract_target_vector(log, **kwargs):
    """
    Extract target vector for supervised machine learning.

    Parameters:
    - log (Union[EventLog, pd.DataFrame]): Event log data
    - **kwargs: Target extraction parameters including:
        - target_type: Type of target ('next_activity', 'remaining_time', 'outcome')
        - prediction_point: Point in trace for prediction
        - encoding_method: Method for encoding targets

    Returns:
    List[Any]: Target vector for machine learning
    """

# Discover and analyze social networks and organizational patterns from event logs.
def discover_handover_of_work_network(log, beta: float = 0, resource_key: str = 'org:resource', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name'):
    """
    Discover handover of work network showing resource collaboration patterns.

    Parameters:
    - log (Union[EventLog, pd.DataFrame]): Event log data
    - beta (float): Beta parameter for network weight calculation
    - resource_key (str): Resource attribute name
    - timestamp_key (str): Timestamp attribute name
    - case_id_key (str): Case ID attribute name

    Returns:
    SNA: Social Network Analysis object with handover relationships
    """
def discover_activity_based_resource_similarity(log, **kwargs):
    """
    Discover resource similarity network based on shared activities.

    Parameters:
    - log (Union[EventLog, pd.DataFrame]): Event log data
    - **kwargs: Similarity calculation parameters including:
        - similarity_metric: Method for calculating similarity
        - min_shared_activities: Minimum shared activities for connection
        - normalization: Normalization method for similarities

    Returns:
    SNA: Social network with resource similarities
    """
def discover_subcontracting_network(log, **kwargs):
    """
    Discover subcontracting relationships between resources.

    Parameters:
    - log (Union[EventLog, pd.DataFrame]): Event log data
    - **kwargs: Subcontracting detection parameters including:
        - time_window: Time window for detecting subcontracting
        - activity_patterns: Patterns indicating subcontracting
        - threshold: Threshold for relationship strength

    Returns:
    SNA: Social network with subcontracting relationships
    """
def discover_working_together_network(log, resource_key: str = 'org:resource', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name'):
    """
    Discover working together network showing collaborative relationships.

    Parameters:
    - log (Union[EventLog, pd.DataFrame]): Event log data
    - resource_key (str): Resource attribute name
    - timestamp_key (str): Timestamp attribute name
    - case_id_key (str): Case ID attribute name

    Returns:
    SNA: Social network with collaboration relationships
    """

# Discover organizational roles and structures from resource behavior patterns.
def discover_organizational_roles(log, **kwargs):
    """
    Discover organizational roles based on resource activity patterns.

    Parameters:
    - log (Union[EventLog, pd.DataFrame]): Event log data
    - **kwargs: Role discovery parameters including:
        - clustering_method: Method for role clustering
        - min_role_size: Minimum number of resources per role
        - activity_similarity_threshold: Threshold for activity similarity

    Returns:
    List[Role]: List of discovered organizational roles
    """
def discover_network_analysis(log, **kwargs):
    """
    Perform comprehensive network analysis on organizational data.

    Parameters:
    - log (Union[EventLog, pd.DataFrame]): Event log data
    - **kwargs: Network analysis parameters including:
        - network_types: Types of networks to analyze
        - centrality_measures: Centrality measures to compute
        - community_detection: Whether to detect communities

    Returns:
    Dict[str, Any]: Comprehensive network analysis results
    """

import pm4py
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report

# Load event log
log = pm4py.read_xes('event_log.xes')

# Split into train/test sets
train_log, test_log = pm4py.split_train_test(log, train_percentage=0.8)
print(f"Training cases: {len(train_log)}")
print(f"Test cases: {len(test_log)}")

# Extract prefixes for predictive modeling
prefix_length = 5
train_prefixes = pm4py.get_prefixes_from_log(train_log, prefix_length)
test_prefixes = pm4py.get_prefixes_from_log(test_log, prefix_length)
print(f"Training prefixes: {len(train_prefixes)} events")
print(f"Test prefixes: {len(test_prefixes)} events")

import pm4py
import pandas as pd

# Extract comprehensive features (same settings for train and test so the
# resulting feature matrices are comparable)
features_train = pm4py.extract_features_dataframe(
    train_prefixes,
    case_features=True,
    event_features=True,
    temporal_features=True,
    categorical_encoding='onehot'
)
features_test = pm4py.extract_features_dataframe(
    test_prefixes,
    case_features=True,
    event_features=True,
    temporal_features=True,
    categorical_encoding='onehot'
)
print("Extracted Features:")
print(f" Training features shape: {features_train.shape}")
print(f" Test features shape: {features_test.shape}")
print(f" Feature columns: {list(features_train.columns)}")

# Extract temporal features specifically
temporal_features = pm4py.extract_temporal_features_dataframe(
    train_log,
    time_windows=['1h', '1d', '1w'],
    cyclical_features=True,
    duration_features=True
)
print(f"Temporal features shape: {temporal_features.shape}")

import pm4py
from sklearn.ensemble import RandomForestClassifier
# NOTE(review): accuracy_score, classification_report and pandas were used
# below but only imported in earlier snippets; import them here so this
# example is self-contained.
from sklearn.metrics import accuracy_score, classification_report
import pandas as pd

# Extract features and targets for next activity prediction
X_train = pm4py.extract_features_dataframe(train_prefixes)
y_train = pm4py.extract_target_vector(
    train_prefixes,
    target_type='next_activity',
    encoding_method='label'
)
X_test = pm4py.extract_features_dataframe(test_prefixes)
y_test = pm4py.extract_target_vector(
    test_prefixes,
    target_type='next_activity',
    encoding_method='label'
)

# Train model
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

# Predict and evaluate
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(f"Next Activity Prediction Accuracy: {accuracy:.3f}")
print("\nClassification Report:")
print(classification_report(y_test, y_pred))

# Feature importance (largest first)
feature_importance = pd.DataFrame({
    'feature': X_train.columns,
    'importance': model.feature_importances_
}).sort_values('importance', ascending=False)
print("\nTop 10 Important Features:")
print(feature_importance.head(10))

import pm4py
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import mean_absolute_error, r2_score

# Extract features and targets for remaining time prediction
X_train = pm4py.extract_temporal_features_dataframe(train_prefixes)
y_train = pm4py.extract_target_vector(
    train_prefixes,
    target_type='remaining_time',
    time_unit='hours'
)
X_test = pm4py.extract_temporal_features_dataframe(test_prefixes)
y_test = pm4py.extract_target_vector(
    test_prefixes,
    target_type='remaining_time',
    time_unit='hours'
)

# Train regression model
model = GradientBoostingRegressor(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

# Predict and evaluate (MAE is in hours, matching the target's time_unit)
y_pred = model.predict(X_test)
mae = mean_absolute_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)
print(f"Remaining Time Prediction:")
print(f" Mean Absolute Error: {mae:.2f} hours")
print(f" R² Score: {r2:.3f}")

import pm4py
# Load OCEL and extract features
ocel = pm4py.read_ocel('ocel_data.csv')

# Extract OCEL-specific features
ocel_features = pm4py.extract_ocel_features(
    ocel,
    feature_types=['object_lifecycle', 'interaction_patterns', 'temporal_patterns'],
    aggregation_methods=['count', 'mean', 'std'],
    temporal_features=True
)
print("OCEL Features:")
print(f" Feature matrix shape: {ocel_features.shape}")
print(f" Object types covered: {ocel_features['object_type'].nunique()}")

# Group features by object type (shape[1]-1 excludes the object_type column)
for obj_type in ocel_features['object_type'].unique():
    obj_features = ocel_features[ocel_features['object_type'] == obj_type]
    print(f" {obj_type}: {len(obj_features)} objects, {obj_features.shape[1]-1} features")

import pm4py
# Discover handover of work network
handover_network = pm4py.discover_handover_of_work_network(log, beta=0.5)
print("Handover Network Statistics:")
print(f" Nodes (resources): {len(handover_network.nodes)}")
print(f" Edges (handovers): {len(handover_network.edges)}")

# Visualize handover network (view interactively, then save to file)
pm4py.view_sna(handover_network)
pm4py.save_vis_sna(handover_network, 'handover_network.png')

# Discover working together network
collaboration_network = pm4py.discover_working_together_network(log)
print("Collaboration Network Statistics:")
print(f" Nodes: {len(collaboration_network.nodes)}")
print(f" Edges: {len(collaboration_network.edges)}")

# Activity-based similarity network
similarity_network = pm4py.discover_activity_based_resource_similarity(
    log,
    similarity_metric='jaccard',
    min_shared_activities=3
)
print("Resource Similarity Network:")
print(f" Nodes: {len(similarity_network.nodes)}")
print(f" Edges: {len(similarity_network.edges)}")

import pm4py
# Discover organizational roles
roles = pm4py.discover_organizational_roles(
    log,
    clustering_method='kmeans',
    min_role_size=3,
    activity_similarity_threshold=0.7
)
print(f"Discovered {len(roles)} organizational roles:")
for i, role in enumerate(roles):
    print(f"\nRole {i+1}:")
    print(f" Resources: {len(role.resources)}")
    print(f" Main activities: {role.main_activities}")
    print(f" Activity coverage: {role.activity_coverage:.2f}")
    # Show at most 5 resources, with an ellipsis when more exist
    print(f" Resources: {list(role.resources)[:5]}{'...' if len(role.resources) > 5 else ''}")

# Comprehensive network analysis
network_analysis = pm4py.discover_network_analysis(
    log,
    network_types=['handover', 'collaboration', 'similarity'],
    centrality_measures=['betweenness', 'closeness', 'degree'],
    community_detection=True
)
print("\nComprehensive Network Analysis:")
print(f" Network types analyzed: {len(network_analysis['networks'])}")
print(f" Communities detected: {network_analysis['communities']['count']}")
print(f" Key resources (high centrality): {network_analysis['key_resources']}")

import pm4py
from sklearn.linear_model import LogisticRegression
# NOTE(review): train_test_split was never imported anywhere in the original,
# and the metrics below were only imported in an earlier snippet; import them
# here so this example is self-contained.
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report

# Extract outcome-enriched features
outcome_features = pm4py.extract_outcome_enriched_dataframe(
    log,
    outcome_definition='case_duration > average',
    prediction_horizon='50%',  # Predict at 50% of case completion
    feature_encoding='numerical'
)
print("Outcome Prediction Dataset:")
print(f" Total instances: {len(outcome_features)}")
print(f" Positive outcomes: {outcome_features['outcome'].sum()}")
print(f" Features: {outcome_features.shape[1] - 1}")

# Split features and targets (drop identifier and label columns)
X = outcome_features.drop(['case_id', 'outcome'], axis=1)
y = outcome_features['outcome']

# Train outcome prediction model
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
model = LogisticRegression(random_state=42)
model.fit(X_train, y_train)

# Evaluate
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(f"\nOutcome Prediction Results:")
print(f" Accuracy: {accuracy:.3f}")
print(f" Classification Report:")
print(classification_report(y_test, y_pred))

import pm4py
from sklearn.ensemble import RandomForestClassifier, GradientBoostingRegressor
# NOTE(review): mean_absolute_error was used but never imported in the original
from sklearn.metrics import accuracy_score, mean_absolute_error
import pandas as pd


def predictive_monitoring_pipeline(log, prefix_lengths=(3, 5, 7, 10)):
    """Complete predictive process monitoring pipeline.

    For each prefix length, trains a next-activity classifier and, when
    remaining-time targets can be extracted, a remaining-time regressor,
    collecting evaluation metrics per prefix length.

    Parameters:
    - log (Union[EventLog, pd.DataFrame]): Event log data
    - prefix_lengths (Sequence[int]): Prefix lengths to evaluate
      (immutable tuple default instead of a shared mutable list)

    Returns:
    Dict[int, Dict[str, Any]]: Per-prefix-length metrics with keys
    'activity_accuracy', 'time_mae', 'train_samples', 'test_samples',
    'features'
    """
    results = {}
    # Split data
    train_log, test_log = pm4py.split_train_test(log, 0.8)
    for prefix_length in prefix_lengths:
        print(f"\nAnalyzing prefix length: {prefix_length}")
        # Extract prefixes
        train_prefixes = pm4py.get_prefixes_from_log(train_log, prefix_length)
        test_prefixes = pm4py.get_prefixes_from_log(test_log, prefix_length)
        if len(train_prefixes) == 0 or len(test_prefixes) == 0:
            print(f" Insufficient data for prefix length {prefix_length}")
            continue
        # Extract features
        X_train = pm4py.extract_features_dataframe(train_prefixes)
        X_test = pm4py.extract_features_dataframe(test_prefixes)
        # Next activity prediction
        y_train_activity = pm4py.extract_target_vector(train_prefixes, target_type='next_activity')
        y_test_activity = pm4py.extract_target_vector(test_prefixes, target_type='next_activity')
        model_activity = RandomForestClassifier(n_estimators=50, random_state=42)
        model_activity.fit(X_train, y_train_activity)
        activity_accuracy = accuracy_score(y_test_activity, model_activity.predict(X_test))
        # Remaining time prediction (if applicable)
        try:
            y_train_time = pm4py.extract_target_vector(train_prefixes, target_type='remaining_time')
            y_test_time = pm4py.extract_target_vector(test_prefixes, target_type='remaining_time')
            model_time = GradientBoostingRegressor(n_estimators=50, random_state=42)
            model_time.fit(X_train, y_train_time)
            time_mae = mean_absolute_error(y_test_time, model_time.predict(X_test))
        except Exception:
            # Best-effort: remaining-time targets may be unavailable for some
            # logs; record the gap instead of crashing the whole pipeline.
            # (Was a bare `except:`, which also swallowed KeyboardInterrupt.)
            time_mae = None
        results[prefix_length] = {
            'activity_accuracy': activity_accuracy,
            'time_mae': time_mae,
            'train_samples': len(train_prefixes),
            'test_samples': len(test_prefixes),
            'features': X_train.shape[1]
        }
        print(f" Next activity accuracy: {activity_accuracy:.3f}")
        # `is not None` so a legitimate MAE of 0.0 is still reported
        if time_mae is not None:
            print(f" Remaining time MAE: {time_mae:.2f}")
    return results


# Run predictive monitoring analysis
prediction_results = predictive_monitoring_pipeline(log)

# Summarize results
print("\n" + "="*50)
print("PREDICTIVE MONITORING SUMMARY")
print("="*50)
for prefix_len, metrics in prediction_results.items():
    print(f"Prefix Length {prefix_len}:")
    print(f" Activity Prediction Accuracy: {metrics['activity_accuracy']:.3f}")
    if metrics['time_mae'] is not None:
        print(f" Time Prediction MAE: {metrics['time_mae']:.2f}")
    print(f" Training Samples: {metrics['train_samples']}")
    print(f" Features: {metrics['features']}")

import pm4py
def analyze_resource_performance(log):
    """Analyze individual resource performance and collaboration patterns.

    Parameters:
    - log (pd.DataFrame): Event log with 'org:resource',
      'case:concept:name' and 'concept:name' columns
      # NOTE(review): per-resource filtering below requires a DataFrame

    Returns:
    pd.DataFrame: One row of activity/network metrics per resource
    """
    import pandas as pd  # was used without being imported in this snippet

    # Get basic resource statistics; guard with hasattr so a non-DataFrame
    # log (no .columns attribute) degrades to "no resources" instead of
    # raising AttributeError.
    if hasattr(log, 'columns') and 'org:resource' in log.columns:
        resources = log['org:resource'].unique()
    else:
        resources = []
    print(f"Resource Performance Analysis ({len(resources)} resources)")
    print("-" * 50)
    # Discover networks
    handover_net = pm4py.discover_handover_of_work_network(log)
    collab_net = pm4py.discover_working_together_network(log)
    # Calculate resource metrics
    resource_metrics = []
    for resource in resources:
        # Filter log for this resource
        resource_log = log[log['org:resource'] == resource]
        # Basic metrics
        cases_handled = resource_log['case:concept:name'].nunique()
        events_performed = len(resource_log)
        activities = resource_log['concept:name'].nunique()
        # Network metrics: number of network edges that touch this resource
        handover_connections = len([e for e in handover_net.edges if resource in e])
        collab_connections = len([e for e in collab_net.edges if resource in e])
        resource_metrics.append({
            'resource': resource,
            'cases_handled': cases_handled,
            'events_performed': events_performed,
            'activities': activities,
            'handover_connections': handover_connections,
            'collaboration_connections': collab_connections
        })
    # Convert to DataFrame for analysis
    metrics_df = pd.DataFrame(resource_metrics)
    print("Top 10 Resources by Cases Handled:")
    # Guard: nlargest on an empty frame would raise KeyError because the
    # 'cases_handled' column does not exist when no rows were collected.
    if not metrics_df.empty:
        top_resources = metrics_df.nlargest(10, 'cases_handled')
        for _, row in top_resources.iterrows():
            print(f" {row['resource']}: {row['cases_handled']} cases, "
                  f"{row['activities']} activities, {row['handover_connections']} handovers")
    return metrics_df
# Run resource performance analysis
resource_analysis = analyze_resource_performance(log)

Install with Tessl CLI:

npx tessl i tessl/pypi-pm4py