Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ lazy val jvmProjects: Seq[ProjectReference] = Seq(
agent,
bedrock,
netty,
test.jvm
test.jvm,
temporalExample
)

lazy val jsProjects: Seq[ProjectReference] = Seq(core.js, uni.js, domTest, test.js)
Expand Down Expand Up @@ -306,3 +307,25 @@ lazy val integrationTest = project
ideSkipProject := false
)
.dependsOn(bedrock, test.jvm % Test)

val TEMPORAL_SDK_VERSION = "1.27.0"
val JACKSON_SCALA_VERSION = "2.14.2"

lazy val temporalExample = project
.in(file("uni-temporal-example"))
.settings(
buildSettings,
noPublish,
name := "uni-temporal-example",
description := "Temporal workflow example for Scala 3 usability exploration",
ideSkipProject := false,
libraryDependencies ++=
Seq(
"io.temporal" % "temporal-sdk" % TEMPORAL_SDK_VERSION,
"io.temporal" % "temporal-testing" % TEMPORAL_SDK_VERSION % Test,
"com.fasterxml.jackson.module" %% "jackson-module-scala" % JACKSON_SCALA_VERSION,
// Redirect slf4j to airframe-log
"org.slf4j" % "slf4j-jdk14" % "2.0.17"
)
)
.dependsOn(uni.jvm, test.jvm % Test)
54 changes: 54 additions & 0 deletions plans/2026-04-04-temporal-example.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Temporal Example Project for Scala 3

## Objective

Create a small Temporal example project inside `uni` to evaluate the Temporal workflow engine's usability, SDK ergonomics, and fit for durable workflow orchestration in the Scala 3 ecosystem.

## Background

Temporal is a workflow orchestration platform built for durability. Workflows survive crashes; activities are retried automatically. This exploration aims to understand:

1. How Temporal's Java SDK feels in Scala 3
2. How to write workflows and activities idiomatically
3. How retries, error handling, and observability work
4. Whether Temporal complements the lightweight `uni-task` approach

## Module: `uni-temporal-example`

A JVM-only example module (not published) that demonstrates:

### Workflows

1. **HelloWorkflow** — minimal "hello world" to understand the basics
2. **DataPipelineWorkflow** — multi-step pipeline: fetch → transform → store

### Activities

- `GreetingActivities` — simple greeting activity
- `DataActivities` — fetch, transform, and store with simulated failures for retry demo

### Key Concepts Covered

- Workflow interface + implementation separation (required by Temporal)
- Activity interface + implementation
- `ActivityOptions` with retry policy
- Test server (`TestWorkflowEnvironment`) for unit tests without a real server
- Workflow signals and queries (stretch goal)

## Implementation Plan

1. Add `uni-temporal-example` module to `build.sbt`
2. Add Temporal SDK dependencies:
- `io.temporal:temporal-sdk`
- `io.temporal:temporal-testing` (test scope)
3. Implement `HelloWorkflow` with `GreetingActivities`
4. Implement `DataPipelineWorkflow` with `DataActivities`
5. Write `UniTest`-based tests using `TestWorkflowEnvironment`
6. Add a `TemporalWorkerApp` entry point for running against a real local server

## Findings (to be filled in)

- SDK ergonomics:
- Test experience:
- Comparison to uni-task:
- Recommendation:
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package wvlet.uni.temporal.example

import io.temporal.activity.{ActivityInterface, ActivityMethod}
import io.temporal.workflow.{WorkflowInterface, WorkflowMethod}

/** Input/output data types for the pipeline. */
case class PipelineInput(sourceId: String, rawData: String)
case class PipelineResult(sourceId: String, processedData: String, recordCount: Int)

/**
* A multi-step data pipeline workflow.
*
* Demonstrates sequential activity chaining. Each step is independently retried on failure, and
* the workflow survives worker restarts — Temporal replays history to restore state.
*/
@WorkflowInterface
trait DataPipelineWorkflow:
@WorkflowMethod
def process(input: PipelineInput): PipelineResult

/** Activities for the data pipeline. Each method is a separately-retried unit of work. */
@ActivityInterface
trait DataActivities:
/** Fetch raw data from a source. Simulates transient failures to show retry behaviour. */
@ActivityMethod
def fetchData(sourceId: String): String

/** Transform/clean the raw data. */
@ActivityMethod
def transformData(rawData: String): String

/** Persist the result and return a record count. */
@ActivityMethod
def storeData(sourceId: String, processedData: String): Int
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package wvlet.uni.temporal.example

import io.temporal.activity.{Activity, ActivityOptions}
import io.temporal.common.RetryOptions
import io.temporal.workflow.Workflow
import wvlet.uni.log.LogSupport

