# Transaction Management

Transaction handling capabilities that provide ACID properties for Spark operations within the CDAP platform, enabling consistent data access across distributed Spark executors and proper integration with CDAP's transactional data systems.

## Capabilities

### Spark Transaction Handler

Manages transaction lifecycle for Spark jobs and stages, coordinating with CDAP's transaction system to ensure data consistency during distributed Spark execution.

```java { .api }
/**
 * Handles transaction lifecycle for Spark jobs within CDAP
 * Coordinates with TransactionSystemClient to manage ACID properties
 */
public class SparkTransactionHandler {
  /**
   * Marks the start of a Spark job without transaction context
   * @param jobId Spark job identifier
   * @param stageIds Set of stage IDs associated with this job
   */
  public void jobStarted(Integer jobId, Set<Integer> stageIds);

  /**
   * Marks the start of a Spark job with transaction context
   * @param jobId Spark job identifier
   * @param stageIds Set of stage IDs associated with this job
   * @param txInfo Transaction information for this job
   */
  public void jobStarted(Integer jobId, Set<Integer> stageIds, SparkTransactional.TransactionInfo txInfo);

  /**
   * Marks the completion of a Spark job
   * @param jobId Spark job identifier
   * @param succeeded Whether the job completed successfully
   */
  public void jobEnded(Integer jobId, boolean succeeded);

  /**
   * Marks the submission of a Spark stage
   * @param stageId Spark stage identifier
   */
  public void stageSubmitted(Integer stageId);

  /**
   * Marks the completion of a Spark stage
   * @param stageId Spark stage identifier
   */
  public void stageCompleted(Integer stageId);

  /**
   * Gets the current transaction for a job if available
   * @param jobId Spark job identifier
   * @return Optional transaction info, empty if no transaction active
   */
  public Optional<SparkTransactional.TransactionInfo> getJobTransaction(Integer jobId);
}
```
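
The handler is normally driven by CDAP's Spark listener, which forwards job and stage events as they occur. The sketch below only illustrates the expected call order using the methods listed above; the constructor argument `txSystemClient` is borrowed from the usage examples further down, and the job and stage identifiers are placeholders.

```java
import com.google.common.collect.ImmutableSet;
import java.util.Set;

// Illustrative call order only; in practice the CDAP runtime invokes these
// callbacks from its Spark listener rather than from application code.
SparkTransactionHandler txHandler = new SparkTransactionHandler(txSystemClient);

Integer jobId = 1;
Set<Integer> stageIds = ImmutableSet.of(10, 11);

txHandler.jobStarted(jobId, stageIds);   // job begins without an explicit transaction
txHandler.stageSubmitted(10);            // each stage reports submission...
txHandler.stageCompleted(10);            // ...and completion as Spark schedules it
txHandler.stageSubmitted(11);
txHandler.stageCompleted(11);
txHandler.jobEnded(jobId, true);         // success: any job transaction may now commit
```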

### Spark Transactional

Provides transactional execution context for Spark operations, enabling dataset operations within transaction boundaries and proper commit/rollback behavior.

```java { .api }
/**
 * Transactional execution context for Spark operations
 * Enables ACID properties for dataset access within Spark applications
 */
public class SparkTransactional {
  /**
   * Retrieves transaction information by key
   * @param key Transaction key identifier
   * @return TransactionInfo containing transaction details, or null if not found
   */
  public TransactionInfo getTransactionInfo(String key);

  /**
   * Executes a runnable within a transaction context
   * @param type Transaction type (SHORT or LONG)
   * @param runnable Code to execute within transaction
   * @throws TransactionFailureException if transaction fails
   */
  public void execute(TransactionType type, TxRunnable runnable) throws TransactionFailureException;

  /**
   * Executes a callable within a transaction context and returns result
   * @param type Transaction type (SHORT or LONG)
   * @param callable Code to execute within transaction
   * @return Result of the callable execution
   * @throws TransactionFailureException if transaction fails
   */
  public <T> T execute(TransactionType type, TxCallable<T> callable) throws TransactionFailureException;

  /**
   * Gets the current transaction context if available
   * @return Current TransactionContext, or null if no transaction active
   */
  public TransactionContext getCurrentTransactionContext();
}
```
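
A minimal sketch of the callable overload: it returns a value from inside a SHORT transaction and surfaces rollback as a `TransactionFailureException`. The dataset name and the `sparkTransactional` and `LOG` references are placeholders, and `context.getDataset(...)` follows the convention used in the usage examples below.

```java
try {
  // The lambda is a TxCallable<byte[]>; its return value is propagated to the caller.
  byte[] value = sparkTransactional.execute(TransactionType.SHORT, (TransactionContext context) -> {
    KeyValueTable records = context.getDataset("records");   // hypothetical dataset
    return records.read("some-key");
  });
  LOG.info("Read {} bytes inside the transaction", value == null ? 0 : value.length);
} catch (TransactionFailureException e) {
  // The transaction was rolled back; no writes made by the callable are visible.
  LOG.error("Transactional read failed", e);
}
```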

### Transaction Information and Types

Transaction metadata and execution context classes that provide access to transaction state and configuration.

```java { .api }
/**
 * Container for transaction execution information
 */
public static class TransactionInfo {
  /**
   * Gets the underlying transaction object
   * @return Transaction instance
   */
  public Transaction getTransaction();

  /**
   * Indicates whether transaction should commit when job ends
   * @return true if auto-commit is enabled
   */
  public boolean commitOnJobEnded();

  /**
   * Gets the transaction timeout in seconds
   * @return Timeout value in seconds
   */
  public int getTimeoutInSeconds();
}

/**
 * Enumeration of transaction types supported by CDAP
 */
public enum TransactionType {
  /** Short-lived transactions for quick operations */
  SHORT,
  /** Long-running transactions for complex operations */
  LONG
}

/**
 * Functional interface for transactional operations without return value
 */
@FunctionalInterface
public interface TxRunnable {
  void run(TransactionContext context) throws Exception;
}

/**
 * Functional interface for transactional operations with return value
 */
@FunctionalInterface
public interface TxCallable<T> {
  T call(TransactionContext context) throws Exception;
}
```
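
Both interfaces are functional, so they can be written as lambdas (as in the examples below) or as explicit classes. The following sketch shows an explicit `TxRunnable` plus a `TransactionInfo` lookup; the key `"tx-key-123"` simply mirrors the usage example that follows, and `sparkTransactional` and `LOG` are placeholders.

```java
// Explicit TxRunnable implementation, equivalent to the lambda form used below.
TxRunnable cleanupTask = new TxRunnable() {
  @Override
  public void run(TransactionContext context) throws Exception {
    // dataset operations here are scoped to the enclosing transaction
  }
};
sparkTransactional.execute(TransactionType.SHORT, cleanupTask);

// Inspecting transaction metadata retrieved by key.
SparkTransactional.TransactionInfo info = sparkTransactional.getTransactionInfo("tx-key-123");
if (info != null) {
  LOG.debug("Transaction {}: timeout={}s, commitOnJobEnded={}",
            info.getTransaction(), info.getTimeoutInSeconds(), info.commitOnJobEnded());
}
```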

## Usage Examples

**Basic Transaction Management:**

```java
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.app.runtime.spark.SparkTransactionHandler;
import co.cask.cdap.app.runtime.spark.SparkTransactional;
import co.cask.cdap.app.runtime.spark.SparkTransactional.TransactionInfo;
import co.cask.cdap.app.runtime.spark.SparkTransactional.TransactionType;
import com.google.common.collect.Sets;

import java.util.Set;

// Transaction handler setup (typically done by CDAP framework)
SparkTransactionHandler txHandler = new SparkTransactionHandler(txSystemClient);

// Job starts with transaction
Set<Integer> stageIds = Sets.newHashSet(1, 2, 3);
TransactionInfo txInfo = sparkTransactional.getTransactionInfo("tx-key-123");
txHandler.jobStarted(jobId, stageIds, txInfo);

// Execute operations within transaction
sparkTransactional.execute(TransactionType.SHORT, (context) -> {
  // Dataset operations within transaction
  KeyValueTable table = context.getDataset("myTable");
  table.write("key", "value");
  // Transaction automatically committed if no exceptions
});

// Job completion
txHandler.jobEnded(jobId, true);
```
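
The failure path is the mirror image of the example above: if the body throws, `execute` raises `TransactionFailureException`, nothing is committed, and the job can be reported as failed. A minimal continuation of the same snippet; the precondition check is hypothetical.

```java
try {
  sparkTransactional.execute(TransactionType.SHORT, (TransactionContext context) -> {
    KeyValueTable table = context.getDataset("myTable");
    if (table.read("required-key") == null) {
      // Hypothetical precondition; throwing here aborts the transaction
      throw new IllegalStateException("required-key is missing");
    }
    table.write("key", "value");
  });
} catch (TransactionFailureException e) {
  // The write above is not committed when the body throws
  LOG.warn("Transaction rolled back: {}", e.getMessage());
  txHandler.jobEnded(jobId, false);   // report the job as unsuccessful
}
```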

**Dataset Access with Transactions:**

```scala
import co.cask.cdap.api.dataset.lib.KeyValueTable
import co.cask.cdap.app.runtime.spark.SparkTransactional
import co.cask.cdap.app.runtime.spark.SparkTransactional.TransactionType

// Transactional dataset access in Spark
val result = sparkTransactional.execute(TransactionType.LONG, (context) => {
  val inputTable = context.getDataset[KeyValueTable]("input")
  val outputTable = context.getDataset[KeyValueTable]("output")

  // Read from input table
  val data = inputTable.read("sourceKey")

  // Process and write to output table
  val processed = processData(data)
  outputTable.write("targetKey", processed)

  processed
})
```
206
207
**Transaction Context in Spark Jobs:**
208
209
```java
210
// Check for active transaction in Spark job
211
SparkListenerJobStart jobStart = ...;
212
String txKey = jobStart.getProperties().getProperty(SparkTransactional.ACTIVE_TRANSACTION_KEY);
213
214
if (txKey != null && !txKey.isEmpty()) {
215
TransactionInfo txInfo = sparkTransactional.getTransactionInfo(txKey);
216
if (txInfo != null) {
217
LOG.info("Job {} running with transaction: {}, auto-commit: {}",
218
jobStart.jobId(), txInfo.getTransaction(), txInfo.commitOnJobEnded());
219
}
220
}
221
```
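
When only the job id is known, the handler offers an equivalent lookup. A brief sketch reusing `jobStart` and `txHandler` from the snippets above:

```java
// Query the handler directly for a job's transaction, e.g. from monitoring code.
Optional<SparkTransactional.TransactionInfo> jobTx = txHandler.getJobTransaction(jobStart.jobId());
if (jobTx.isPresent()) {
  LOG.info("Job {} will {}commit its transaction when it ends",
           jobStart.jobId(), jobTx.get().commitOnJobEnded() ? "" : "not ");
} else {
  LOG.info("Job {} is running without a transaction", jobStart.jobId());
}
```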