CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-io-temporal--temporal-sdk

Temporal Workflow Java SDK - A framework for authoring Workflows and Activities in Java

Overview
Eval results
Files

data-conversion.mddocs/

Payload Conversion and Serialization

Pluggable payload conversion system for serializing workflow arguments, results, and signals with support for custom codecs and encryption.

Capabilities

Payload Converter

Core interface for converting between Java objects and Temporal payloads.

/**
 * Converts between Java objects and Temporal payloads.
 */
public interface PayloadConverter {
    /**
     * Converts Java object to payload.
     * @param value object to convert
     * @return optional payload if conversion successful
     * @throws DataConverterException if conversion fails
     */
    Optional<Payload> toData(Object value) throws DataConverterException;

    /**
     * Converts payload to Java object.
     * @param content payload to convert
     * @param valueClass target class
     * @param valueType target generic type
     * @return converted object
     * @throws DataConverterException if conversion fails
     */
    <T> T fromData(Payload content, Class<T> valueClass, Type valueType) throws DataConverterException;

    /**
     * Gets encoding type for this converter.
     * @return encoding type string
     */
    String getEncodingType();
}

Data Converter

Main interface for data conversion with support for multiple payload converters.

/**
 * Main interface for data conversion with support for multiple payload converters.
 */
public interface DataConverter {
    /**
     * Gets default data converter instance.
     * @return default data converter
     */
    static DataConverter getDefaultInstance();

    /**
     * Creates data converter with custom payload converters.
     * @param payloadConverters array of payload converters
     * @return data converter with custom converters
     */
    static DataConverter newDefaultInstance(PayloadConverter... payloadConverters);

    /**
     * Converts single value to payloads.
     * @param value value to convert
     * @return optional payloads
     */
    Optional<Payloads> toPayloads(Object... value) throws DataConverterException;

    /**
     * Converts payloads to array of values.
     * @param content payloads to convert
     * @param valueTypes target types
     * @return array of converted values
     */
    <T> T[] fromPayloads(int index, Optional<Payloads> content, Class<T> valueType, Type... valueTypes) throws DataConverterException;

    /**
     * Converts single payload to value.
     * @param content payload to convert
     * @param valueClass target class
     * @param valueType target generic type
     * @return converted value
     */
    <T> T fromPayload(Payload content, Class<T> valueClass, Type valueType) throws DataConverterException;

    /**
     * Gets payload converters.
     * @return list of payload converters
     */
    List<PayloadConverter> getPayloadConverters();

    /**
     * Converts failure to payload.
     * @param failure failure to convert
     * @return failure payload
     */
    Failure exceptionToFailure(Throwable failure);

    /**
     * Converts payload to failure.
     * @param failure failure payload
     * @return failure exception
     */
    Throwable failureToException(Failure failure);

    /**
     * Creates data converter with payload codec.
     * @param payloadCodec payload codec for encoding/decoding
     * @return data converter with codec
     */
    DataConverter withPayloadCodec(PayloadCodec payloadCodec);

    /**
     * Creates data converter with failure converter.
     * @param failureConverter failure converter
     * @return data converter with failure converter
     */
    DataConverter withFailureConverter(FailureConverter failureConverter);
}

Usage Examples:

public class CustomDataConverterExample {
    public static void setupCustomDataConverter() {
        // Create custom payload converters
        PayloadConverter jsonConverter = new JacksonJsonPayloadConverter();
        PayloadConverter protobufConverter = new ProtobufJsonPayloadConverter();
        PayloadConverter customConverter = new MyCustomPayloadConverter();

        // Create data converter with custom converters
        DataConverter dataConverter = DataConverter.newDefaultInstance(
            customConverter,      // Highest priority
            protobufConverter,
            jsonConverter        // Fallback
        );

        // Use with workflow client
        WorkflowClient client = WorkflowClient.newInstance(
            service,
            WorkflowClientOptions.newBuilder()
                .setDataConverter(dataConverter)
                .build()
        );

        // Use with worker
        Worker worker = factory.newWorker(
            "task-queue",
            WorkerOptions.newBuilder()
                .setDataConverter(dataConverter)
                .build()
        );
    }

    private static class MyCustomPayloadConverter implements PayloadConverter {
        private static final String ENCODING_TYPE = "application/x-custom";

        @Override
        public String getEncodingType() {
            return ENCODING_TYPE;
        }

