tessl/pypi-apache-airflow-providers-apache-beam

Provider package for Apache Beam integration with Apache Airflow supporting Python, Java, and Go pipeline execution


docs/java-pipelines.md

Java Pipeline Execution

Execute Apache Beam pipelines written in Java as self-executing JAR files, with support for multiple runners, Dataflow integration, and full job lifecycle management (submission, monitoring, and cancellation).

Capabilities

BeamRunJavaPipelineOperator

Launch Apache Beam pipelines written in Java using self-executing JAR files with support for job classes, multiple runners, and cloud integration.

class BeamRunJavaPipelineOperator(BeamBasePipelineOperator):
    def __init__(
        self,
        *,
        jar: str,
        runner: str = "DirectRunner",
        job_class: str | None = None,
        default_pipeline_options: dict | None = None,
        pipeline_options: dict | None = None,
        gcp_conn_id: str = "google_cloud_default",
        dataflow_config: DataflowConfiguration | dict | None = None,
        deferrable: bool = False,
        **kwargs,
    ) -> None:
        """
        Initialize Java pipeline operator.

        Parameters:
        - jar (str): Path to self-executing JAR file, supports local paths and gs:// URLs
        - runner (str): Runner type (DirectRunner, DataflowRunner, SparkRunner, etc.)
        - job_class (str | None): Name of the Java pipeline class to execute; this is often not the JAR's main class
        - default_pipeline_options (dict): High-level pipeline options applied to all tasks
        - pipeline_options (dict): Task-specific pipeline options that override defaults
        - gcp_conn_id (str): Google Cloud connection ID for GCS and Dataflow access
        - dataflow_config (DataflowConfiguration|dict): Dataflow-specific configuration
        - deferrable (bool): Enable deferrable execution mode
        """

    def execute(self, context: Context):
        """Execute the Apache Beam Java Pipeline."""

    def execute_on_dataflow(self, context: Context):
        """Execute the Apache Beam Pipeline on Dataflow runner."""

    def on_kill(self) -> None:
        """Cancel the pipeline when task is killed."""

Usage Examples

Basic Local Execution

from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator

run_local_java_pipeline = BeamRunJavaPipelineOperator(
    task_id='run_local_java_pipeline',
    jar='/path/to/pipeline.jar',
    runner='DirectRunner',
    pipeline_options={
        # Keys are passed without leading dashes; the provider renders
        # them as --output=... and --tempLocation=... on the command line.
        'output': '/tmp/output',
        'tempLocation': '/tmp/beam_temp',
    },
)

Dataflow Execution with Job Class

run_dataflow_java_pipeline = BeamRunJavaPipelineOperator(
    task_id='run_dataflow_java_pipeline',
    jar='gs://my-bucket/pipeline.jar',
    job_class='com.company.pipelines.DataProcessingPipeline',
    runner='DataflowRunner',
    pipeline_options={
        'project': 'my-gcp-project',
        'region': 'us-central1',
        'tempLocation': 'gs://my-bucket/temp',
        'stagingLocation': 'gs://my-bucket/staging',
        'numWorkers': '4',
        'maxNumWorkers': '10',
        'machineType': 'n1-standard-4',
        'inputTable': 'project:dataset.input_table',
        'outputTable': 'project:dataset.output_table',
    },
    dataflow_config={
        'job_name': 'java-dataflow-job',
        'project_id': 'my-gcp-project',
        'location': 'us-central1',
        'wait_until_finished': True,
        'check_if_running': 'WaitForRun',
    },
)

Self-Executing JAR (No Job Class)

run_self_executing_jar = BeamRunJavaPipelineOperator(
    task_id='run_self_executing_jar',
    jar='gs://my-bucket/self-executing-pipeline.jar',
    # job_class not specified - uses JAR main class
    runner='DataflowRunner',
    pipeline_options={
        'project': 'my-gcp-project',
        'tempLocation': 'gs://my-bucket/temp',
        'input': 'gs://input-bucket/data/*',
        'output': 'gs://output-bucket/results/',
    },
)

Deferrable Execution with Monitoring

run_deferrable_java_pipeline = BeamRunJavaPipelineOperator(
    task_id='run_deferrable_java_pipeline',
    jar='gs://my-bucket/long-running-pipeline.jar',
    job_class='com.company.LongRunningPipeline',
    runner='DataflowRunner',
    deferrable=True,  # Enable deferrable mode
    pipeline_options={
        'project': 'my-gcp-project',
        'region': 'us-central1',
        'tempLocation': 'gs://my-bucket/temp',
        'streaming': 'true',  # Long-running streaming job
    },
    dataflow_config={
        'job_name': 'streaming-java-pipeline',
        'wait_until_finished': True,
        'multiple_jobs': True,  # Allow multiple instances
    },
)

Configuration Options

Java-Specific Pipeline Options

Java pipelines use camelCase option names. Note that keys in the pipeline_options dict are given without the leading --; the provider adds it when building the command line:

  • Basic Options: --runner, --project, --region, --tempLocation, --stagingLocation
  • Scaling Options: --numWorkers, --maxNumWorkers, --machineType, --diskSizeGb
  • Processing Options: --streaming, --windowSize, --maxBundleSize
  • I/O Options: --inputTable, --outputTable, --inputTopic, --outputTopic
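
The precedence between default_pipeline_options and pipeline_options follows plain dict-merge semantics: defaults apply to every task, and task-specific values win on conflicts. A sketch of that behaviour (illustrative only, not the provider's internal merge code):

```python
# default_pipeline_options apply to every task; pipeline_options are
# task-specific and override conflicting keys.
default_pipeline_options = {
    "tempLocation": "gs://my-bucket/temp",
    "numWorkers": "2",
}
pipeline_options = {
    "numWorkers": "8",  # task-specific value overrides the default
    "outputTable": "project:dataset.output_table",
}

merged = {**default_pipeline_options, **pipeline_options}
print(merged["numWorkers"])  # prints 8 (the task-specific value)
```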

JAR File Requirements

Java pipelines require self-executing JAR files that include:

  • All dependencies bundled (fat JAR or shaded JAR)
  • Proper manifest with main class (if no job_class specified)
  • Apache Beam SDK dependencies
  • Any custom transforms and I/O connectors

Example Maven configuration for creating self-executing JAR:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <executions>
        <execution>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>com.company.pipelines.MyPipeline</mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>

Dataflow-Specific Options

When using DataflowRunner with Java pipelines:

dataflow_config = {
    'job_name': 'java-pipeline-job',
    'project_id': 'gcp-project-id',
    'location': 'us-central1',
    'check_if_running': 'WaitForRun',  # Wait for a running job with the same name to finish
    'multiple_jobs': False,  # Allow only one job instance
    'wait_until_finished': True,
    'cancel_timeout': 600,  # 10 minutes to cancel
}

Error Handling

The operator handles Java-specific error conditions:

  • JAR File Access: Validates JAR file accessibility and downloads from GCS
  • Class Loading: Handles job class loading and main class resolution
  • JVM Configuration: Manages Java runtime environment and memory settings
  • Dataflow Integration: Handles Dataflow job submission and monitoring
  • Job Lifecycle: Manages job cancellation and cleanup
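
The cancel_timeout behaviour on task kill can be pictured as a cancel-then-poll loop: request cancellation, then poll the job state until it terminates or the timeout elapses. The sketch below uses hypothetical poll_state and request_cancel callables and is not the provider's actual implementation:

```python
import time


def cancel_with_timeout(poll_state, request_cancel, cancel_timeout=600, poll_interval=1):
    """Request cancellation, then wait up to cancel_timeout seconds for
    the job to reach a terminal state. poll_state and request_cancel are
    hypothetical callables standing in for Dataflow API calls."""
    request_cancel()
    deadline = time.monotonic() + cancel_timeout
    while time.monotonic() < deadline:
        if poll_state() in ("JOB_STATE_CANCELLED", "JOB_STATE_DONE"):
            return True  # job terminated within the timeout
        time.sleep(poll_interval)
    return False  # timeout elapsed; job may still be running
```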

Boolean Options Handling

Java pipelines require boolean options to be passed as explicit 'true'/'false' string values rather than bare flags:

pipeline_options = {
    'usePublicIps': 'false',  # Explicit false value for Java
    'enableStreamingEngine': 'true',  # Explicit true value
    'enableDataflowServiceOptions': 'true',
}
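
Internally, the hook renders the options dict into Java-style command-line arguments for the JAR. A simplified sketch of that formatting (the real helper lives in the provider's Beam hook; this is not its exact code):

```python
# Render an options dict as --key=value arguments, the form a
# self-executing Beam JAR expects. Keys are supplied without leading
# dashes. Simplified sketch; not the provider's exact helper.
def options_to_args(options: dict) -> list:
    return [f"--{key}={value}" for key, value in options.items()]


args = options_to_args({
    "usePublicIps": "false",
    "enableStreamingEngine": "true",
})
print(args)  # prints ['--usePublicIps=false', '--enableStreamingEngine=true']
```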

Templating Support

The operator supports Airflow templating for dynamic Java pipeline configuration:

run_templated_java_pipeline = BeamRunJavaPipelineOperator(
    task_id='run_templated_java_pipeline',
    jar='gs://{{ var.value.jar_bucket }}/pipeline-{{ ds_nodash }}.jar',
    job_class='{{ var.value.pipeline_class }}',
    runner='{{ var.value.runner }}',
    pipeline_options={
        'project': '{{ var.value.gcp_project }}',
        'inputTable': '{{ var.value.project }}:{{ var.value.dataset }}.input_{{ ds_nodash }}',
        'outputTable': '{{ var.value.project }}:{{ var.value.dataset }}.output_{{ ds_nodash }}',
        'jobName': 'java-pipeline-{{ ds_nodash }}-{{ ts_nodash }}',
    },
)

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-apache-beam
