-
Notifications
You must be signed in to change notification settings - Fork 0
feature: Add Temporal workflow example for Scala 3 #463
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
xerial
wants to merge
3
commits into
main
Choose a base branch
from
feature/temporal-example
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,54 @@ | ||
| # Temporal Example Project for Scala 3 | ||
|
|
||
| ## Objective | ||
|
|
||
| Create a small Temporal example project inside `uni` to evaluate the Temporal workflow engine's usability, SDK ergonomics, and fit for durable workflow orchestration in the Scala 3 ecosystem. | ||
|
|
||
| ## Background | ||
|
|
||
| Temporal is a workflow orchestration platform built for durability. Workflows survive crashes; activities are retried automatically. This exploration aims to understand: | ||
|
|
||
| 1. How Temporal's Java SDK feels in Scala 3 | ||
| 2. How to write workflows and activities idiomatically | ||
| 3. How retries, error handling, and observability work | ||
| 4. Whether Temporal complements the lightweight `uni-task` approach | ||
|
|
||
| ## Module: `uni-temporal-example` | ||
|
|
||
| A JVM-only example module (not published) that demonstrates: | ||
|
|
||
| ### Workflows | ||
|
|
||
| 1. **HelloWorkflow** — minimal "hello world" to understand the basics | ||
| 2. **DataPipelineWorkflow** — multi-step pipeline: fetch → transform → store | ||
|
|
||
| ### Activities | ||
|
|
||
| - `GreetingActivities` — simple greeting activity | ||
| - `DataActivities` — fetch, transform, and store with simulated failures for retry demo | ||
|
|
||
| ### Key Concepts Covered | ||
|
|
||
| - Workflow interface + implementation separation (required by Temporal) | ||
| - Activity interface + implementation | ||
| - `ActivityOptions` with retry policy | ||
| - Test server (`TestWorkflowEnvironment`) for unit tests without a real server | ||
| - Workflow signals and queries (stretch goal) | ||
|
|
||
| ## Implementation Plan | ||
|
|
||
| 1. Add `uni-temporal-example` module to `build.sbt` | ||
| 2. Add Temporal SDK dependencies: | ||
| - `io.temporal:temporal-sdk` | ||
| - `io.temporal:temporal-testing` (test scope) | ||
| 3. Implement `HelloWorkflow` with `GreetingActivities` | ||
| 4. Implement `DataPipelineWorkflow` with `DataActivities` | ||
| 5. Write `UniTest`-based tests using `TestWorkflowEnvironment` | ||
| 6. Add a `TemporalWorkerApp` entry point for running against a real local server | ||
|
|
||
| ## Findings (to be filled in) | ||
|
|
||
| - SDK ergonomics: | ||
| - Test experience: | ||
| - Comparison to uni-task: | ||
| - Recommendation: |
34 changes: 34 additions & 0 deletions
34
uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/DataPipeline.scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| package wvlet.uni.temporal.example | ||
|
|
||
| import io.temporal.activity.{ActivityInterface, ActivityMethod} | ||
| import io.temporal.workflow.{WorkflowInterface, WorkflowMethod} | ||
|
|
||
| /** Input/output data types for the pipeline. */ | ||
| case class PipelineInput(sourceId: String, rawData: String) | ||
| case class PipelineResult(sourceId: String, processedData: String, recordCount: Int) | ||
|
|
||
| /** | ||
| * A multi-step data pipeline workflow. | ||
| * | ||
| * Demonstrates sequential activity chaining. Each step is independently retried on failure, and | ||
| * the workflow survives worker restarts — Temporal replays history to restore state. | ||
| */ | ||
| @WorkflowInterface | ||
| trait DataPipelineWorkflow: | ||
| @WorkflowMethod | ||
| def process(input: PipelineInput): PipelineResult | ||
|
|
||
| /** Activities for the data pipeline. Each method is a separately-retried unit of work. */ | ||
| @ActivityInterface | ||
| trait DataActivities: | ||
| /** Fetch raw data from a source. Simulates transient failures to show retry behaviour. */ | ||
| @ActivityMethod | ||
| def fetchData(sourceId: String): String | ||
|
|
||
| /** Transform/clean the raw data. */ | ||
| @ActivityMethod | ||
| def transformData(rawData: String): String | ||
|
|
||
| /** Persist the result and return a record count. */ | ||
| @ActivityMethod | ||
| def storeData(sourceId: String, processedData: String): Int |
69 changes: 69 additions & 0 deletions
69
uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/DataPipelineImpl.scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,69 @@ | ||
| package wvlet.uni.temporal.example | ||
|
|
||
| import io.temporal.activity.{Activity, ActivityOptions} | ||
| import io.temporal.common.RetryOptions | ||
| import io.temporal.workflow.Workflow | ||
| import wvlet.uni.log.LogSupport | ||
|
|
||
| import java.time.Duration | ||
|
|
||
| /** | ||
| * DataPipeline workflow implementation. | ||
| * | ||
| * Chaining activities sequentially: fetchData → transformData → storeData. Each activity is | ||
| * scheduled with explicit retry options demonstrating Temporal's built-in retry mechanism. | ||
| */ | ||
| class DataPipelineWorkflowImpl extends DataPipelineWorkflow with LogSupport: | ||
|
|
||
| private val activities: DataActivities = Workflow.newActivityStub( | ||
| classOf[DataActivities], | ||
| ActivityOptions | ||
| .newBuilder() | ||
| // Each activity call must complete within 30s | ||
| .setStartToCloseTimeout(Duration.ofSeconds(30)) | ||
| .setRetryOptions( | ||
| RetryOptions | ||
| .newBuilder() | ||
| // Retry up to 3 times with 1-second initial interval | ||
| .setMaximumAttempts(3) | ||
| .setInitialInterval(Duration.ofSeconds(1)) | ||
| .setBackoffCoefficient(2.0) | ||
| .build() | ||
| ) | ||
| .build() | ||
| ) | ||
|
|
||
| override def process(input: PipelineInput): PipelineResult = | ||
| // Step 1: fetch | ||
| val rawData = activities.fetchData(input.sourceId) | ||
| // Step 2: transform | ||
| val processedData = activities.transformData(rawData) | ||
| // Step 3: store | ||
| val recordCount = activities.storeData(input.sourceId, processedData) | ||
| PipelineResult(input.sourceId, processedData, recordCount) | ||
|
|
||
| /** | ||
| * DataActivities implementation with a simulated transient failure on the first fetch attempt. | ||
| * | ||
| * Uses Temporal's ActivityExecutionContext to get the server-tracked attempt number, which is the | ||
| * recommended approach — it works correctly across different worker instances and survives | ||
| * restarts. | ||
| */ | ||
| class DataActivitiesImpl extends DataActivities with LogSupport: | ||
|
|
||
| override def fetchData(sourceId: String): String = | ||
| val attempt = Activity.getExecutionContext().getInfo().getAttempt() | ||
| logger.info(s"fetchData attempt ${attempt} for source '${sourceId}'") | ||
| if attempt == 1 then | ||
| // Simulate a transient failure on the first attempt | ||
| throw RuntimeException(s"Transient network error fetching '${sourceId}' (attempt 1)") | ||
| s"raw-data-for-${sourceId}" | ||
|
|
||
| override def transformData(rawData: String): String = | ||
| logger.info(s"Transforming: ${rawData}") | ||
| rawData.toUpperCase.replace("-", "_") | ||
|
|
||
| override def storeData(sourceId: String, processedData: String): Int = | ||
| logger.info(s"Storing ${processedData} for source '${sourceId}'") | ||
| // Simulate storing: return a fake record count | ||
| processedData.length | ||
21 changes: 21 additions & 0 deletions
21
uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/HelloWorkflow.scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| package wvlet.uni.temporal.example | ||
|
|
||
| import io.temporal.activity.{ActivityInterface, ActivityMethod} | ||
| import io.temporal.workflow.{WorkflowInterface, WorkflowMethod} | ||
|
|
||
| /** | ||
| * The simplest possible Temporal workflow: greet a user. | ||
| * | ||
| * Temporal requires workflow and activity interfaces to be annotated with @WorkflowInterface / | ||
| * @ActivityInterface. | ||
| * Implementations are registered separately with the worker. | ||
| */ | ||
| @WorkflowInterface | ||
| trait HelloWorkflow: | ||
| @WorkflowMethod | ||
| def sayHello(name: String): String | ||
|
|
||
| @ActivityInterface | ||
| trait GreetingActivities: | ||
| @ActivityMethod | ||
| def composeGreeting(greeting: String, name: String): String |
33 changes: 33 additions & 0 deletions
33
uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/HelloWorkflowImpl.scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,33 @@ | ||
| package wvlet.uni.temporal.example | ||
|
|
||
| import io.temporal.activity.ActivityOptions | ||
| import io.temporal.workflow.Workflow | ||
| import wvlet.uni.log.LogSupport | ||
|
|
||
| import java.time.Duration | ||
|
|
||
| /** | ||
| * Workflow implementation. | ||
| * | ||
| * Workflow code must be deterministic — no I/O, random numbers, or wall-clock time. All side | ||
| * effects go through activities. Temporal replays workflow history on restarts; non-determinism | ||
| * breaks replay. | ||
| */ | ||
| class HelloWorkflowImpl extends HelloWorkflow with LogSupport: | ||
|
|
||
| // Activity stub: calls are scheduled as Temporal activities, not direct method calls. | ||
| private val activities: GreetingActivities = Workflow.newActivityStub( | ||
| classOf[GreetingActivities], | ||
| ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(10)).build() | ||
| ) | ||
|
|
||
| override def sayHello(name: String): String = activities.composeGreeting("Hello", name) | ||
|
|
||
| /** | ||
| * Activity implementation. Activities are the side-effectful units — they do real work and can be | ||
| * retried independently. | ||
| */ | ||
| class GreetingActivitiesImpl extends GreetingActivities with LogSupport: | ||
| override def composeGreeting(greeting: String, name: String): String = | ||
| logger.info(s"Composing greeting for ${name}") | ||
| s"${greeting}, ${name}!" |
27 changes: 27 additions & 0 deletions
27
uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/ScalaDataConverter.scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,27 @@ | ||
| package wvlet.uni.temporal.example | ||
|
|
||
| import com.fasterxml.jackson.databind.ObjectMapper | ||
| import com.fasterxml.jackson.module.scala.DefaultScalaModule | ||
| import io.temporal.common.converter.{ | ||
| DefaultDataConverter, | ||
| JacksonJsonPayloadConverter, | ||
| PayloadConverter | ||
| } | ||
|
|
||
| /** | ||
| * Custom DataConverter that registers Jackson's Scala module so Scala 3 case classes can be | ||
| * serialized/deserialized by Temporal. | ||
| * | ||
| * Without this, Jackson cannot construct Scala case class instances because they lack a no-arg | ||
| * constructor. The Scala module teaches Jackson about Scala product types, Option, collections, | ||
| * etc. | ||
| */ | ||
| object ScalaDataConverter: | ||
| private val objectMapper: ObjectMapper = | ||
| val mapper = ObjectMapper() | ||
| mapper.registerModule(DefaultScalaModule) | ||
| mapper | ||
|
|
||
| val converter: DefaultDataConverter = DefaultDataConverter | ||
| .newDefaultInstance() | ||
| .withPayloadConverterOverrides(JacksonJsonPayloadConverter(objectMapper)) |
97 changes: 97 additions & 0 deletions
97
uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/TemporalWorkerApp.scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,97 @@ | ||
| package wvlet.uni.temporal.example | ||
|
|
||
| import io.temporal.client.{WorkflowClient, WorkflowClientOptions, WorkflowOptions} | ||
| import io.temporal.serviceclient.WorkflowServiceStubs | ||
| import io.temporal.worker.WorkerFactory | ||
| import wvlet.uni.log.LogSupport | ||
|
|
||
| import java.util.UUID | ||
|
|
||
| /** Shared task-queue name used by all workers and clients in this example. */ | ||
| object TemporalExample: | ||
| val TaskQueue = "uni-example-queue" | ||
|
|
||
| /** | ||
| * Starts a Temporal worker connected to a locally-running Temporal dev server (`temporal server | ||
| * start-dev`). | ||
| * | ||
| * This is an entry point for manual exploration — run the Temporal dev server first, then run this | ||
| * app alongside `TemporalClientApp` to see workflows execute end-to-end. | ||
| * | ||
| * {{{ | ||
| * # Terminal 1 | ||
| * temporal server start-dev | ||
| * # Terminal 2 | ||
| * ./sbt "temporalExample/run" # starts worker | ||
| * }}} | ||
| */ | ||
| object TemporalWorkerApp extends App with LogSupport: | ||
|
|
||
| logger.info("Connecting to local Temporal dev server...") | ||
|
|
||
| val service = WorkflowServiceStubs.newLocalServiceStubs() | ||
| val clientOptions = WorkflowClientOptions | ||
| .newBuilder() | ||
| .setDataConverter(ScalaDataConverter.converter) | ||
| .build() | ||
|
|
||
| val client = WorkflowClient.newInstance(service, clientOptions) | ||
| val factory = WorkerFactory.newInstance(client) | ||
|
|
||
| val worker = factory.newWorker(TemporalExample.TaskQueue) | ||
|
|
||
| // Register workflow and activity implementations | ||
| worker.registerWorkflowImplementationTypes(classOf[HelloWorkflowImpl]) | ||
| worker.registerWorkflowImplementationTypes(classOf[DataPipelineWorkflowImpl]) | ||
| worker.registerActivitiesImplementations(GreetingActivitiesImpl()) | ||
| worker.registerActivitiesImplementations(DataActivitiesImpl()) | ||
|
|
||
| factory.start() | ||
| logger.info(s"Worker started on task queue '${TemporalExample.TaskQueue}'. Ctrl+C to stop.") | ||
|
|
||
| /** | ||
| * Submits example workflows to the running worker. | ||
| * | ||
| * Run after `TemporalWorkerApp` is running. | ||
| */ | ||
| object TemporalClientApp extends App with LogSupport: | ||
|
|
||
| val service = WorkflowServiceStubs.newLocalServiceStubs() | ||
| val clientOptions = WorkflowClientOptions | ||
| .newBuilder() | ||
| .setDataConverter(ScalaDataConverter.converter) | ||
| .build() | ||
|
|
||
| val client = WorkflowClient.newInstance(service, clientOptions) | ||
|
|
||
| // --- Hello workflow --- | ||
| val helloStub = client.newWorkflowStub( | ||
| classOf[HelloWorkflow], | ||
| WorkflowOptions | ||
| .newBuilder() | ||
| .setTaskQueue(TemporalExample.TaskQueue) | ||
| .setWorkflowId(s"hello-${UUID.randomUUID()}") | ||
| .build() | ||
| ) | ||
|
|
||
| val greeting = helloStub.sayHello("Temporal") | ||
| logger.info(s"HelloWorkflow result: ${greeting}") | ||
|
|
||
| // --- Data pipeline workflow --- | ||
| val pipelineStub = client.newWorkflowStub( | ||
| classOf[DataPipelineWorkflow], | ||
| WorkflowOptions | ||
| .newBuilder() | ||
| .setTaskQueue(TemporalExample.TaskQueue) | ||
| .setWorkflowId(s"pipeline-${UUID.randomUUID()}") | ||
| .build() | ||
| ) | ||
|
|
||
| val result = pipelineStub.process(PipelineInput("sensor-42", "raw,data,stream")) | ||
| logger.info( | ||
| s"DataPipeline result: processed=${result.processedData}, records=${result.recordCount}" | ||
| ) | ||
|
|
||
| service.shutdownNow() | ||
|
|
||
| end TemporalClientApp |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of using a manual
AtomicIntegerto track attempts, it is recommended to use Temporal'sActivityExecutionContext. This is more robust as it works correctly across different worker instances and provides the actual attempt number tracked by the Temporal server, which is useful for implementing custom retry logic or logging.