Java Client Samples

This project demonstrates how to use the ArmoniK Java Client SDK to submit tasks and retrieve results.

Prerequisites

Project Structure

client/
├── pom.xml
└── src/main/java/fr/aneo/armonik/client/samples/
    ├── listener/
    │   └── SimpleBlobListener.java
    ├── HelloWorld.java
    ├── SharedBlob.java
    ├── TaskDependencies.java
    ├── DynamicLibrary.java
    └── CppDynamicLibrary.java

Samples Overview

| Sample            | Description                                                 | Required Partition |
|-------------------|-------------------------------------------------------------|--------------------|
| HelloWorld        | Basic task submission with inline input                     | helloworld         |
| SharedBlob        | Reuse a blob across multiple tasks                          | sum                |
| TaskDependencies  | Chain tasks using outputs as inputs                         | sum                |
| DynamicLibrary    | Load a library JAR at runtime                               | javadynamic        |
| CppDynamicLibrary | Java client calling a C++ dynamic worker (interoperability) | cppdynamic         |

Partition Setup

Before running the samples, you need to configure the required partitions in your ArmoniK deployment.

Edit the partitions.tfvars file in your ArmoniK deployment directory (typically infrastructure/quick-deploy/localhost/):

compute_plane = {
  # partition for HelloWorld sample
  helloworld = {
    replicas    = 0
    socket_type = "tcp"
    polling_agent = {
      limits   = { cpu = "2000m", memory = "2048Mi" }
      requests = { cpu = "50m", memory = "50Mi" }
    }
    worker = [{
      image    = "hello-world-worker"
      tag      = "latest"
      limits   = { cpu = "1000m", memory = "1024Mi" }
      requests = { cpu = "50m", memory = "50Mi" }
    }]
    hpa = {
      type              = "prometheus"
      polling_interval  = 15
      cooldown_period   = 300
      min_replica_count = 0
      max_replica_count = 5
      behavior = {
        restore_to_original_replica_count = true
        stabilization_window_seconds      = 300
        type                              = "Percent"
        value                             = 100
        period_seconds                    = 15
      }
      triggers = [
        { type = "prometheus", threshold = 2 }
      ]
    }
  },
  # partition for sum worker
  sum = {
    replicas    = 0
    socket_type = "tcp"
    polling_agent = {
      limits   = { cpu = "2000m", memory = "2048Mi" }
      requests = { cpu = "50m", memory = "50Mi" }
    }
    worker = [{
      image    = "sum-worker"
      tag      = "latest"
      limits   = { cpu = "1000m", memory = "1024Mi" }
      requests = { cpu = "50m", memory = "50Mi" }
    }]
    hpa = {
      type              = "prometheus"
      polling_interval  = 15
      cooldown_period   = 300
      min_replica_count = 0
      max_replica_count = 5
      behavior = {
        restore_to_original_replica_count = true
        stabilization_window_seconds      = 300
        type                              = "Percent"
        value                             = 100
        period_seconds                    = 15
      }
      triggers = [
        { type = "prometheus", threshold = 2 }
      ]
    }
  },
  # partition for DynamicLibrary sample (uses pre-built image from Docker Hub)
  javadynamic = {
    replicas    = 0
    socket_type = "tcp"
    polling_agent = {
      limits   = { cpu = "2000m", memory = "2048Mi" }
      requests = { cpu = "50m", memory = "50Mi" }
    }
    worker = [{
      image    = "dockerhubaneo/armonik-dynamic-java-worker"
      tag      = "latest"
      limits   = { cpu = "1000m", memory = "1024Mi" }
      requests = { cpu = "50m", memory = "50Mi" }
    }]
    hpa = {
      type              = "prometheus"
      polling_interval  = 15
      cooldown_period   = 300
      min_replica_count = 0
      max_replica_count = 5
      behavior = {
        restore_to_original_replica_count = true
        stabilization_window_seconds      = 300
        type                              = "Percent"
        value                             = 100
        period_seconds                    = 15
      }
      triggers = [
        { type = "prometheus", threshold = 2 }
      ]
    }
  },

  # partition for CppDynamicLibrary sample (C++ DynamicWorker from the ArmoniK C++ SDK)
  cppdynamic = {
    replicas    = 0
    socket_type = "tcp"
    polling_agent = {
      limits   = { cpu = "2000m", memory = "2048Mi" }
      requests = { cpu = "50m", memory = "50Mi" }
    }
    worker = [{
      image    = "dockerhubaneo/armonik-sdk-cpp-dynamicworker"
      tag      = "0.5.2"
      limits   = { cpu = "1000m", memory = "1024Mi" }
      requests = { cpu = "50m", memory = "50Mi" }
    }]
    hpa = {
      type              = "prometheus"
      polling_interval  = 15
      cooldown_period   = 300
      min_replica_count = 0
      max_replica_count = 5
      behavior = {
        restore_to_original_replica_count = true
        stabilization_window_seconds      = 300
        type                              = "Percent"
        value                             = 100
        period_seconds                    = 15
      }
      triggers = [
        { type = "prometheus", threshold = 2 }
      ]
    }
  },

  # existing partitions
  default = {
    # ... existing default configuration
  }
}

Apply the configuration:

cd infrastructure/quick-deploy/localhost
make deploy

Verify that the partitions were created in the ArmoniK Admin GUI at http://localhost:5000/admin.

Building the Client

./mvnw clean package

Running the Samples

HelloWorld

Submits a single task with an inline input and waits for the greeting output.

./mvnw compile exec:java -Dexec.mainClass="fr.aneo.armonik.client.samples.HelloWorld"

Expected output:

Blob completed - id: <blob-id>, data: Hello John. Welcome to Armonik Java Worker !!

SharedBlob

Demonstrates creating a blob once and reusing it across multiple task inputs.

./mvnw compile exec:java -Dexec.mainClass="fr.aneo.armonik.client.samples.SharedBlob"

Expected output:

Blob completed - id: <blob-id>, data: 4
Blob completed - id: <blob-id>, data: 5

TaskDependencies

Creates a task graph where a third task depends on the outputs of two previous tasks:

task1 (1+2=3) ──┐
                ├──► task3 (3+7=10)
task2 (3+4=7) ──┘

./mvnw compile exec:java -Dexec.mainClass="fr.aneo.armonik.client.samples.TaskDependencies"

Expected output:

Blob completed - id: <blob-id>, data: 3
Blob completed - id: <blob-id>, data: 7
Blob completed - id: <blob-id>, data: 10
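
To make the ordering in the graph above concrete, here is a minimal sketch that uses only the JDK's CompletableFuture. It is an analogy, not the ArmoniK SDK: the class name DependencyGraphSketch and the local sum method are hypothetical stand-ins for the sample and the sum worker. It shows that task3 can only run once both upstream results exist.

```java
import java.util.concurrent.CompletableFuture;

// Plain-JDK analogy of the task graph; it does not call the ArmoniK SDK.
final class DependencyGraphSketch {

    // Hypothetical stand-in for the "sum" worker: adds two integers.
    static int sum(int a, int b) {
        return a + b;
    }

    // task1 and task2 are independent; task3 consumes both of their outputs.
    static CompletableFuture<Integer> buildGraph() {
        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> sum(1, 2));
        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> sum(3, 4));
        // thenCombine fires only after both upstream futures have completed.
        return task1.thenCombine(task2, DependencyGraphSketch::sum);
    }

    public static void main(String[] args) {
        System.out.println(buildGraph().join()); // prints 10
    }
}
```

In the real sample the scheduling is done by ArmoniK rather than by the client JVM; the sketch only mirrors the data flow.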

DynamicLibrary

Uploads a library ZIP at runtime and executes tasks using it.

Setup: Copy the multiply library ZIP to the resources folder:

cp ../worker-libraries/multiply-library/target/multiply-library.zip src/main/resources/

Run:

./mvnw compile exec:java -Dexec.mainClass="fr.aneo.armonik.client.samples.DynamicLibrary"

Expected output:

Blob completed - id: <blob-id>, data: 6
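
Because the WorkerLibrary constructor described under Key Concepts takes the JAR filename inside the ZIP, a quick pre-flight check of the archive can save a failed submission. The following is a minimal sketch using only java.util.zip; the class name LibraryZipCheck and the entry name multiply-library.jar are assumptions for illustration, so substitute the JAR name your build actually produces.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.util.zip.ZipFile;

// Pre-flight check for a library ZIP; independent of the ArmoniK SDK.
final class LibraryZipCheck {

    // Returns true if the ZIP at zipPath has an entry named jarName.
    static boolean containsJar(Path zipPath, String jarName) throws IOException {
        try (ZipFile zip = new ZipFile(zipPath.toFile())) {
            return zip.getEntry(jarName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        Path zip = Path.of("src/main/resources/multiply-library.zip");
        // "multiply-library.jar" is a hypothetical entry name, not confirmed by this README.
        System.out.println(containsJar(zip, "multiply-library.jar"));
    }
}
```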

CppDynamicLibrary

Demonstrates Java-to-C++ interoperability: the Java client uploads a C++ shared library (.so) as a blob, then submits a task to the C++ DynamicWorker which loads the library at runtime and calls the specified function.

Setup: Build the C++ worker shared library from the cpp/ChainedArithmetic sample. Then provide its path via the fr.aneo.armonik.client.samples.dynlib.path system property.

Run:

./mvnw compile exec:java \
  -Dexec.mainClass="fr.aneo.armonik.client.samples.CppDynamicLibrary" \
  -Dfr.aneo.armonik.client.samples.dynlib.path=/path/to/libArmoniK.Samples.Cpp.ChainedArithmetic.Worker.so

Expected output:

Blob completed - id: <blob-id>, data: 6
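
For readers adapting this sample, here is a minimal sketch of how a client might resolve the fr.aneo.armonik.client.samples.dynlib.path system property and fail fast when it is unset or points at a missing file. The class name DynlibPathSketch and the error messages are hypothetical; the actual CppDynamicLibrary sample may handle this differently.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Resolves the shared library path from a system property; no ArmoniK SDK calls.
final class DynlibPathSketch {

    static final String PROPERTY = "fr.aneo.armonik.client.samples.dynlib.path";

    // Returns the configured path, or throws if the property is unset or the file is missing.
    static Path resolveLibraryPath() {
        String value = System.getProperty(PROPERTY);
        if (value == null || value.isBlank()) {
            throw new IllegalStateException("Set -D" + PROPERTY + "=/path/to/library.so");
        }
        Path path = Path.of(value);
        if (!Files.isRegularFile(path)) {
            throw new IllegalStateException("Shared library not found: " + path);
        }
        return path;
    }

    public static void main(String[] args) throws IOException {
        // The bytes read here are what a client would upload as the library blob.
        byte[] library = Files.readAllBytes(resolveLibraryPath());
        System.out.println("Loaded " + library.length + " bytes");
    }
}
```

Since exec:java runs the main class inside the Maven JVM, a -D flag given on the Maven command line should be visible through System.getProperty.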

Key Concepts

ArmoniKClient

The entry point for interacting with the ArmoniK cluster:

var config = ArmoniKConfig.builder()
                          .endpoint("http://localhost:5001")
                          .withoutSslValidation()
                          .build();

try (var client = new ArmoniKClient(config)) {
    // Use client...
}

SessionDefinition

Defines a session with target partitions, default task configuration, and a completion listener:

var sessionDefinition = new SessionDefinition(
    Set.of("partition-name"),
    TaskConfiguration.defaultConfigurationWithPartition("partition-name"),
    new SimpleBlobListener()
);
var session = client.openSession(sessionDefinition);

TaskDefinition

Defines a task with inputs and outputs:

var taskDefinition = new TaskDefinition()
    .withInput("inputName", InputBlobDefinition.from("data".getBytes(UTF_8)))
    .withOutput("outputName");

session.submitTask(taskDefinition);
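
The snippets in this section pass payloads as UTF-8 bytes, for example "data".getBytes(UTF_8) here and "2".getBytes(UTF_8) for the num1 input further down, where UTF_8 is the constant from java.nio.charset.StandardCharsets. As a minimal sketch of that round trip, assuming numeric inputs are sent as decimal text, the hypothetical helper below encodes and decodes an integer payload. Whether a given worker tolerates surrounding whitespace is not stated in this README; the trim call is a defensive assumption.

```java
import static java.nio.charset.StandardCharsets.UTF_8;

// Hypothetical helper showing the byte-level payload convention; not part of the SDK.
final class PayloadCodecSketch {

    // Encodes a number as the UTF-8 bytes of its decimal text, e.g. 2 becomes "2".
    static byte[] encode(int value) {
        return Integer.toString(value).getBytes(UTF_8);
    }

    // Decodes bytes produced by encode back into an int.
    static int decode(byte[] data) {
        return Integer.parseInt(new String(data, UTF_8).trim());
    }

    public static void main(String[] args) {
        byte[] payload = encode(2);
        System.out.println(payload.length + " byte(s), value " + decode(payload)); // prints: 1 byte(s), value 2
    }
}
```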

WorkerLibrary (Dynamic Loading)

Specifies a library to load at runtime:

var libraryBlob = session.createBlob(InputBlobDefinition.from(zipFile));

var library = new WorkerLibrary(
    "library.jar",                    // JAR filename inside ZIP
    "com.example.MyProcessor",        // Fully qualified class name
    libraryBlob                       // Blob containing the ZIP
);

var taskDefinition = new TaskDefinition()
    .withWorkerLibrary(library)
    .withInput("num1", InputBlobDefinition.from("2".getBytes(UTF_8)))
    .withOutput("result");

BlobCompletionListener

Receives notifications when output blobs are completed:

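// ArmoniK SDK imports (BlobCompletionListener, Blob, BlobError) omitted for brevity.
import static java.nio.charset.StandardCharsets.UTF_8;

public class SimpleBlobListener implements BlobCompletionListener {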
    @Override
    public void onSuccess(Blob blob) {
        System.out.println("Data: " + new String(blob.data(), UTF_8));
    }

    @Override
    public void onError(BlobError blobError) {
        System.out.println("Error: " + blobError.cause().getMessage());
    }
}

Learn More