        @Override
        public Optional<Payload> toData(Object value) throws DataConverterException {
            if (value instanceof MyCustomObject) {
                try {
                    byte[] data = serializeCustomObject((MyCustomObject) value);
                    return Optional.of(
                        Payload.newBuilder()
                            .putMetadata(EncodingKeys.METADATA_ENCODING_KEY,
                                ByteString.copyFromUtf8(ENCODING_TYPE))
                            .setData(ByteString.copyFrom(data))
                            .build()
                    );
                } catch (Exception e) {
                    throw new DataConverterException("Failed to serialize custom object", e);
                }
            }
            return Optional.empty();
        }

        @Override
        public <T> T fromData(Payload content, Class<T> valueClass, Type valueType) throws DataConverterException {
            if (!valueClass.isAssignableFrom(MyCustomObject.class)) {
                throw new DataConverterException("Unsupported type: " + valueClass);
            }

            try {
                byte[] data = content.getData().toByteArray();
                MyCustomObject obj = deserializeCustomObject(data);
                return valueClass.cast(obj);
            } catch (Exception e) {
                throw new DataConverterException("Failed to deserialize custom object", e);
            }
        }

        private byte[] serializeCustomObject(MyCustomObject obj) {
            // Custom serialization logic
            return obj.serialize();
        }

        private MyCustomObject deserializeCustomObject(byte[] data) {
            // Custom deserialization logic
            return MyCustomObject.deserialize(data);
        }
    }
}

Payload Codec

Interface for encoding/decoding payloads (compression, encryption).

/**
 * Interface for encoding/decoding of payloads (encryption, compression).
 */
public interface PayloadCodec {
    /**
     * Encodes list of payloads.
     * @param payloads payloads to encode
     * @return encoded payloads
     */
    List<Payload> encode(List<Payload> payloads);

    /**
     * Decodes list of payloads.
     * @param payloads payloads to decode
     * @return decoded payloads
     */
    List<Payload> decode(List<Payload> payloads);
}

Usage Examples:

public class EncryptionCodecExample {
    public static class AESEncryptionCodec implements PayloadCodec {
        private static final String ENCODING_TYPE = "binary/encrypted";
        private final byte[] encryptionKey;

        public AESEncryptionCodec(byte[] encryptionKey) {
            this.encryptionKey = encryptionKey;
        }

        @Override
        public List<Payload> encode(List<Payload> payloads) {
            return payloads.stream()
                .map(this::encryptPayload)
                .collect(Collectors.toList());
        }

        @Override
        public List<Payload> decode(List<Payload> payloads) {
            return payloads.stream()
                .map(this::decryptPayload)
                .collect(Collectors.toList());
        }

        private Payload encryptPayload(Payload payload) {
            try {
                // Skip if already encrypted
                String encoding = payload.getMetadataMap()
                    .get(EncodingKeys.METADATA_ENCODING_KEY)
                    .toStringUtf8();

                if (ENCODING_TYPE.equals(encoding)) {
                    return payload;
                }

                // Encrypt payload data
                byte[] data = payload.getData().toByteArray();
                byte[] encryptedData = encrypt(data, encryptionKey);

                return Payload.newBuilder()
                    .putAllMetadata(payload.getMetadataMap())
                    .putMetadata(EncodingKeys.METADATA_ENCODING_KEY,
                        ByteString.copyFromUtf8(ENCODING_TYPE))
                    .putMetadata("encryption-algorithm",
                        ByteString.copyFromUtf8("AES-256-GCM"))
                    .setData(ByteString.copyFrom(encryptedData))
                    .build();
            } catch (Exception e) {
                throw new RuntimeException("Failed to encrypt payload", e);
            }
        }

        private Payload decryptPayload(Payload payload) {
            try {
                String encoding = payload.getMetadataMap()
                    .get(EncodingKeys.METADATA_ENCODING_KEY)
                    .toStringUtf8();

                if (!ENCODING_TYPE.equals(encoding)) {
                    return payload; // Not encrypted
                }

                // Decrypt payload data
                byte[] encryptedData = payload.getData().toByteArray();
                byte[] decryptedData = decrypt(encryptedData, encryptionKey);

                // Remove encryption metadata and restore original encoding
                Map<String, ByteString> metadata = new HashMap<>(payload.getMetadataMap());
                metadata.remove(EncodingKeys.METADATA_ENCODING_KEY);
                metadata.remove("encryption-algorithm");

                return Payload.newBuilder()
                    .putAllMetadata(metadata)
                    .setData(ByteString.copyFrom(decryptedData))
                    .build();
            } catch (Exception e) {
                throw new RuntimeException("Failed to decrypt payload", e);
            }
        }

