# Custom Data Processing

Specialized record extraction and transformation for complex Mailchimp API responses, particularly email activity data. Provides custom extractors that handle nested data structures and flatten complex API responses for downstream consumption.

## Capabilities

### Email Activity Record Extractor

Custom extractor for processing email activity data with nested activity arrays that need to be flattened into individual records.

```python { .api }
class MailChimpRecordExtractorEmailActivity(DpathExtractor):
    """
    Custom record extractor for email activity data.

    Extends DpathExtractor to handle complex nested activity structures
    from Mailchimp's email activity API responses. Flattens activity
    arrays into individual records while preserving parent record data.
    """

    def extract_records(self, response: requests.Response) -> Iterable[Mapping[str, Any]]:
        """
        Extract and flatten email activity records from API response.

        Parameters:
        - response: HTTP response from Mailchimp email activity API

        Returns:
        Iterator of flattened activity records

        Process:
        1. Extracts base records using parent DpathExtractor
        2. Iterates through each record's 'activity' array
        3. Flattens activity items by merging with parent record data
        4. Yields individual activity records with complete context

        Each output record contains both the email context and
        specific activity details (opens, clicks, bounces, etc.).
        """
```

## Data Transformation Details

### Email Activity Flattening

The email activity API returns nested structures in which each email record contains an array of activity events. The custom extractor emits one flat record per event, copying the parent email fields into each:

**Input Structure:**
```json
{
  "email_id": "abc123",
  "campaign_id": "xyz789",
  "list_id": "list456",
  "email_address": "user@example.com",
  "activity": [
    {
      "action": "open",
      "timestamp": "2023-01-01T12:00:00+00:00",
      "url": null,
      "ip": "192.168.1.1"
    },
    {
      "action": "click",
      "timestamp": "2023-01-01T12:05:00+00:00",
      "url": "https://example.com/link",
      "ip": "192.168.1.1"
    }
  ]
}
```

**Output Records:**
```json
[
  {
    "email_id": "abc123",
    "campaign_id": "xyz789",
    "list_id": "list456",
    "email_address": "user@example.com",
    "action": "open",
    "timestamp": "2023-01-01T12:00:00+00:00",
    "url": null,
    "ip": "192.168.1.1"
  },
  {
    "email_id": "abc123",
    "campaign_id": "xyz789",
    "list_id": "list456",
    "email_address": "user@example.com",
    "action": "click",
    "timestamp": "2023-01-01T12:05:00+00:00",
    "url": "https://example.com/link",
    "ip": "192.168.1.1"
  }
]
```
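
The same transformation can be reproduced on plain dictionaries, without the Airbyte CDK installed. This is a minimal sketch rather than the connector's code: `flatten_email_activity` is a hypothetical helper name, and the sample record simply mirrors the input structure shown in this section.

```python
from typing import Any, Dict, Iterable, Iterator

def flatten_email_activity(records: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Yield one flat record per activity event (hypothetical helper)."""
    for record in records:
        # Detach the nested array so it is not copied into the output rows
        events = record.pop("activity", [])
        for event in events:
            # Parent fields first, event fields second
            yield {**record, **event}

email_record = {
    "email_id": "abc123",
    "campaign_id": "xyz789",
    "list_id": "list456",
    "email_address": "user@example.com",
    "activity": [
        {"action": "open", "timestamp": "2023-01-01T12:00:00+00:00",
         "url": None, "ip": "192.168.1.1"},
        {"action": "click", "timestamp": "2023-01-01T12:05:00+00:00",
         "url": "https://example.com/link", "ip": "192.168.1.1"},
    ],
}

flattened = list(flatten_email_activity([email_record]))
for row in flattened:
    print(row["email_address"], row["action"], row["url"])
```

One email record with two events becomes two rows, each carrying the four parent fields plus the four event fields.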

## Usage Examples

### Custom Extractor Implementation

```python
import requests
from typing import Any, Iterable, Mapping
from airbyte_cdk.sources.declarative.extractors import DpathExtractor

class MailChimpRecordExtractorEmailActivity(DpathExtractor):
    def extract_records(self, response: requests.Response) -> Iterable[Mapping[str, Any]]:
        # Get base records from parent extractor
        records = super().extract_records(response=response)

        # Flatten each record's activity array
        for record in records:
            activity_items = record.pop("activity", [])
            for activity_item in activity_items:
                # Merge parent record data with activity item
                yield {**record, **activity_item}
```
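
Two behaviours of this merge are easy to miss. An email record whose `activity` array is empty or absent produces no output at all, and when a parent field and an activity field share a key, the activity value wins because it is unpacked last. The sketch below illustrates both on plain dictionaries. `merge_activity` is a hypothetical helper standing in for the loop body of `extract_records`, and the `source` field is invented purely to force a key collision.

```python
from typing import Any, Dict, Iterator

def merge_activity(record: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
    # Same merge as the extractor's inner loop, applied to one parent record
    for item in record.pop("activity", []):
        yield {**record, **item}

# No activity: the parent record contributes nothing to the output
empty_rows = list(merge_activity({"email_id": "abc123", "activity": []}))
missing_rows = list(merge_activity({"email_id": "abc123"}))

# Key collision: "source" (hypothetical field) exists on both levels
parent = {
    "email_id": "abc123",
    "source": "parent",
    "activity": [{"action": "open", "source": "event"}],
}
rows = list(merge_activity(parent))

print(empty_rows, missing_rows)
print(rows[0]["source"])
print("activity" in parent)  # pop() also mutates the parent record
```

If emails without activity need to appear downstream, or if the parent record must stay intact, the loop would have to be adapted; the implementation shown above does neither.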

### Integration with Declarative Framework

The custom extractor is configured in the `manifest.yaml` for the `email_activity` stream:

```yaml
email_activity_stream:
  type: DeclarativeStream
  retriever:
    record_selector:
      type: RecordSelector
      extractor:
        type: CustomRecordExtractor
        class_name: source_mailchimp.components.MailChimpRecordExtractorEmailActivity
        field_path: ["emails"]
```
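
In this configuration, `field_path: ["emails"]` tells the parent extractor where the list of records sits in the response body. The sketch below imitates that lookup on a plain dictionary to show which part of the payload reaches the flattening step. `select_path` is a hypothetical stand-in written for this illustration, not the CDK's implementation, which supports more than exact keys (wildcards, for example). The `total_items` key in the sample body is likewise an assumption about the payload shape.

```python
from typing import Any, List, Sequence

def select_path(body: Any, field_path: Sequence[str]) -> List[Any]:
    """Walk the exact keys in field_path and return the records found there."""
    node = body
    for key in field_path:
        if not isinstance(node, dict) or key not in node:
            return []  # Path missing: nothing to extract
        node = node[key]
    # A list yields its items; a single object is treated as one record
    return node if isinstance(node, list) else [node]

body = {
    "emails": [
        {"email_id": "abc123", "activity": [{"action": "open"}]},
        {"email_id": "def456", "activity": []},
    ],
    "total_items": 2,
}

records = select_path(body, ["emails"])
print([record["email_id"] for record in records])
```

Only the items under `emails` are handed to the flattening loop; sibling keys in the response body are ignored.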

### Record Processing Flow

```python
import requests
from typing import Any, Iterable, Mapping
from airbyte_cdk.sources.declarative.extractors import DpathExtractor

class MailChimpRecordExtractorEmailActivity(DpathExtractor):
    def extract_records(self, response: requests.Response) -> Iterable[Mapping[str, Any]]:
        # 1. API response received with nested activity data

        # 2. Base DpathExtractor processes response structure
        base_records = super().extract_records(response)

        # 3. Custom extractor flattens activity arrays
        for record in base_records:
            activities = record.pop("activity", [])  # Remove activity array

            # 4. Create individual records for each activity
            for activity in activities:
                flattened_record = {**record, **activity}  # Merge data
                yield flattened_record  # Output individual activity record
```

## Benefits

### Enhanced Data Structure

- **Simplified Analytics**: Each activity becomes a separate record for easier analysis
- **Preserved Context**: Parent email data maintained in each activity record
- **Consistent Schema**: Flattened structure matches expected downstream formats
- **Complete Attribution**: Full email and campaign context for each activity event
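
### Performance Optimization

- **Streaming Processing**: Generator-based extraction yields one flattened record at a time instead of building the full list in memory
- **Lazy Evaluation**: Records are processed on demand as the consumer iterates
- **Incremental Compatibility**: Each flattened record carries its own `timestamp`, which suits timestamp-based incremental sync
- **Error Isolation**: Each activity item is merged on its own, so records already yielded are unaffected by a later failure; the extractor shown above does not catch exceptions, so a malformed item still stops iteration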
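
The streaming and lazy-evaluation points can be observed directly with a generator. The sketch below works on plain dictionaries rather than the connector code: `counting_source` and `flatten` are hypothetical helpers, and the `pulled` list exists only to show how many parent records were actually consumed.

```python
from itertools import islice
from typing import Any, Dict, Iterator, List

pulled: List[str] = []  # Tracks which parent records have been consumed

def counting_source() -> Iterator[Dict[str, Any]]:
    """Hypothetical stand-in for the parent extractor's output."""
    for n in range(1000):
        email_id = f"email-{n}"
        pulled.append(email_id)
        yield {"email_id": email_id,
               "activity": [{"action": "open"}, {"action": "click"}]}

def flatten(records: Iterator[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    # Same nested loop as the extractor, written as a free function
    for record in records:
        for item in record.pop("activity", []):
            yield {**record, **item}

# Ask for only the first three flattened records
first_three = list(islice(flatten(counting_source()), 3))
print(len(first_three), len(pulled))  # 3 2
```

Although the source could produce 1000 parent records, only two are pulled: the first supplies two rows and the second supplies the third. Nothing beyond what the consumer requests is read or held in memory.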