1313// --------------------------------------------------------------------------
1414import { NodeSDK } from "@opentelemetry/sdk-node" ;
1515import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-http" ;
16- import { SimpleSpanProcessor } from "@opentelemetry/sdk-trace-base" ;
17- import { resourceFromAttributes } from "@opentelemetry/resources" ;
16+ import { Resource } from "@opentelemetry/resources" ;
1817import { ATTR_SERVICE_NAME } from "@opentelemetry/semantic-conventions" ;
1918
2019// Load environment variables from .env file
@@ -25,15 +24,13 @@ dotenv.config({ path: path.join(__dirname, "..", ".env") });
2524// Read the OTLP endpoint from the environment (defaults to Jaeger's OTLP HTTP port)
2625const otlpEndpoint = process . env . OTEL_EXPORTER_OTLP_ENDPOINT || "http://localhost:4318" ;
2726
28- const traceExporter = new OTLPTraceExporter ( {
29- url : `${ otlpEndpoint } /v1/traces` ,
30- } ) ;
31-
3227const sdk = new NodeSDK ( {
33- resource : resourceFromAttributes ( {
34- [ ATTR_SERVICE_NAME ] : "durabletask-js-tracing-example" ,
28+ resource : new Resource ( {
29+ [ ATTR_SERVICE_NAME ] : "durabletask-js-tracing-sample" ,
30+ } ) ,
31+ traceExporter : new OTLPTraceExporter ( {
32+ url : `${ otlpEndpoint } /v1/traces` ,
3533 } ) ,
36- spanProcessors : [ new SimpleSpanProcessor ( traceExporter ) ] ,
3734} ) ;
3835
3936sdk . start ( ) ;
@@ -53,13 +50,9 @@ import { TOrchestrator } from "@microsoft/durabletask-js/dist/types/orchestrator
5350import { whenAll } from "@microsoft/durabletask-js/dist/task" ;
5451
5552// --------------------------------------------------------------------------
56- // 3. Application code
53+ // 3. Application code — FanOutFanIn pattern (matches Java tracing sample)
5754// --------------------------------------------------------------------------
5855( async ( ) => {
59- // Use ConsoleLogger so structured log events (with event IDs and categories) are
60- // printed to the console by default, similar to .NET's default ILogger<T> output.
61- // For production, consider using createAzureLogger() which integrates with @azure/logger
62- // and respects the AZURE_LOG_LEVEL environment variable.
6356 const sdkLogger = new ConsoleLogger ( ) ;
6457
6558 // --- Configuration ---
@@ -80,96 +73,44 @@ import { whenAll } from "@microsoft/durabletask-js/dist/task";
8073
8174 // --- Activity definitions ---
8275
83- /** Simulates fetching data from an external service. */
84- const fetchData = async ( _ctx : ActivityContext , source : string ) : Promise < string > => {
85- console . log ( ` [fetchData] Fetching data from "${ source } "...` ) ;
86- // Simulate network latency
87- await new Promise ( ( r ) => setTimeout ( r , 300 + Math . random ( ) * 200 ) ) ;
88- return `data-from-${ source } ` ;
89- } ;
90-
91- /** Simulates transforming a piece of data. */
92- const transformData = async ( _ctx : ActivityContext , input : string ) : Promise < string > => {
93- console . log ( ` [transformData] Transforming "${ input } "...` ) ;
94- await new Promise ( ( r ) => setTimeout ( r , 200 ) ) ;
95- return `transformed(${ input } )` ;
76+ /** Simulates getting weather for a city. */
77+ const getWeather = async ( _ctx : ActivityContext , city : string ) : Promise < string > => {
78+ console . log ( ` [GetWeather] Getting weather for: ${ city } ` ) ;
79+ await new Promise ( ( r ) => setTimeout ( r , 20 ) ) ;
80+ return `${ city } =72F` ;
9681 } ;
9782
98- /** Simulates persisting results to a database. */
99- const saveResults = async ( _ctx : ActivityContext , results : string [ ] ) : Promise < number > => {
100- console . log ( ` [saveResults] Saving ${ results . length } results...` ) ;
101- await new Promise ( ( r ) => setTimeout ( r , 150 ) ) ;
102- return results . length ;
83+ /** Aggregates weather results into a summary. */
84+ const createSummary = async ( _ctx : ActivityContext , input : string ) : Promise < string > => {
85+ console . log ( ` [CreateSummary] Creating summary for: ${ input } ` ) ;
86+ return `Weather Report: ${ input } ` ;
10387 } ;
10488
105- // --- Orchestrator: data pipeline (chaining + fan-out/fan-in) ---
89+ // --- Orchestrator: Fan-Out/Fan-In with timer ---
10690
10791 /**
108- * Demonstrates a realistic data-processing pipeline that produces a rich
109- * distributed trace:
92+ * Demonstrates a Fan-Out/Fan-In pattern that produces a rich trace:
11093 *
111- * 1. Fan-out – fetch data from multiple sources in parallel.
112- * 2. Fan-out – transform each result in parallel.
113- * 3. Chain – save all transformed results.
114- *
115- * The resulting trace will show:
116- * create_orchestration → orchestration (server)
117- * ├─ activity:fetchData (×N, parallel)
118- * ├─ activity:transformData (×N, parallel)
119- * └─ activity:saveResults
94+ * create_orchestration:FanOutFanIn (Producer)
95+ * └─ orchestration:FanOutFanIn (Server)
96+ * ├─ orchestration:FanOutFanIn:timer (Internal, ~1s)
97+ * ├─ activity:getWeather (Client+Server, ×5 parallel)
98+ * └─ activity:createSummary (Client+Server)
12099 */
121- const dataPipelineOrchestrator : TOrchestrator = async function * (
122- ctx : OrchestrationContext ,
123- ) : any {
124- const sources : string [ ] = yield ctx . callActivity ( getDataSources ) ;
125-
126- // Step 1 – fan-out: fetch from all sources in parallel
127- const fetchTasks = [ ] ;
128- for ( const source of sources ) {
129- fetchTasks . push ( ctx . callActivity ( fetchData , source ) ) ;
130- }
131- const rawData : string [ ] = yield whenAll ( fetchTasks ) ;
132-
133- // Step 2 – fan-out: transform all fetched data in parallel
134- const transformTasks = [ ] ;
135- for ( const raw of rawData ) {
136- transformTasks . push ( ctx . callActivity ( transformData , raw ) ) ;
137- }
138- const transformed : string [ ] = yield whenAll ( transformTasks ) ;
139-
140- // Step 3 – chain: save all results
141- const savedCount : number = yield ctx . callActivity ( saveResults , transformed ) ;
100+ const fanOutFanIn : TOrchestrator = async function * ( ctx : OrchestrationContext ) : any {
101+ // Timer: wait 1 second (demonstrates timer span with creation-to-fired duration)
102+ const timerDelay = new Date ( ctx . currentUtcDateTime . getTime ( ) + 1000 ) ;
103+ yield ctx . createTimer ( timerDelay ) ;
142104
143- return {
144- sourcesProcessed : sources . length ,
145- resultsSaved : savedCount ,
146- data : transformed ,
147- } ;
148- } ;
149-
150- /** Returns the list of data sources to process. */
151- const getDataSources = async ( _ctx : ActivityContext ) : Promise < string [ ] > => {
152- return [ "users-api" , "orders-api" , "inventory-api" , "analytics-api" ] ;
153- } ;
105+ // Fan-out: schedule parallel weather lookups
106+ const cities = [ "Seattle" , "Tokyo" , "London" , "Paris" , "Sydney" ] ;
107+ const tasks = cities . map ( ( city ) => ctx . callActivity ( getWeather , city ) ) ;
108+ const results : string [ ] = yield whenAll ( tasks ) ;
154109
155- // --- Orchestrator: simple sequence (for a cleaner trace comparison) ---
156-
157- const sequenceOrchestrator : TOrchestrator = async function * (
158- ctx : OrchestrationContext ,
159- ) : any {
160- const cities = [ "Tokyo" , "Seattle" , "London" ] ;
161- const greetings : string [ ] = [ ] ;
162- for ( const city of cities ) {
163- const greeting : string = yield ctx . callActivity ( greetCity , city ) ;
164- greetings . push ( greeting ) ;
165- }
166- return greetings ;
167- } ;
110+ // Aggregate results
111+ const summary : string = yield ctx . callActivity ( createSummary , results . join ( ", " ) ) ;
168112
169- const greetCity = async ( _ctx : ActivityContext , city : string ) : Promise < string > => {
170- console . log ( ` [greetCity] Greeting ${ city } ` ) ;
171- await new Promise ( ( r ) => setTimeout ( r , 100 ) ) ;
172- return `Hello, ${ city } !` ;
113+ return summary ;
173114 } ;
174115
175116 // --- Create client & worker ---
@@ -193,52 +134,37 @@ import { whenAll } from "@microsoft/durabletask-js/dist/task";
193134
194135 client = clientBuilder . build ( ) ;
195136 worker = workerBuilder
196- . addOrchestrator ( dataPipelineOrchestrator )
197- . addOrchestrator ( sequenceOrchestrator )
198- . addActivity ( fetchData )
199- . addActivity ( transformData )
200- . addActivity ( saveResults )
201- . addActivity ( getDataSources )
202- . addActivity ( greetCity )
137+ . addOrchestrator ( fanOutFanIn )
138+ . addActivity ( getWeather )
139+ . addActivity ( createSummary )
203140 . build ( ) ;
204141
205142 // --- Start worker ---
206143 console . log ( "Starting worker..." ) ;
207144 await worker . start ( ) ;
208- // Allow the worker time to establish the gRPC stream with the scheduler.
209- // worker.start() returns before the connection is fully established.
210145 await new Promise ( ( r ) => setTimeout ( r , 2000 ) ) ;
211- console . log ( "Worker started." ) ;
212-
213- // --- Run orchestrations ---
146+ console . log ( "Worker started with OpenTelemetry tracing." ) ;
214147
215- // 1) Sequence orchestration
216- console . log ( "\n=== Sequence Orchestration ===" ) ;
217- const seqId = await client . scheduleNewOrchestration ( sequenceOrchestrator ) ;
218- console . log ( `Scheduled: ${ seqId } ` ) ;
219- const seqState = await client . waitForOrchestrationCompletion ( seqId , true , 60 ) ;
220- console . log ( `Completed – result: ${ seqState ?. serializedOutput } ` ) ;
148+ // --- Run orchestration ---
149+ console . log ( "\nScheduling FanOutFanIn orchestration..." ) ;
150+ const instanceId = await client . scheduleNewOrchestration ( fanOutFanIn ) ;
151+ console . log ( `Started orchestration: ${ instanceId } ` ) ;
221152
222- // 2) Data pipeline orchestration (fan-out/fan-in)
223- console . log ( "\n=== Data Pipeline Orchestration ===" ) ;
224- const pipelineId = await client . scheduleNewOrchestration ( dataPipelineOrchestrator ) ;
225- console . log ( `Scheduled: ${ pipelineId } ` ) ;
226- const pipelineState = await client . waitForOrchestrationCompletion ( pipelineId , true , 60 ) ;
227- console . log ( `Completed – result: ${ pipelineState ?. serializedOutput } ` ) ;
153+ console . log ( "Waiting for completion..." ) ;
154+ const state = await client . waitForOrchestrationCompletion ( instanceId , true , 60 ) ;
155+ console . log ( `Status: ${ state ?. runtimeStatus } ` ) ;
156+ console . log ( `Result: ${ state ?. serializedOutput } ` ) ;
228157
229- console . log ( "\n=== All orchestrations completed! ===" ) ;
230- console . log (
231- `Open Jaeger UI at http://localhost:16686 and search for service "durabletask-js-tracing-example" to view traces.` ,
232- ) ;
158+ console . log ( "\nView traces in Jaeger UI: http://localhost:16686" ) ;
159+ console . log ( ' Search for service: durabletask-js-tracing-sample' ) ;
160+ console . log ( "View orchestration in DTS Dashboard: http://localhost:8082" ) ;
233161 } catch ( error ) {
234162 console . error ( "Error:" , error ) ;
235163 process . exit ( 1 ) ;
236164 } finally {
237165 console . log ( "\nShutting down..." ) ;
238166 if ( worker ) await worker . stop ( ) ;
239167 if ( client ) await client . stop ( ) ;
240-
241- // Flush remaining spans before exit
242168 await sdk . shutdown ( ) ;
243169 console . log ( "Done." ) ;
244170 process . exit ( 0 ) ;
0 commit comments