        private byte[] encrypt(byte[] data, byte[] key) throws Exception {
            // AES encryption implementation
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            SecretKeySpec keySpec = new SecretKeySpec(key, "AES");
            cipher.init(Cipher.ENCRYPT_MODE, keySpec);
            return cipher.doFinal(data);
        }

        private byte[] decrypt(byte[] encryptedData, byte[] key) throws Exception {
            // AES decryption implementation
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            SecretKeySpec keySpec = new SecretKeySpec(key, "AES");
            cipher.init(Cipher.DECRYPT_MODE, keySpec);
            return cipher.doFinal(encryptedData);
        }
    }

    public static void useEncryptionCodec() {
        // Generate encryption key (in practice, use secure key management)
        byte[] encryptionKey = generateSecureKey();

        // Create data converter with encryption codec
        PayloadCodec encryptionCodec = new AESEncryptionCodec(encryptionKey);
        DataConverter dataConverter = DataConverter.getDefaultInstance()
            .withPayloadCodec(encryptionCodec);

        // Use with workflow client
        WorkflowClient client = WorkflowClient.newInstance(
            service,
            WorkflowClientOptions.newBuilder()
                .setDataConverter(dataConverter)
                .build()
        );
    }
}

Serialization Context

Context information for payload conversion.

/**
 * Context information for payload conversion.
 */
public interface SerializationContext {
    /**
     * Gets namespace for the context.
     * @return namespace
     */
    String getNamespace();
}

/**
 * Activity-specific serialization context.
 */
public interface ActivitySerializationContext extends SerializationContext {
    /**
     * Gets namespace for the activity.
     * @return namespace
     */
    String getNamespace();

    /**
     * Gets task queue for the activity.
     * @return task queue name
     */
    String getTaskQueue();

    /**
     * Gets workflow ID that scheduled the activity.
     * @return workflow ID
     */
    String getWorkflowId();

    /**
     * Gets run ID of the workflow.
     * @return run ID
     */
    String getRunId();

    /**
     * Gets activity ID.
     * @return activity ID
     */
    String getActivityId();

    /**
     * Gets activity type.
     * @return activity type
     */
    String getActivityType();

    /**
     * Gets attempt number.
     * @return attempt number
     */
    int getAttempt();
}

Built-in Payload Converters

Default payload converters provided by the SDK.

/**
 * Converts null values to empty payloads.
 */
public class NullPayloadConverter implements PayloadConverter {
    public static final NullPayloadConverter INSTANCE = new NullPayloadConverter();

    @Override
    public String getEncodingType() {
        return "binary/null";
    }

    @Override
    public Optional<Payload> toData(Object value) throws DataConverterException {
        if (value == null) {
            return Optional.of(
                Payload.newBuilder()
                    .putMetadata(EncodingKeys.METADATA_ENCODING_KEY,
                        ByteString.copyFromUtf8(getEncodingType()))
                    .build()
            );
        }
        return Optional.empty();
    }

    @Override
    public <T> T fromData(Payload content, Class<T> valueClass, Type valueType) throws DataConverterException {
        return null;
    }
}

/**
 * Converts byte arrays to binary payloads.
 */
public class ByteArrayPayloadConverter implements PayloadConverter {
    public static final ByteArrayPayloadConverter INSTANCE = new ByteArrayPayloadConverter();

    @Override
    public String getEncodingType() {
        return "binary/plain";
    }

    @Override
    public Optional<Payload> toData(Object value) throws DataConverterException {
        if (value instanceof byte[]) {
            return Optional.of(
                Payload.newBuilder()
                    .putMetadata(EncodingKeys.METADATA_ENCODING_KEY,
                        ByteString.copyFromUtf8(getEncodingType()))
                    .setData(ByteString.copyFrom((byte[]) value))
                    .build()
            );
        }
        return Optional.empty();
    }

    @Override
    public <T> T fromData(Payload content, Class<T> valueClass, Type valueType) throws DataConverterException {
        if (!valueClass.isAssignableFrom(byte[].class)) {
            throw new DataConverterException("Cannot convert to " + valueClass);
        }
        return valueClass.cast(content.getData().toByteArray());
    }
}

/**
 * Converts protobuf messages to JSON payloads.
 */
public class ProtobufJsonPayloadConverter implements PayloadConverter {
    @Override
    public String getEncodingType() {
        return "json/protobuf";
    }

