Airbyte source connector that enables data extraction from Mailchimp's API for email marketing analytics and audience management
Specialized record extraction and transformation for complex Mailchimp API responses, particularly email activity data. Provides custom extractors that handle nested data structures and flatten complex API responses for downstream consumption.
Custom extractor for processing email activity data with nested activity arrays that need to be flattened into individual records.
class MailChimpRecordExtractorEmailActivity(DpathExtractor):
    """
    Custom record extractor for email activity data.

    Extends DpathExtractor to handle complex nested activity structures
    from Mailchimp's email activity API responses. Flattens activity
    arrays into individual records while preserving parent record data.
    """

    def extract_records(self, response: requests.Response) -> Iterable[Mapping[str, Any]]:
        """
        Extract and flatten email activity records from API response.

        Parameters:
        - response: HTTP response from Mailchimp email activity API

        Returns:
        Iterator of flattened activity records

        Process:
        1. Extracts base records using parent DpathExtractor
        2. Iterates through each record's 'activity' array
        3. Flattens activity items by merging with parent record data
        4. Yields individual activity records with complete context

        Each output record contains both the email context and
        specific activity details (opens, clicks, bounces, etc.).
        """

The email activity API returns nested structures where each email record contains an array of activity events. The custom extractor transforms this structure:
Input Structure:
{
  "email_id": "abc123",
  "campaign_id": "xyz789",
  "list_id": "list456",
  "email_address": "user@example.com",
  "activity": [
    {
      "action": "open",
      "timestamp": "2023-01-01T12:00:00+00:00",
      "url": null,
      "ip": "192.168.1.1"
    },
    {
      "action": "click",
      "timestamp": "2023-01-01T12:05:00+00:00",
      "url": "https://example.com/link",
      "ip": "192.168.1.1"
    }
  ]
}

Output Records:
[
  {
    "email_id": "abc123",
    "campaign_id": "xyz789",
    "list_id": "list456",
    "email_address": "user@example.com",
    "action": "open",
    "timestamp": "2023-01-01T12:00:00+00:00",
    "url": null,
    "ip": "192.168.1.1"
  },
  {
    "email_id": "abc123",
    "campaign_id": "xyz789",
    "list_id": "list456",
    "email_address": "user@example.com",
    "action": "click",
    "timestamp": "2023-01-01T12:05:00+00:00",
    "url": "https://example.com/link",
    "ip": "192.168.1.1"
  }
]

import requests
from typing import Any, Iterable, Mapping
from airbyte_cdk.sources.declarative.extractors import DpathExtractor


class MailChimpRecordExtractorEmailActivity(DpathExtractor):
    def extract_records(self, response: requests.Response) -> Iterable[Mapping[str, Any]]:
        # Get base records from parent extractor
        records = super().extract_records(response=response)
        # Flatten each record's activity array
        for record in records:
            activity_items = record.pop("activity", [])
            for activity_item in activity_items:
                # Merge parent record data with activity item
                yield {**record, **activity_item}

The custom extractor is configured in the manifest.yaml for the email_activity stream:
email_activity_stream:
  type: DeclarativeStream
  retriever:
    record_selector:
      type: RecordSelector
      extractor:
        type: CustomRecordExtractor
        class_name: source_mailchimp.components.MailChimpRecordExtractorEmailActivity
        field_path: ["emails"]

# 1. API response received with nested activity data
# 2. Base DpathExtractor processes response structure
base_records = super().extract_records(response)
# 3. Custom extractor flattens activity arrays
for record in base_records:
    activities = record.pop("activity", [])  # Remove activity array
    # 4. Create individual records for each activity
    for activity in activities:
        flattened_record = {**record, **activity}  # Merge data
        yield flattened_record  # Output individual activity record

Install with Tessl CLI
npx tessl i tessl/pypi-airbyte-source-mailchimp
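
The flattening step described above can be tried without the Airbyte CDK. The following is a minimal sketch, not the connector's code: `flatten_email_activity` and `sample_response` are hypothetical names introduced for illustration, and the second email record (with an empty `activity` array) is made-up test data. Unlike the extractor, the sketch copies each record before popping `activity`, so the input is left unchanged.

```python
from typing import Any, Dict, Iterable, Iterator, Mapping


def flatten_email_activity(records: Iterable[Mapping[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Yield one merged record per entry in each record's 'activity' list.

    Hypothetical helper: mirrors the merge logic of the extractor, but
    works on plain dicts instead of an HTTP response.
    """
    for record in records:
        # Copy first so the caller's data is not mutated by pop().
        parent = dict(record)
        activity_items = parent.pop("activity", [])
        for activity_item in activity_items:
            # Keys from the activity item are unpacked last, so they
            # override a parent key of the same name.
            yield {**parent, **activity_item}


# Shaped like the documented input, under the "emails" field path.
sample_response = {
    "emails": [
        {
            "email_id": "abc123",
            "campaign_id": "xyz789",
            "list_id": "list456",
            "email_address": "user@example.com",
            "activity": [
                {
                    "action": "open",
                    "timestamp": "2023-01-01T12:00:00+00:00",
                    "url": None,
                    "ip": "192.168.1.1",
                },
                {
                    "action": "click",
                    "timestamp": "2023-01-01T12:05:00+00:00",
                    "url": "https://example.com/link",
                    "ip": "192.168.1.1",
                },
            ],
        },
        # Made-up record with no activity: it contributes no output rows.
        {
            "email_id": "def456",
            "campaign_id": "xyz789",
            "list_id": "list456",
            "email_address": "other@example.com",
            "activity": [],
        },
    ]
}

flattened = list(flatten_email_activity(sample_response["emails"]))
for row in flattened:
    print(row["email_id"], row["action"], row["timestamp"])
```

Two properties of the merge are worth keeping in mind when reading the output. An email record whose `activity` array is empty or absent yields nothing, so it does not appear downstream. And because the activity item is unpacked last, a field name shared by both levels takes the activity item's value.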