Monitor and react to content distribution events using Sling Distribution API (org.apache.sling.distribution). Covers distribution event handling, queue monitoring, and distribution lifecycle tracking.
Monitor and react to content distribution lifecycle events using the Sling Distribution API.
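Use Sling Distribution event handling for tasks such as tracking the distribution lifecycle, monitoring queue activity, and reacting to failed or completed distributions.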
For programmatic publishing, use the Replication API instead.
Key Packages:
- org.apache.sling.distribution - Core distribution interfaces
- org.apache.sling.distribution.event - Event topics and properties
- org.apache.sling.distribution.queue - Queue monitoring

Sling Distribution is the underlying transport mechanism for content replication in AEM Cloud Service. When you use the Replication API (Replicator.replicate()), Sling Distribution handles:
Replication API Call
↓
[AGENT_PACKAGE_CREATED] - Package assembled
↓
[AGENT_PACKAGE_QUEUED] - Package added to queue
↓
[AGENT_PACKAGE_DISTRIBUTED] - Package sent to pipeline
↓
[IMPORTER_PACKAGE_IMPORTED] - Package imported on target tier
OR
[AGENT_PACKAGE_DROPPED] - Package failed and was removed

Sling Distribution fires events at each stage of the distribution lifecycle:
| Event Topic | When It Fires | Use Case |
|---|---|---|
| AGENT_PACKAGE_CREATED | After package creation | Track what's being published |
| AGENT_PACKAGE_QUEUED | After package is queued | Monitor queue depth |
| AGENT_PACKAGE_DISTRIBUTED | After successful distribution | Trigger post-publish actions |
| AGENT_PACKAGE_DROPPED | When package fails and is dropped | Handle failures, alert on-call |
| IMPORTER_PACKAGE_IMPORTED | After successful import on target | Confirm content is live |
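
The ordering of these stages can be captured in a small state model, which is handy when a handler wants to flag events that arrive out of sequence. The following is only a sketch in plain Java: the `Stage` enum and `LifecycleCheck` class are hypothetical names, not part of the Sling API, and the transition rules assume that a dropped package leaves the flow instead of being distributed.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

/** Hypothetical local model of the lifecycle stages listed in the table above. */
enum Stage {
    CREATED, QUEUED, DISTRIBUTED, IMPORTED, DROPPED;

    /** Stages that may directly follow this one (assumed from the flow above). */
    Set<Stage> next() {
        switch (this) {
            case CREATED:
                return EnumSet.of(QUEUED);
            case QUEUED:
                return EnumSet.of(DISTRIBUTED, DROPPED);
            case DISTRIBUTED:
                return EnumSet.of(IMPORTED);
            default:
                // IMPORTED and DROPPED are treated as terminal
                return EnumSet.noneOf(Stage.class);
        }
    }
}

final class LifecycleCheck {
    private LifecycleCheck() {
    }

    /** Returns true if the stages form a valid path that starts at CREATED. */
    static boolean isValid(List<Stage> stages) {
        if (stages.isEmpty() || stages.get(0) != Stage.CREATED) {
            return false;
        }
        for (int i = 1; i < stages.size(); i++) {
            if (!stages.get(i - 1).next().contains(stages.get(i))) {
                return false;
            }
        }
        return true;
    }
}
```

A handler could map each incoming topic to a `Stage` and log a warning when `isValid` returns false for the stages seen so far for a package.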
import org.apache.sling.distribution.event.DistributionEventTopics;
// Event topic strings
DistributionEventTopics.AGENT_PACKAGE_CREATED // Package created
DistributionEventTopics.AGENT_PACKAGE_QUEUED // Package queued
DistributionEventTopics.AGENT_PACKAGE_DISTRIBUTED // Package distributed
DistributionEventTopics.AGENT_PACKAGE_DROPPED // Package dropped
DistributionEventTopics.IMPORTER_PACKAGE_IMPORTED // Package imported
import org.apache.sling.distribution.event.DistributionEventTopics;
import org.apache.sling.distribution.event.DistributionEventProperties;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Component(
service = EventHandler.class,
property = {
org.osgi.service.event.EventConstants.EVENT_TOPIC + "=" +
DistributionEventTopics.AGENT_PACKAGE_CREATED,
org.osgi.service.event.EventConstants.EVENT_TOPIC + "=" +
DistributionEventTopics.AGENT_PACKAGE_QUEUED,
org.osgi.service.event.EventConstants.EVENT_TOPIC + "=" +
DistributionEventTopics.AGENT_PACKAGE_DISTRIBUTED,
org.osgi.service.event.EventConstants.EVENT_TOPIC + "=" +
DistributionEventTopics.AGENT_PACKAGE_DROPPED,
org.osgi.service.event.EventConstants.EVENT_TOPIC + "=" +
DistributionEventTopics.IMPORTER_PACKAGE_IMPORTED
}
)
public class DistributionEventLogger implements EventHandler {
private static final Logger LOG = LoggerFactory.getLogger(DistributionEventLogger.class);
@Override
public void handleEvent(Event event) {
String topic = event.getTopic();
// Extract event properties
String componentName = (String) event.getProperty(
DistributionEventProperties.DISTRIBUTION_COMPONENT_NAME
);
String componentKind = (String) event.getProperty(
DistributionEventProperties.DISTRIBUTION_COMPONENT_KIND
);
String distributionType = (String) event.getProperty(
DistributionEventProperties.DISTRIBUTION_TYPE
);
String packageId = (String) event.getProperty(
DistributionEventProperties.DISTRIBUTION_PACKAGE_ID
);
String[] paths = (String[]) event.getProperty(
DistributionEventProperties.DISTRIBUTION_PATHS
);
Long timestamp = (Long) event.getProperty(
DistributionEventProperties.DISTRIBUTION_ENQUEUE_TIMESTAMP
);
LOG.info("Distribution Event: topic={}, component={}, type={}, packageId={}, paths={}, timestamp={}",
topic, componentName, distributionType, packageId,
paths != null ? String.join(",", paths) : "null",
timestamp
);
}
}
Distribution events can carry the following properties. Not every event sets all of them, so null-check values before use:
| Property | Type | Description |
|---|---|---|
| DISTRIBUTION_COMPONENT_NAME | String | Name of component generating the event |
| DISTRIBUTION_COMPONENT_KIND | String | Kind of component (agent, importer, etc.) |
| DISTRIBUTION_TYPE | String | Type of distribution request (ADD, DELETE, etc.) |
| DISTRIBUTION_PACKAGE_ID | String | Unique package identifier |
| DISTRIBUTION_PATHS | String[] | Content paths being distributed |
| DISTRIBUTION_DEEP_PATHS | String[] | Deep paths (full subtree) |
| DISTRIBUTION_ENQUEUE_TIMESTAMP | Long | When item was enqueued (milliseconds) |
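
Because a property may be missing or carry an unexpected type, direct casts can fail at runtime. The sketch below shows one way to read values defensively. It uses a plain `Map<String, Object>` as a stand-in for the event's properties, and `EventProps`, its methods, and the keys used with it are hypothetical names chosen for illustration.

```java
import java.util.Map;
import java.util.Optional;

final class EventProps {
    private EventProps() {
    }

    /**
     * Returns the property only if it is present and of the expected type.
     * The map stands in for the event's property lookup.
     */
    static <T> Optional<T> get(Map<String, Object> props, String key, Class<T> type) {
        Object value = props.get(key);
        // isInstance is false for null, so missing values become Optional.empty()
        return type.isInstance(value) ? Optional.of(type.cast(value)) : Optional.empty();
    }

    /** Joins paths for logging, tolerating a missing or mistyped value. */
    static String describePaths(Map<String, Object> props, String key) {
        return get(props, key, String[].class)
                .map(paths -> String.join(",", paths))
                .orElse("(none)");
    }
}
```

In a real handler the same pattern would wrap `event.getProperty(...)`, so a missing timestamp or path list results in an empty `Optional` instead of a `NullPointerException` or `ClassCastException`.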
import org.apache.sling.distribution.event.DistributionEventProperties;
@Override
public void handleEvent(Event event) {
// Component information
String componentName = (String) event.getProperty(
DistributionEventProperties.DISTRIBUTION_COMPONENT_NAME
);
String componentKind = (String) event.getProperty(
DistributionEventProperties.DISTRIBUTION_COMPONENT_KIND
);
// Distribution details
String type = (String) event.getProperty(
DistributionEventProperties.DISTRIBUTION_TYPE
);
String packageId = (String) event.getProperty(
DistributionEventProperties.DISTRIBUTION_PACKAGE_ID
);
// Content paths
String[] paths = (String[]) event.getProperty(
DistributionEventProperties.DISTRIBUTION_PATHS
);
String[] deepPaths = (String[]) event.getProperty(
DistributionEventProperties.DISTRIBUTION_DEEP_PATHS
);
// Timing
Long enqueueTime = (Long) event.getProperty(
DistributionEventProperties.DISTRIBUTION_ENQUEUE_TIMESTAMP
);
}
import org.apache.sling.distribution.event.DistributionEventProperties;
import org.apache.sling.distribution.event.DistributionEventTopics;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component(
    service = EventHandler.class,
    property = {
        org.osgi.service.event.EventConstants.EVENT_TOPIC + "=" +
            DistributionEventTopics.AGENT_PACKAGE_DROPPED
    }
)
public class DistributionFailureAlertHandler implements EventHandler {
    private static final Logger LOG = LoggerFactory.getLogger(
        DistributionFailureAlertHandler.class
    );

    @Reference
    private AlertService alertService; // Custom alert service (not part of Sling)

    @Override
    public void handleEvent(Event event) {
        String packageId = (String) event.getProperty(
            DistributionEventProperties.DISTRIBUTION_PACKAGE_ID
        );
        String[] paths = (String[]) event.getProperty(
            DistributionEventProperties.DISTRIBUTION_PATHS
        );
        // Paths may be absent; String.join would throw on null
        String joinedPaths = paths != null ? String.join(",", paths) : "unknown";

        LOG.error("Distribution package dropped: packageId={}, paths={}",
            packageId, joinedPaths);

        // Send alert to operations team
        alertService.sendAlert(
            "CRITICAL: Distribution Failed",
            String.format("Package %s was dropped. Paths: %s", packageId, joinedPaths)
        );
    }
}
@Component(
service = EventHandler.class,
property = {
org.osgi.service.event.EventConstants.EVENT_TOPIC + "=" +
DistributionEventTopics.IMPORTER_PACKAGE_IMPORTED
}
)
public class CdnCacheWarmingHandler implements EventHandler {
private static final Logger LOG = LoggerFactory.getLogger(
CdnCacheWarmingHandler.class
);
@Reference
private HttpClient httpClient;
@Override
public void handleEvent(Event event) {
String[] paths = (String[]) event.getProperty(
DistributionEventProperties.DISTRIBUTION_PATHS
);
String distributionType = (String) event.getProperty(
DistributionEventProperties.DISTRIBUTION_TYPE
);
// Only warm cache on ADD (activation)
if ("ADD".equals(distributionType) && paths != null) {
for (String path : paths) {
warmCache(path);
}
}
}
private void warmCache(String path) {
try {
// Convert JCR path to public URL
String publicUrl = "https://www.example.com" +
path.replace("/content/mysite", "") + ".html";
LOG.info("Warming CDN cache for: {}", publicUrl);
HttpGet request = new HttpGet(publicUrl);
httpClient.execute(request);
} catch (Exception e) {
LOG.error("Cache warming failed for path: {}", path, e);
}
}
}
@Component(
    service = EventHandler.class,
    property = {
        org.osgi.service.event.EventConstants.EVENT_TOPIC + "=" +
            DistributionEventTopics.AGENT_PACKAGE_CREATED,
        org.osgi.service.event.EventConstants.EVENT_TOPIC + "=" +
            DistributionEventTopics.AGENT_PACKAGE_DISTRIBUTED,
        org.osgi.service.event.EventConstants.EVENT_TOPIC + "=" +
            DistributionEventTopics.AGENT_PACKAGE_DROPPED
    }
)
public class DistributionAnalyticsHandler implements EventHandler {
    @Reference
    private AnalyticsService analyticsService; // Custom analytics service

    @Override
    public void handleEvent(Event event) {
        String topic = event.getTopic();
        String packageId = (String) event.getProperty(
            DistributionEventProperties.DISTRIBUTION_PACKAGE_ID
        );
        String[] paths = (String[]) event.getProperty(
            DistributionEventProperties.DISTRIBUTION_PATHS
        );
        Long timestamp = (Long) event.getProperty(
            DistributionEventProperties.DISTRIBUTION_ENQUEUE_TIMESTAMP
        );
        // Map.of rejects null values, so skip events without a package ID
        if (packageId == null) {
            return;
        }

        // Track metrics
        if (DistributionEventTopics.AGENT_PACKAGE_CREATED.equals(topic)) {
            int pathCount = paths != null ? paths.length : 0;
            analyticsService.trackEvent("distribution.package.created",
                Map.of("packageId", packageId, "pathCount", pathCount));
        } else if (DistributionEventTopics.AGENT_PACKAGE_DISTRIBUTED.equals(topic)
                && timestamp != null) {
            // Duration is only meaningful when the enqueue timestamp is present
            long duration = System.currentTimeMillis() - timestamp;
            analyticsService.trackEvent("distribution.package.distributed",
                Map.of("packageId", packageId, "duration", duration));
        } else if (DistributionEventTopics.AGENT_PACKAGE_DROPPED.equals(topic)) {
            String joinedPaths = paths != null ? String.join(",", paths) : "";
            analyticsService.trackEvent("distribution.package.failed",
                Map.of("packageId", packageId, "paths", joinedPaths));
        }
    }
}
@Component(
service = EventHandler.class,
property = {
org.osgi.service.event.EventConstants.EVENT_TOPIC + "=" +
DistributionEventTopics.IMPORTER_PACKAGE_IMPORTED
}
)
public class ProductionPublishNotificationHandler implements EventHandler {
@Reference
private SlackService slackService;
@Reference
private SlingSettingsService slingSettings;
@Override
public void handleEvent(Event event) {
// Only notify for production environment
if (!slingSettings.getRunModes().contains("prod")) {
return;
}
String[] paths = (String[]) event.getProperty(
DistributionEventProperties.DISTRIBUTION_PATHS
);
String componentName = (String) event.getProperty(
DistributionEventProperties.DISTRIBUTION_COMPONENT_NAME
);
// Only notify for publish agent (not preview)
if (!"publish".equals(componentName)) {
return;
}
String message = String.format(
":rocket: Content published to production: %s",
paths != null ? String.join(", ", paths) : "(no paths reported)"
);
slackService.postToChannel("#content-releases", message);
}
}
import java.util.Map;
import javax.jcr.Node;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;

@Component(
    service = EventHandler.class,
    property = {
        org.osgi.service.event.EventConstants.EVENT_TOPIC + "=" +
            DistributionEventTopics.AGENT_PACKAGE_CREATED,
        org.osgi.service.event.EventConstants.EVENT_TOPIC + "=" +
            DistributionEventTopics.AGENT_PACKAGE_QUEUED,
        org.osgi.service.event.EventConstants.EVENT_TOPIC + "=" +
            DistributionEventTopics.AGENT_PACKAGE_DISTRIBUTED,
        org.osgi.service.event.EventConstants.EVENT_TOPIC + "=" +
            DistributionEventTopics.AGENT_PACKAGE_DROPPED,
        org.osgi.service.event.EventConstants.EVENT_TOPIC + "=" +
            DistributionEventTopics.IMPORTER_PACKAGE_IMPORTED
    }
)
public class DistributionAuditHandler implements EventHandler {
    private static final Logger LOG = LoggerFactory.getLogger(DistributionAuditHandler.class);

    @Reference
    private ResourceResolverFactory resolverFactory;

    @Override
    public void handleEvent(Event event) {
        try (ResourceResolver resolver = getServiceResolver()) {
            String topic = event.getTopic();
            String packageId = (String) event.getProperty(
                DistributionEventProperties.DISTRIBUTION_PACKAGE_ID
            );
            String[] paths = (String[]) event.getProperty(
                DistributionEventProperties.DISTRIBUTION_PATHS
            );
            Long timestamp = (Long) event.getProperty(
                DistributionEventProperties.DISTRIBUTION_ENQUEUE_TIMESTAMP
            );

            // Create audit log entry under /var/audit/distribution
            Resource auditRoot = resolver.getResource("/var/audit/distribution");
            Node parent = auditRoot != null ? auditRoot.adaptTo(Node.class) : null;
            if (parent == null) {
                LOG.warn("Audit root /var/audit/distribution is missing or not accessible");
                return;
            }
            long now = System.currentTimeMillis();
            Node auditNode = parent.addNode(String.valueOf(now), "nt:unstructured");
            auditNode.setProperty("topic", topic);
            // Only write properties the event actually carried
            if (packageId != null) {
                auditNode.setProperty("packageId", packageId);
            }
            if (paths != null) {
                auditNode.setProperty("paths", paths);
            }
            if (timestamp != null) {
                auditNode.setProperty("timestamp", timestamp.longValue());
            }
            auditNode.setProperty("auditTime", now);
            resolver.commit();
        } catch (Exception e) {
            LOG.error("Failed to create audit log", e);
        }
    }

    private ResourceResolver getServiceResolver() throws Exception {
        Map<String, Object> param = Map.of(
            ResourceResolverFactory.SUBSERVICE, "distributionAuditor"
        );
        return resolverFactory.getServiceResourceResolver(param);
    }
}
When monitoring events, the DISTRIBUTION_TYPE property indicates the type of operation:
| Request Type | Description | When Used |
|---|---|---|
| ADD | Content is being added/activated | Normal publishing |
| DELETE | Content is being deleted | Unpublishing, deletion |
| PULL | Content is being pulled from target | Reverse replication |
| INVALIDATE | Cache invalidation only | CDN purge |
| TEST | Connection test | Health checks |
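
Since the type value may be missing, a handler benefits from normalizing it before branching. The following sketch mirrors the table in a hypothetical local enum, `RequestKind`, and maps null or unrecognized values to `UNKNOWN`. It accepts an `Object` on the assumption that the raw value might not always arrive as a plain `String`.

```java
import java.util.Locale;

/** Hypothetical local mirror of the request types in the table above. */
enum RequestKind {
    ADD, DELETE, PULL, INVALIDATE, TEST, UNKNOWN;

    /** Parses a raw property value; null or unrecognized values map to UNKNOWN. */
    static RequestKind parse(Object raw) {
        if (raw == null) {
            return UNKNOWN;
        }
        try {
            // String.valueOf tolerates non-String values such as enum constants
            return valueOf(String.valueOf(raw).trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            return UNKNOWN;
        }
    }
}
```

Branching on `RequestKind.parse(value)` avoids the `NullPointerException` that a `switch` on a null `String` would throw.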
@Override
public void handleEvent(Event event) {
    String distributionType = (String) event.getProperty(
        DistributionEventProperties.DISTRIBUTION_TYPE
    );
    String[] paths = (String[]) event.getProperty(
        DistributionEventProperties.DISTRIBUTION_PATHS
    );
    // A switch on a null String throws NullPointerException, so guard first
    if (distributionType == null) {
        LOG.warn("Distribution event without a type");
        return;
    }
    String joinedPaths = paths != null ? String.join(",", paths) : "";
    switch (distributionType) {
        case "ADD":
            LOG.info("Content activated: {}", joinedPaths);
            break;
        case "DELETE":
            LOG.info("Content deleted: {}", joinedPaths);
            break;
        case "PULL":
            LOG.info("Content pulled: {}", joinedPaths);
            break;
        case "INVALIDATE":
            LOG.info("Cache invalidated: {}", joinedPaths);
            break;
        case "TEST":
            LOG.debug("Distribution test executed");
            break;
        default:
            LOG.warn("Unknown distribution type: {}", distributionType);
    }
}
Only listen to events you need:
// Good: Listen to specific events
property = {
org.osgi.service.event.EventConstants.EVENT_TOPIC + "=" +
DistributionEventTopics.AGENT_PACKAGE_DROPPED
}
// Avoid: Listening to all events if not needed
Event handlers should be fast. Offload heavy processing:
@Reference
private JobManager jobManager;
@Override
public void handleEvent(Event event) {
// Queue a job for async processing
Map<String, Object> jobProperties = new HashMap<>();
jobProperties.put("packageId", event.getProperty(
DistributionEventProperties.DISTRIBUTION_PACKAGE_ID));
jobManager.addJob("com/myapp/distribution/process", jobProperties);
}
Event handlers should use service users, not admin sessions:
<!-- Service user mapping -->
{
"user.mapping": [
"com.myapp.core:distributionEventHandler=myapp-distribution-service"
]
}
Not all properties are available in all events:
String[] paths = (String[]) event.getProperty(
DistributionEventProperties.DISTRIBUTION_PATHS
);
if (paths != null && paths.length > 0) {
// Process paths
}
Use appropriate log levels:
// INFO for normal flow
LOG.info("Package distributed: {}", packageId);
// WARN for unexpected situations
LOG.warn("Package queued longer than expected: {}", packageId);
// ERROR for failures
LOG.error("Package dropped: {}", packageId);
While you can't directly query distribution queues via the API in Cloud Service, you can monitor via:
- AGENT_PACKAGE_QUEUED and AGENT_PACKAGE_DISTRIBUTED events
- /system/console/slingjobs

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Component(
    service = {EventHandler.class, QueueMonitor.class},
    property = {
        org.osgi.service.event.EventConstants.EVENT_TOPIC + "=" +
            DistributionEventTopics.AGENT_PACKAGE_QUEUED,
        org.osgi.service.event.EventConstants.EVENT_TOPIC + "=" +
            DistributionEventTopics.AGENT_PACKAGE_DISTRIBUTED,
        org.osgi.service.event.EventConstants.EVENT_TOPIC + "=" +
            DistributionEventTopics.AGENT_PACKAGE_DROPPED
    }
)
public class QueueMonitor implements EventHandler {
    private static final Logger LOG = LoggerFactory.getLogger(QueueMonitor.class);

    // Packages seen as queued but not yet distributed or dropped.
    // Held in memory per instance, so the depth is an approximation.
    private final Map<String, Long> queuedPackages = new ConcurrentHashMap<>();

    @Override
    public void handleEvent(Event event) {
        String topic = event.getTopic();
        String packageId = (String) event.getProperty(
            DistributionEventProperties.DISTRIBUTION_PACKAGE_ID
        );
        // ConcurrentHashMap does not accept null keys
        if (packageId == null) {
            return;
        }
        if (DistributionEventTopics.AGENT_PACKAGE_QUEUED.equals(topic)) {
            queuedPackages.put(packageId, System.currentTimeMillis());
            int depth = queuedPackages.size();
            LOG.info("Queue depth: {}", depth);
            // Alert if queue is too deep
            if (depth > 50) {
                LOG.warn("Distribution queue depth exceeds threshold: {}", depth);
            }
        } else if (DistributionEventTopics.AGENT_PACKAGE_DISTRIBUTED.equals(topic) ||
                DistributionEventTopics.AGENT_PACKAGE_DROPPED.equals(topic)) {
            Long queuedTime = queuedPackages.remove(packageId);
            if (queuedTime != null) {
                long duration = System.currentTimeMillis() - queuedTime;
                LOG.info("Package processed in {}ms. Queue depth: {}",
                    duration, queuedPackages.size());
            }
        }
    }

    public int getQueueDepth() {
        return queuedPackages.size();
    }
}
Causes:
Solution:
// Verify event topic matches exactly
property = {
org.osgi.service.event.EventConstants.EVENT_TOPIC + "=" +
DistributionEventTopics.AGENT_PACKAGE_CREATED // Exact constant
}
// Check component is active in Felix console
// /system/console/components
Cause: Not all properties are available in all events
Solution: Always null-check:
String[] paths = (String[]) event.getProperty(
DistributionEventProperties.DISTRIBUTION_PATHS
);
if (paths == null) {
LOG.warn("No paths in distribution event");
return;
}
Cause: Synchronous processing in event handler
Solution: Use async job processing:
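@Reference
private JobManager jobManager;

@Override
public void handleEvent(Event event) {
    // Don't do heavy work here; copy what the job needs and return quickly
    Map<String, Object> eventData = new HashMap<>();
    eventData.put("packageId", event.getProperty(
        DistributionEventProperties.DISTRIBUTION_PACKAGE_ID));
    jobManager.addJob("com/myapp/process", eventData);
}
Key Understanding: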
- Replication API (com.day.cq.replication.Replicator) - What you call to publish content
- Sling Distribution events (org.apache.sling.distribution.event) - What fires during the distribution lifecycle

Workflow:
Your Code: replicator.replicate(...)
↓
Sling Distribution: Creates package → [AGENT_PACKAGE_CREATED]
↓
Sling Distribution: Queues package → [AGENT_PACKAGE_QUEUED]
↓
Sling Distribution: Sends package → [AGENT_PACKAGE_DISTRIBUTED]
↓
Target Tier: Imports package → [IMPORTER_PACKAGE_IMPORTED]
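
To see how long a package spends between the steps of this workflow, a handler can record the time at which each stage was observed and compare them afterwards. The sketch below is plain Java with no Sling dependencies. `PackageTimeline` and the stage labels passed to it are hypothetical, and the caller is assumed to supply the package ID and a timestamp taken from each event.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;

/** Records when each stage was observed for a package; stage names are free-form labels. */
final class PackageTimeline {
    private final Map<String, Map<String, Long>> seen = new ConcurrentHashMap<>();

    /** Stores the first time a stage was seen for the given package ID. */
    void record(String packageId, String stage, long timeMillis) {
        if (packageId == null || stage == null) {
            return; // ConcurrentHashMap rejects null keys
        }
        seen.computeIfAbsent(packageId, id -> new ConcurrentHashMap<>())
            .putIfAbsent(stage, timeMillis);
    }

    /** Milliseconds between two recorded stages, or empty if either is missing. */
    OptionalLong elapsed(String packageId, String from, String to) {
        if (packageId == null || from == null || to == null) {
            return OptionalLong.empty();
        }
        Map<String, Long> stages = seen.get(packageId);
        if (stages == null) {
            return OptionalLong.empty();
        }
        Long start = stages.get(from);
        Long end = stages.get(to);
        return (start == null || end == null)
            ? OptionalLong.empty()
            : OptionalLong.of(end - start);
    }

    /** Removes a package once it is finished so the map does not grow without bound. */
    void forget(String packageId) {
        if (packageId != null) {
            seen.remove(packageId);
        }
    }
}
```

Calling `forget` when a package is distributed or dropped keeps memory use bounded. Like any in-memory tracker, the data is per instance and is lost on restart.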