
# Transaction Management

Transaction handling for Spark programs running on the CDAP platform. These classes give Spark dataset operations ACID properties, keep data access consistent across distributed Spark executors, and integrate Spark with CDAP's transactional datasets.

## Capabilities

### Spark Transaction Handler

Manages the transaction lifecycle of Spark jobs and stages, coordinating with CDAP's transaction system to keep data consistent during distributed Spark execution.

```java { .api }
/**
 * Handles transaction lifecycle for Spark jobs within CDAP.
 * Coordinates with TransactionSystemClient to manage ACID properties.
 */
public class SparkTransactionHandler {

  /**
   * Marks the start of a Spark job without a transaction context.
   * @param jobId Spark job identifier
   * @param stageIds set of stage IDs associated with this job
   */
  public void jobStarted(Integer jobId, Set<Integer> stageIds);

  /**
   * Marks the start of a Spark job with a transaction context.
   * @param jobId Spark job identifier
   * @param stageIds set of stage IDs associated with this job
   * @param txInfo transaction information for this job
   */
  public void jobStarted(Integer jobId, Set<Integer> stageIds, SparkTransactional.TransactionInfo txInfo);

  /**
   * Marks the completion of a Spark job.
   * @param jobId Spark job identifier
   * @param succeeded whether the job completed successfully
   */
  public void jobEnded(Integer jobId, boolean succeeded);

  /**
   * Marks the submission of a Spark stage.
   * @param stageId Spark stage identifier
   */
  public void stageSubmitted(Integer stageId);

  /**
   * Marks the completion of a Spark stage.
   * @param stageId Spark stage identifier
   */
  public void stageCompleted(Integer stageId);

  /**
   * Gets the current transaction for a job, if any.
   * @param jobId Spark job identifier
   * @return the transaction info, or an empty Optional if no transaction is active
   */
  public Optional<SparkTransactional.TransactionInfo> getJobTransaction(Integer jobId);
}
```
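
To make the lifecycle above concrete, here is a minimal, self-contained sketch of the bookkeeping a handler of this kind could perform. It is not CDAP's implementation: `JobTxTracker`, `TxInfo`, `stageTransaction`, and the `outcomes` map are hypothetical, and the rule that a failed job aborts its transaction while a successful one commits only when `commitOnJobEnded` is true is an assumption drawn from the method descriptions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
 * Hypothetical, simplified model of job/stage transaction bookkeeping.
 * Not the CDAP SparkTransactionHandler; commit/abort are only recorded.
 */
public class JobTxTracker {

  /** Hypothetical stand-in for SparkTransactional.TransactionInfo. */
  static final class TxInfo {
    final long txId;
    final boolean commitOnJobEnded;

    TxInfo(long txId, boolean commitOnJobEnded) {
      this.txId = txId;
      this.commitOnJobEnded = commitOnJobEnded;
    }
  }

  private final Map<Integer, TxInfo> jobTx = new HashMap<>();
  private final Map<Integer, Integer> stageToJob = new HashMap<>();
  /** Illustrative record of what happened to each transaction ID. */
  final Map<Long, String> outcomes = new HashMap<>();

  void jobStarted(int jobId, Set<Integer> stageIds, TxInfo txInfo) {
    for (int stageId : stageIds) {
      stageToJob.put(stageId, jobId);
    }
    if (txInfo != null) {
      jobTx.put(jobId, txInfo);
    }
  }

  /** Finds the transaction of the job that owns a stage, if any. */
  Optional<TxInfo> stageTransaction(int stageId) {
    Integer jobId = stageToJob.get(stageId);
    return jobId == null ? Optional.empty() : Optional.ofNullable(jobTx.get(jobId));
  }

  Optional<TxInfo> getJobTransaction(int jobId) {
    return Optional.ofNullable(jobTx.get(jobId));
  }

  void jobEnded(int jobId, boolean succeeded) {
    // Forget this job's stages and transaction so state does not leak.
    stageToJob.values().removeIf(id -> id == jobId);
    TxInfo tx = jobTx.remove(jobId);
    if (tx == null) {
      return; // the job ran without a transaction
    }
    if (!succeeded) {
      outcomes.put(tx.txId, "ABORTED");      // assumed: failure always aborts
    } else if (tx.commitOnJobEnded) {
      outcomes.put(tx.txId, "COMMITTED");    // auto-commit at job end
    } else {
      outcomes.put(tx.txId, "LEFT_OPEN");    // caller commits explicitly
    }
  }
}
```

Keeping a stage-to-job map lets code that only knows a stage ID find the owning job's transaction, and clearing both maps in `jobEnded` keeps one job's state from leaking into the next.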

### Spark Transactional

Provides a transactional execution context for Spark operations, so that dataset operations run within transaction boundaries and are committed or rolled back as a unit.

```java { .api }
/**
 * Transactional execution context for Spark operations.
 * Enables ACID properties for dataset access within Spark applications.
 */
public class SparkTransactional {

  /**
   * Retrieves transaction information by key.
   * @param key transaction key identifier
   * @return TransactionInfo containing transaction details, or null if not found
   */
  public TransactionInfo getTransactionInfo(String key);

  /**
   * Executes a runnable within a transaction context.
   * @param type transaction type (SHORT or LONG)
   * @param runnable code to execute within the transaction
   * @throws TransactionFailureException if the transaction fails
   */
  public void execute(TransactionType type, TxRunnable runnable) throws TransactionFailureException;

  /**
   * Executes a callable within a transaction context and returns its result.
   * @param type transaction type (SHORT or LONG)
   * @param callable code to execute within the transaction
   * @return result of the callable
   * @throws TransactionFailureException if the transaction fails
   */
  public <T> T execute(TransactionType type, TxCallable<T> callable) throws TransactionFailureException;

  /**
   * Gets the current transaction context, if any.
   * @return the current TransactionContext, or null if no transaction is active
   */
  public TransactionContext getCurrentTransactionContext();
}
```
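
The commit/rollback contract of `execute` can be illustrated with an in-memory sketch. Everything here is hypothetical: `MiniTransactional`, `TxContext`, and `TxFailure` stand in for the CDAP types, and a "transaction" is just a write buffer that is applied to a map on success and discarded on failure. This models atomicity only, not isolation or durability.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical in-memory sketch of execute()'s commit/rollback contract.
 * Not CDAP's SparkTransactional: a "transaction" here is just a write buffer.
 */
public class MiniTransactional {

  enum TransactionType { SHORT, LONG }

  /** Hypothetical stand-in for TransactionFailureException. */
  static final class TxFailure extends Exception {
    TxFailure(String message, Throwable cause) {
      super(message, cause);
    }
  }

  /** Hypothetical per-transaction context that buffers writes until commit. */
  static final class TxContext {
    final Map<String, String> pending = new HashMap<>();

    void write(String key, String value) {
      pending.put(key, value);
    }
  }

  @FunctionalInterface
  interface TxCallable<T> {
    T call(TxContext context) throws Exception;
  }

  /** Committed data; changes appear only after a successful execute(). */
  final Map<String, String> store = new HashMap<>();

  <T> T execute(TransactionType type, TxCallable<T> callable) throws TxFailure {
    // The type argument is accepted for signature parity; this sketch ignores it.
    TxContext context = new TxContext();
    T result;
    try {
      result = callable.call(context);
    } catch (Exception e) {
      // Roll back: drop the buffered writes so the store is unchanged.
      context.pending.clear();
      throw new TxFailure("transaction aborted", e);
    }
    // Commit: apply all buffered writes together.
    store.putAll(context.pending);
    return result;
  }
}
```

If the body throws, none of its writes reach `store`, and the caller sees a single checked failure wrapping the original cause, mirroring the documented `TransactionFailureException`.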

### Transaction Information and Types

Transaction metadata, the supported transaction types, and the functional interfaces used to run code inside a transaction.

```java { .api }
/**
 * Container for transaction execution information.
 */
public static class TransactionInfo {

  /**
   * Gets the underlying transaction object.
   * @return Transaction instance
   */
  public Transaction getTransaction();

  /**
   * Indicates whether the transaction should commit when the job ends.
   * @return true if auto-commit is enabled
   */
  public boolean commitOnJobEnded();

  /**
   * Gets the transaction timeout in seconds.
   * @return timeout value in seconds
   */
  public int getTimeoutInSeconds();
}

/**
 * Enumeration of transaction types supported by CDAP.
 */
public enum TransactionType {
  /** Short-lived transactions for quick operations */
  SHORT,
  /** Long-running transactions for complex operations */
  LONG
}

/**
 * Functional interface for transactional operations without a return value.
 */
@FunctionalInterface
public interface TxRunnable {
  void run(TransactionContext context) throws Exception;
}

/**
 * Functional interface for transactional operations with a return value.
 */
@FunctionalInterface
public interface TxCallable<T> {
  T call(TransactionContext context) throws Exception;
}
```
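
`TxRunnable` and `TxCallable` differ only in whether the body returns a value, and that difference also decides which `execute` overload a lambda binds to. The sketch below uses hypothetical stand-ins (`TxOverloadDemo`, with `StringBuilder` in place of `TransactionContext`) and shows one plausible design in which the runnable overload delegates to the callable path; transaction handling itself is omitted.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical demo of lambda binding to TxRunnable / TxCallable overloads.
 * The interfaces mirror the shapes above but use StringBuilder as the context.
 */
public class TxOverloadDemo {

  @FunctionalInterface
  interface TxRunnable {
    void run(StringBuilder context) throws Exception;
  }

  @FunctionalInterface
  interface TxCallable<T> {
    T call(StringBuilder context) throws Exception;
  }

  /** Which overload handled each call, in order. */
  static final List<String> calls = new ArrayList<>();

  /** Runnable overload: adapts the runnable into a callable that returns null. */
  static void execute(TxRunnable runnable) throws Exception {
    calls.add("runnable");
    executeCallable(context -> {
      runnable.run(context);
      return null;
    });
  }

  /** Callable overload: returns the body's result. */
  static <T> T execute(TxCallable<T> callable) throws Exception {
    calls.add("callable");
    return executeCallable(callable);
  }

  /** Shared path for both overloads (a real version would start/commit here). */
  private static <T> T executeCallable(TxCallable<T> callable) throws Exception {
    return callable.call(new StringBuilder());
  }
}
```

A block lambda that returns nothing can only match `TxRunnable`, and one that returns a value can only match `TxCallable`, so both calls resolve without casts. An expression-bodied lambda such as `ctx -> ctx.append("x")` fits both shapes; because its parameter is implicitly typed, javac typically reports that call as ambiguous, so such call sites need a block body or an explicit cast.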

## Usage Examples

**Basic Transaction Management:**

```java
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.app.runtime.spark.SparkTransactionHandler;
import co.cask.cdap.app.runtime.spark.SparkTransactional;
import co.cask.cdap.app.runtime.spark.SparkTransactional.TransactionInfo;
import co.cask.cdap.app.runtime.spark.SparkTransactional.TransactionType;
import com.google.common.collect.Sets;
import java.util.Set;

// Transaction handler setup (typically done by the CDAP framework);
// txSystemClient and sparkTransactional are assumed to be in scope
SparkTransactionHandler txHandler = new SparkTransactionHandler(txSystemClient);

// Job starts with a transaction; jobId comes from the Spark job-start event
int jobId = ...;
Set<Integer> stageIds = Sets.newHashSet(1, 2, 3);
TransactionInfo txInfo = sparkTransactional.getTransactionInfo("tx-key-123");
txHandler.jobStarted(jobId, stageIds, txInfo);

// Execute dataset operations within a transaction
sparkTransactional.execute(TransactionType.SHORT, context -> {
  KeyValueTable table = context.getDataset("myTable");
  table.write(Bytes.toBytes("key"), Bytes.toBytes("value"));
  // The transaction is committed if the block completes without throwing
});

// Job completion
txHandler.jobEnded(jobId, true);
```

**Dataset Access with Transactions:**

```scala
import co.cask.cdap.api.common.Bytes
import co.cask.cdap.api.dataset.lib.KeyValueTable
import co.cask.cdap.app.runtime.spark.SparkTransactional
import co.cask.cdap.app.runtime.spark.SparkTransactional.TransactionType

// Transactional dataset access in Spark
val result = sparkTransactional.execute(TransactionType.LONG, (context) => {
  val inputTable = context.getDataset[KeyValueTable]("input")
  val outputTable = context.getDataset[KeyValueTable]("output")

  // Read from the input table
  val data: Array[Byte] = inputTable.read(Bytes.toBytes("sourceKey"))

  // Process (processData is a user-defined function) and write to the output table
  val processed: Array[Byte] = processData(data)
  outputTable.write(Bytes.toBytes("targetKey"), processed)

  processed
})
```

**Transaction Context in Spark Jobs:**

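```java
import co.cask.cdap.app.runtime.spark.SparkTransactional;
import co.cask.cdap.app.runtime.spark.SparkTransactional.TransactionInfo;
import java.util.Properties;
import org.apache.spark.scheduler.SparkListenerJobStart;

// Check for an active transaction when a Spark job starts
// (e.g. in a SparkListener's onJobStart; LOG is an SLF4J logger)
SparkListenerJobStart jobStart = ...;
Properties props = jobStart.properties();  // Scala case-class accessor; may be null
String txKey = props == null ? null : props.getProperty(SparkTransactional.ACTIVE_TRANSACTION_KEY);

if (txKey != null && !txKey.isEmpty()) {
  TransactionInfo txInfo = sparkTransactional.getTransactionInfo(txKey);
  if (txInfo != null) {
    LOG.info("Job {} running with transaction: {}, auto-commit: {}",
             jobStart.jobId(), txInfo.getTransaction(), txInfo.commitOnJobEnded());
  }
}
```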
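
The example above relies on an indirection: the job's properties carry only a string key, and the transaction details are looked up from that key. A minimal sketch of that pattern follows, assuming a driver-side registry. `TxKeyRegistry` and the property name `example.active.tx.key` are hypothetical, standing in for `SparkTransactional` and `ACTIVE_TRANSACTION_KEY`, and a plain `long` transaction ID replaces `TransactionInfo`.

```java
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical sketch of key-based transaction lookup: the driver registers
 * a transaction under a string key and publishes only the key in job properties.
 */
public class TxKeyRegistry {

  /** Hypothetical property name playing the role of ACTIVE_TRANSACTION_KEY. */
  static final String ACTIVE_TX_PROPERTY = "example.active.tx.key";

  private final Map<String, Long> txByKey = new ConcurrentHashMap<>();
  private final AtomicLong counter = new AtomicLong();

  /** Registers a transaction ID and returns the key to put into job properties. */
  String register(long txId) {
    String key = "tx-" + counter.incrementAndGet();
    txByKey.put(key, txId);
    return key;
  }

  /** Removes a key once its transaction has finished. */
  void unregister(String key) {
    txByKey.remove(key);
  }

  /** Listener-side lookup; null-safe for missing properties, keys, or entries. */
  Long resolve(Properties jobProperties) {
    if (jobProperties == null) {
      return null;
    }
    String key = jobProperties.getProperty(ACTIVE_TX_PROPERTY);
    if (key == null || key.isEmpty()) {
      return null;
    }
    return txByKey.get(key);
  }
}
```

Spark job properties hold only strings, so publishing a key rather than the transaction object is the natural fit, and the lookup tolerates a missing properties object, a missing key, or a key that has already been unregistered.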