    @Override
    public Optional<Payload> toData(Object value) throws DataConverterException {
        if (value instanceof Message) {
            try {
                String json = JsonFormat.printer().print((Message) value);
                return Optional.of(
                    Payload.newBuilder()
                        .putMetadata(EncodingKeys.METADATA_ENCODING_KEY,
                            ByteString.copyFromUtf8(getEncodingType()))
                        .putMetadata("messageType",
                            ByteString.copyFromUtf8(value.getClass().getName()))
                        .setData(ByteString.copyFromUtf8(json))
                        .build()
                );
            } catch (Exception e) {
                throw new DataConverterException("Failed to convert protobuf to JSON", e);
            }
        }
        return Optional.empty();
    }

    @Override
    public <T> T fromData(Payload content, Class<T> valueClass, Type valueType) throws DataConverterException {
        if (!Message.class.isAssignableFrom(valueClass)) {
            throw new DataConverterException("Not a protobuf message type");
        }

        try {
            String json = content.getData().toStringUtf8();
            Message.Builder builder = getMessageBuilder(valueClass);
            JsonFormat.parser().merge(json, builder);
            return valueClass.cast(builder.build());
        } catch (Exception e) {
            throw new DataConverterException("Failed to convert JSON to protobuf", e);
        }
    }

    private Message.Builder getMessageBuilder(Class<?> messageClass) throws Exception {
        Method method = messageClass.getMethod("newBuilder");
        return (Message.Builder) method.invoke(null);
    }
}

/**
 * Converts objects to JSON using Jackson.
 */
public class JacksonJsonPayloadConverter implements PayloadConverter {
    private final ObjectMapper objectMapper;

    public JacksonJsonPayloadConverter(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    public JacksonJsonPayloadConverter() {
        this(new ObjectMapper()
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
            .registerModule(new JavaTimeModule())
        );
    }

    @Override
    public String getEncodingType() {
        return "json/jackson";
    }

    @Override
    public Optional<Payload> toData(Object value) throws DataConverterException {
        // Skip primitive types and strings (handled by other converters)
        if (value == null || isPrimitive(value.getClass())) {
            return Optional.empty();
        }

        try {
            byte[] json = objectMapper.writeValueAsBytes(value);
            return Optional.of(
                Payload.newBuilder()
                    .putMetadata(EncodingKeys.METADATA_ENCODING_KEY,
                        ByteString.copyFromUtf8(getEncodingType()))
                    .setData(ByteString.copyFrom(json))
                    .build()
            );
        } catch (Exception e) {
            throw new DataConverterException("Failed to serialize to JSON", e);
        }
    }

    @Override
    public <T> T fromData(Payload content, Class<T> valueClass, Type valueType) throws DataConverterException {
        try {
            byte[] data = content.getData().toByteArray();
            if (valueType != null && valueType != valueClass) {
                TypeReference<T> typeReference = new TypeReference<T>() {
                    @Override
                    public Type getType() {
                        return valueType;
                    }
                };
                return objectMapper.readValue(data, typeReference);
            } else {
                return objectMapper.readValue(data, valueClass);
            }
        } catch (Exception e) {
            throw new DataConverterException("Failed to deserialize from JSON", e);
        }
    }

    private boolean isPrimitive(Class<?> clazz) {
        return clazz.isPrimitive() ||
               clazz == String.class ||
               Number.class.isAssignableFrom(clazz) ||
               clazz == Boolean.class;
    }
}

Data Converter Exception

Exception thrown during data conversion operations.

/**
 * Exception thrown during data conversion operations.
 */
public class DataConverterException extends Exception {
    /**
     * Creates DataConverterException with message.
     * @param message exception message
     */
    public DataConverterException(String message);

    /**
     * Creates DataConverterException with message and cause.
     * @param message exception message
     * @param cause underlying cause
     */
    public DataConverterException(String message, Throwable cause);