import java.time.Duration

/**
* DataPipeline workflow implementation.
*
* Chaining activities sequentially: fetchData → transformData → storeData. Each activity is
* scheduled with explicit retry options demonstrating Temporal's built-in retry mechanism.
*/
class DataPipelineWorkflowImpl extends DataPipelineWorkflow with LogSupport:

private val activities: DataActivities = Workflow.newActivityStub(
classOf[DataActivities],
ActivityOptions
.newBuilder()
// Each activity call must complete within 30s
.setStartToCloseTimeout(Duration.ofSeconds(30))
.setRetryOptions(
RetryOptions
.newBuilder()
// Retry up to 3 times with 1-second initial interval
.setMaximumAttempts(3)
.setInitialInterval(Duration.ofSeconds(1))
.setBackoffCoefficient(2.0)
.build()
)
.build()
)

override def process(input: PipelineInput): PipelineResult =
// Step 1: fetch
val rawData = activities.fetchData(input.sourceId)
// Step 2: transform
val processedData = activities.transformData(rawData)
// Step 3: store
val recordCount = activities.storeData(input.sourceId, processedData)
PipelineResult(input.sourceId, processedData, recordCount)

/**
* DataActivities implementation with a simulated transient failure on the first fetch attempt.
*
* Uses Temporal's ActivityExecutionContext to get the server-tracked attempt number, which is the
* recommended approach — it works correctly across different worker instances and survives
* restarts.
*/
class DataActivitiesImpl extends DataActivities with LogSupport:

override def fetchData(sourceId: String): String =
val attempt = Activity.getExecutionContext().getInfo().getAttempt()
logger.info(s"fetchData attempt ${attempt} for source '${sourceId}'")
if attempt == 1 then
// Simulate a transient failure on the first attempt
throw RuntimeException(s"Transient network error fetching '${sourceId}' (attempt 1)")
s"raw-data-for-${sourceId}"
Comment on lines +46 to +60
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Instead of using a manual AtomicInteger to track attempts, it is recommended to use Temporal's ActivityExecutionContext. This is more robust as it works correctly across different worker instances and provides the actual attempt number tracked by the Temporal server, which is useful for implementing custom retry logic or logging.

/**
  * DataActivities implementation with a simulated transient failure on the first fetch attempt.
  *
  * Temporal supplies the attempt number via ActivityExecutionContext, which is the recommended
  * way to track attempts across retries.
  */
class DataActivitiesImpl extends DataActivities with LogSupport:

  override def fetchData(sourceId: String): String =
    // Use Temporal's ActivityExecutionContext to get the current attempt number
    val attempt = io.temporal.activity.Activity.getExecutionContext().getInfo().getAttempt()
    logger.info(s"fetchData attempt ${attempt} for source '${sourceId}'")
    if attempt == 1 then
      // Simulate a transient failure on the first attempt
      throw RuntimeException(s"Transient network error fetching '${sourceId}' (attempt 1)")
    s"raw-data-for-${sourceId}"


override def transformData(rawData: String): String =
logger.info(s"Transforming: ${rawData}")
rawData.toUpperCase.replace("-", "_")

override def storeData(sourceId: String, processedData: String): Int =
logger.info(s"Storing ${processedData} for source '${sourceId}'")
// Simulate storing: return a fake record count
processedData.length
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package wvlet.uni.temporal.example

import io.temporal.activity.{ActivityInterface, ActivityMethod}
import io.temporal.workflow.{WorkflowInterface, WorkflowMethod}

/**
* The simplest possible Temporal workflow: greet a user.
*
* Temporal requires workflow and activity interfaces to be annotated with @WorkflowInterface /
* @ActivityInterface.
* Implementations are registered separately with the worker.
*/
@WorkflowInterface
trait HelloWorkflow:
@WorkflowMethod
def sayHello(name: String): String

@ActivityInterface
trait GreetingActivities:
@ActivityMethod
def composeGreeting(greeting: String, name: String): String
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package wvlet.uni.temporal.example

import io.temporal.activity.ActivityOptions
import io.temporal.workflow.Workflow
import wvlet.uni.log.LogSupport

import java.time.Duration

/**
* Workflow implementation.
*
* Workflow code must be deterministic — no I/O, random numbers, or wall-clock time. All side
* effects go through activities. Temporal replays workflow history on restarts; non-determinism
* breaks replay.
*/
class HelloWorkflowImpl extends HelloWorkflow with LogSupport:

// Activity stub: calls are scheduled as Temporal activities, not direct method calls.
private val activities: GreetingActivities = Workflow.newActivityStub(
classOf[GreetingActivities],
ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(10)).build()
)

override def sayHello(name: String): String = activities.composeGreeting("Hello", name)

/**
* Activity implementation. Activities are the side-effectful units — they do real work and can be
* retried independently.
*/
class GreetingActivitiesImpl extends GreetingActivities with LogSupport:
override def composeGreeting(greeting: String, name: String): String =
logger.info(s"Composing greeting for ${name}")
s"${greeting}, ${name}!"
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package wvlet.uni.temporal.example

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import io.temporal.common.converter.{
DefaultDataConverter,
JacksonJsonPayloadConverter,
PayloadConverter
}

/**
* Custom DataConverter that registers Jackson's Scala module so Scala 3 case classes can be
* serialized/deserialized by Temporal.
*
* Without this, Jackson cannot construct Scala case class instances because they lack a no-arg
* constructor. The Scala module teaches Jackson about Scala product types, Option, collections,
* etc.
*/
object ScalaDataConverter:
private val objectMapper: ObjectMapper =
val mapper = ObjectMapper()
mapper.registerModule(DefaultScalaModule)
mapper

val converter: DefaultDataConverter = DefaultDataConverter
.newDefaultInstance()
.withPayloadConverterOverrides(JacksonJsonPayloadConverter(objectMapper))
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package wvlet.uni.temporal.example

import io.temporal.client.{WorkflowClient, WorkflowClientOptions, WorkflowOptions}
import io.temporal.serviceclient.WorkflowServiceStubs
import io.temporal.worker.WorkerFactory
import wvlet.uni.log.LogSupport

import java.util.UUID

/** Shared task-queue name used by all workers and clients in this example. */
object TemporalExample:
val TaskQueue = "uni-example-queue"

/**
* Starts a Temporal worker connected to a locally-running Temporal dev server (`temporal server
* start-dev`).
*
* This is an entry point for manual exploration — run the Temporal dev server first, then run this
* app alongside `TemporalClientApp` to see workflows execute end-to-end.
*
* {{{
* # Terminal 1
* temporal server start-dev
* # Terminal 2
* ./sbt "temporalExample/run" # starts worker
* }}}
*/
object TemporalWorkerApp extends App with LogSupport:

logger.info("Connecting to local Temporal dev server...")

val service = WorkflowServiceStubs.newLocalServiceStubs()
val clientOptions = WorkflowClientOptions
.newBuilder()
.setDataConverter(ScalaDataConverter.converter)
.build()

val client = WorkflowClient.newInstance(service, clientOptions)
val factory = WorkerFactory.newInstance(client)

val worker = factory.newWorker(TemporalExample.TaskQueue)

// Register workflow and activity implementations
worker.registerWorkflowImplementationTypes(classOf[HelloWorkflowImpl])
worker.registerWorkflowImplementationTypes(classOf[DataPipelineWorkflowImpl])
worker.registerActivitiesImplementations(GreetingActivitiesImpl())
worker.registerActivitiesImplementations(DataActivitiesImpl())

factory.start()
logger.info(s"Worker started on task queue '${TemporalExample.TaskQueue}'. Ctrl+C to stop.")

/**
* Submits example workflows to the running worker.
*
* Run after `TemporalWorkerApp` is running.
*/
object TemporalClientApp extends App with LogSupport:

val service = WorkflowServiceStubs.newLocalServiceStubs()
val clientOptions = WorkflowClientOptions
.newBuilder()
.setDataConverter(ScalaDataConverter.converter)
.build()

val client = WorkflowClient.newInstance(service, clientOptions)

// --- Hello workflow ---
val helloStub = client.newWorkflowStub(
classOf[HelloWorkflow],
WorkflowOptions
.newBuilder()
.setTaskQueue(TemporalExample.TaskQueue)
.setWorkflowId(s"hello-${UUID.randomUUID()}")
.build()
)

val greeting = helloStub.sayHello("Temporal")
logger.info(s"HelloWorkflow result: ${greeting}")

// --- Data pipeline workflow ---
val pipelineStub = client.newWorkflowStub(
classOf[DataPipelineWorkflow],
WorkflowOptions
.newBuilder()
.setTaskQueue(TemporalExample.TaskQueue)
.setWorkflowId(s"pipeline-${UUID.randomUUID()}")
.build()
)

val result = pipelineStub.process(PipelineInput("sensor-42", "raw,data,stream"))
logger.info(
s"DataPipeline result: processed=${result.processedData}, records=${result.recordCount}"
)

service.shutdownNow()

end TemporalClientApp
Loading
Loading