Skip to content

Commit cc0409f

Browse files
committed
Use SDK ActivityHandle for activity result polling
Replace hand-rolled PollActivityExecution long poll loop with the SDK's client.GetActivityHandle().Get(), which provides proper gRPC long-poll handling and retry semantics. This is analogous to how workflow update uses UpdateWorkflow() + updateHandle.Get().
1 parent 9bfd8ab commit cc0409f

1 file changed

Lines changed: 24 additions & 50 deletions

File tree

internal/temporalcli/commands.activity.go

Lines changed: 24 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package temporalcli
22

33
import (
4-
"context"
54
"encoding/json"
65
"fmt"
76
"time"
@@ -16,6 +15,7 @@ import (
1615
sdkpb "go.temporal.io/api/sdk/v1"
1716
taskqueuepb "go.temporal.io/api/taskqueue/v1"
1817
"go.temporal.io/api/workflowservice/v1"
18+
"go.temporal.io/sdk/client"
1919
"go.temporal.io/sdk/converter"
2020
"google.golang.org/protobuf/types/known/durationpb"
2121
"google.golang.org/protobuf/types/known/fieldmaskpb"
@@ -472,7 +472,7 @@ func (c *TemporalActivityExecuteCommand) run(cctx *CommandContext, args []string
472472
if err != nil {
473473
return fmt.Errorf("failed starting activity: %w", err)
474474
}
475-
return pollActivityOutcome(cctx, cl.WorkflowService(), c.Parent.Namespace, c.ActivityId, startResp.RunId)
475+
return getActivityResult(cctx, cl, c.ActivityId, startResp.RunId)
476476
}
477477

478478
func (c *TemporalActivityResultCommand) run(cctx *CommandContext, args []string) error {
@@ -482,7 +482,7 @@ func (c *TemporalActivityResultCommand) run(cctx *CommandContext, args []string)
482482
}
483483
defer cl.Close()
484484

485-
return pollActivityOutcome(cctx, cl.WorkflowService(), c.Parent.Namespace, c.ActivityId, c.RunId)
485+
return getActivityResult(cctx, cl, c.ActivityId, c.RunId)
486486
}
487487

488488
func (c *TemporalActivityDescribeCommand) run(cctx *CommandContext, args []string) error {
@@ -648,30 +648,30 @@ func (c *TemporalActivityTerminateCommand) run(cctx *CommandContext, args []stri
648648
return nil
649649
}
650650

651-
func pollActivityOutcome(cctx *CommandContext, svc workflowservice.WorkflowServiceClient, ns, activityID, runID string) error {
652-
req := &workflowservice.PollActivityExecutionRequest{
653-
Namespace: ns,
654-
ActivityId: activityID,
655-
RunId: runID,
656-
}
657-
var resp *workflowservice.PollActivityExecutionResponse
658-
for resp.GetOutcome() == nil {
659-
rpcCtx, cancel := context.WithTimeout(cctx, longPollPerRPCTimeout)
660-
var err error
661-
resp, err = svc.PollActivityExecution(rpcCtx, req)
662-
cancel()
663-
if err != nil {
664-
if cctx.Err() != nil {
665-
return cctx.Err()
666-
}
667-
return fmt.Errorf("failed polling activity result: %w", err)
668-
}
651+
func getActivityResult(cctx *CommandContext, cl client.Client, activityID, runID string) error {
652+
handle := cl.GetActivityHandle(client.GetActivityHandleOptions{
653+
ActivityID: activityID,
654+
RunID: runID,
655+
})
656+
var valuePtr interface{}
657+
if err := handle.Get(cctx, &valuePtr); err != nil {
658+
return fmt.Errorf("activity failed: %w", err)
659+
}
660+
if cctx.JSONOutput {
661+
return cctx.Printer.PrintStructured(
662+
struct {
663+
Result interface{} `json:"result"`
664+
}{Result: valuePtr},
665+
printer.StructuredOptions{})
666+
}
667+
jsonBytes, err := json.Marshal(valuePtr)
668+
if err != nil {
669+
return fmt.Errorf("failed marshaling result: %w", err)
669670
}
670-
return printActivityOutcome(cctx, resp.GetOutcome())
671+
cctx.Printer.Printlnf("Result: %s", jsonBytes)
672+
return nil
671673
}
672674

673-
const longPollPerRPCTimeout = 70 * time.Second
674-
675675
func buildStartActivityRequest(
676676
cctx *CommandContext,
677677
parent *TemporalActivityCommand,
@@ -786,29 +786,3 @@ func buildStartActivityRequest(
786786
return req, nil
787787
}
788788

789-
func printActivityOutcome(cctx *CommandContext, outcome *activitypb.ActivityExecutionOutcome) error {
790-
if cctx.JSONOutput {
791-
return cctx.Printer.PrintStructured(outcome, printer.StructuredOptions{})
792-
}
793-
switch v := outcome.GetValue().(type) {
794-
case *activitypb.ActivityExecutionOutcome_Result:
795-
for _, payload := range v.Result.Payloads {
796-
var value any
797-
if err := converter.GetDefaultDataConverter().FromPayload(payload, &value); err != nil {
798-
cctx.Printer.Printlnf("Result: <failed converting: %v>", err)
799-
} else {
800-
jsonBytes, err := json.Marshal(value)
801-
if err != nil {
802-
cctx.Printer.Printlnf("Result: <failed marshaling: %v>", err)
803-
} else {
804-
cctx.Printer.Printlnf("Result: %s", jsonBytes)
805-
}
806-
}
807-
}
808-
return nil
809-
case *activitypb.ActivityExecutionOutcome_Failure:
810-
return fmt.Errorf("activity failed: %s", v.Failure.GetMessage())
811-
default:
812-
return fmt.Errorf("unexpected activity outcome type: %T", v)
813-
}
814-
}

0 commit comments

Comments
 (0)