    /**
     * Creates DataConverterException with cause.
     * @param cause underlying cause
     */
    public DataConverterException(Throwable cause);
}

Usage Examples:

public class ComprehensiveDataConversionExample {
    public static void setupComprehensiveDataConverter() {
        // Create custom Jackson ObjectMapper
        ObjectMapper objectMapper = new ObjectMapper()
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
            .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
            .registerModule(new JavaTimeModule())
            .registerModule(new Jdk8Module());

        // Create payload converters in priority order
        List<PayloadConverter> payloadConverters = Arrays.asList(
            NullPayloadConverter.INSTANCE,                          // Handle nulls
            ByteArrayPayloadConverter.INSTANCE,                     // Handle byte arrays
            new ProtobufJsonPayloadConverter(),                     // Handle protobuf
            new JacksonJsonPayloadConverter(objectMapper),          // Handle POJOs
            new MyCustomPayloadConverter()                          // Handle custom types
        );

        // Create encryption codec
        PayloadCodec encryptionCodec = new AESEncryptionCodec(getEncryptionKey());

        // Create compression codec
        PayloadCodec compressionCodec = new GzipCompressionCodec();

        // Chain codecs (compression then encryption)
        PayloadCodec chainedCodec = new ChainedPayloadCodec(compressionCodec, encryptionCodec);

        // Create data converter
        DataConverter dataConverter = DataConverter.newDefaultInstance(
                payloadConverters.toArray(new PayloadConverter[0])
            )
            .withPayloadCodec(chainedCodec)
            .withFailureConverter(new CustomFailureConverter());

        // Use with client and workers
        WorkflowClientOptions clientOptions = WorkflowClientOptions.newBuilder()
            .setDataConverter(dataConverter)
            .build();

        WorkflowClient client = WorkflowClient.newInstance(service, clientOptions);
    }

    // Compression codec example
    public static class GzipCompressionCodec implements PayloadCodec {
        private static final String ENCODING_TYPE = "binary/gzip";
        private static final int MIN_SIZE_FOR_COMPRESSION = 1024; // Only compress if > 1KB

        @Override
        public List<Payload> encode(List<Payload> payloads) {
            return payloads.stream()
                .map(this::compressPayload)
                .collect(Collectors.toList());
        }

        @Override
        public List<Payload> decode(List<Payload> payloads) {
            return payloads.stream()
                .map(this::decompressPayload)
                .collect(Collectors.toList());
        }

        private Payload compressPayload(Payload payload) {
            byte[] data = payload.getData().toByteArray();

            // Only compress if data is large enough
            if (data.length < MIN_SIZE_FOR_COMPRESSION) {
                return payload;
            }

            try {
                byte[] compressedData = compress(data);

                // Only use compression if it actually saves space
                if (compressedData.length >= data.length) {
                    return payload;
                }

                return Payload.newBuilder()
                    .putAllMetadata(payload.getMetadataMap())
                    .putMetadata(EncodingKeys.METADATA_ENCODING_KEY,
                        ByteString.copyFromUtf8(ENCODING_TYPE))
                    .putMetadata("original-size",
                        ByteString.copyFromUtf8(String.valueOf(data.length)))
                    .setData(ByteString.copyFrom(compressedData))
                    .build();
            } catch (Exception e) {
                // If compression fails, return original payload
                return payload;
            }
        }

        private Payload decompressPayload(Payload payload) {
            String encoding = payload.getMetadataMap()
                .get(EncodingKeys.METADATA_ENCODING_KEY)
                .toStringUtf8();

            if (!ENCODING_TYPE.equals(encoding)) {
                return payload; // Not compressed
            }

            try {
                byte[] compressedData = payload.getData().toByteArray();
                byte[] decompressedData = decompress(compressedData);

                // Remove compression metadata
                Map<String, ByteString> metadata = new HashMap<>(payload.getMetadataMap());
                metadata.remove(EncodingKeys.METADATA_ENCODING_KEY);
                metadata.remove("original-size");

                return Payload.newBuilder()
                    .putAllMetadata(metadata)
                    .setData(ByteString.copyFrom(decompressedData))
                    .build();
            } catch (Exception e) {
                throw new RuntimeException("Failed to decompress payload", e);
            }
        }

        private byte[] compress(byte[] data) throws IOException {
            try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                 GZIPOutputStream gzipOut = new GZIPOutputStream(baos)) {
                gzipOut.write(data);
                gzipOut.finish();
                return baos.toByteArray();
            }
        }

        private byte[] decompress(byte[] compressedData) throws IOException {
            try (ByteArrayInputStream bais = new ByteArrayInputStream(compressedData);
                 GZIPInputStream gzipIn = new GZIPInputStream(bais);
                 ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
                byte[] buffer = new byte[1024];
                int len;
                while ((len = gzipIn.read(buffer)) != -1) {
                    baos.write(buffer, 0, len);
                }
                return baos.toByteArray();
            }
        }
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-io-temporal--temporal-sdk

docs

activities.md

client.md

data-conversion.md

exceptions.md

index.md

workers.md

workflows.md

tile.json