Skip to content

Commit 997c0a3

Browse files
committed
fix: handle conditional struct args in v2 launcher
Signed-off-by: wannabeaquant <[email protected]>
1 parent 474273f commit 997c0a3

File tree

2 files changed

+185
-6
lines changed

2 files changed

+185
-6
lines changed

backend/src/v2/component/launcher_v2.go

Lines changed: 141 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -827,20 +827,155 @@ func compileCmdAndArgs(executorInput *pipelinespec.ExecutorInput, cmd string, ar
827827
executorInputJSONString := string(executorInputJSON)
828828

829829
compiledCmd := strings.ReplaceAll(cmd, executorInputJSONKey, executorInputJSONString)
830-
compiledArgs := make([]string, 0, len(args))
831830
for placeholder, replacement := range placeholders {
832-
cmd = strings.ReplaceAll(cmd, placeholder, replacement)
831+
compiledCmd = strings.ReplaceAll(compiledCmd, placeholder, replacement)
833832
}
833+
834+
compiledArgs := make([]string, 0, len(args))
835+
providedInputs := getProvidedInputs(executorInput)
834836
for _, arg := range args {
835-
compiledArgTemplate := strings.ReplaceAll(arg, executorInputJSONKey, executorInputJSONString)
836-
for placeholder, replacement := range placeholders {
837-
compiledArgTemplate = strings.ReplaceAll(compiledArgTemplate, placeholder, replacement)
837+
expandedArgs, err := resolveStructPlaceholders(arg, providedInputs)
838+
if err != nil {
839+
return "", nil, err
840+
}
841+
for _, expanded := range expandedArgs {
842+
compiledArgTemplate := strings.ReplaceAll(expanded, executorInputJSONKey, executorInputJSONString)
843+
for placeholder, replacement := range placeholders {
844+
compiledArgTemplate = strings.ReplaceAll(compiledArgTemplate, placeholder, replacement)
845+
}
846+
compiledArgs = append(compiledArgs, compiledArgTemplate)
838847
}
839-
compiledArgs = append(compiledArgs, compiledArgTemplate)
840848
}
841849
return compiledCmd, compiledArgs, nil
842850
}
843851

852+
// getProvidedInputs returns the input names that have values supplied (parameters or artifacts).
853+
func getProvidedInputs(executorInput *pipelinespec.ExecutorInput) map[string]struct{} {
854+
provided := make(map[string]struct{})
855+
for name, v := range executorInput.GetInputs().GetParameterValues() {
856+
if v != nil {
857+
provided[name] = struct{}{}
858+
}
859+
}
860+
for name, artifactList := range executorInput.GetInputs().GetArtifacts() {
861+
if artifactList != nil && len(artifactList.Artifacts) > 0 {
862+
provided[name] = struct{}{}
863+
}
864+
}
865+
return provided
866+
}
867+
868+
// resolveStructPlaceholders expands IfPresent/Concat placeholder strings into concrete args.
869+
// If no struct placeholder is detected, the original arg is returned.
870+
func resolveStructPlaceholders(arg string, providedInputs map[string]struct{}) ([]string, error) {
871+
if strings.HasPrefix(arg, `{"Concat": `) || strings.HasPrefix(arg, `{"IfPresent": `) {
872+
var obj interface{}
873+
if err := json.Unmarshal([]byte(arg), &obj); err != nil {
874+
return nil, fmt.Errorf("failed to unmarshal struct placeholder %q: %w", arg, err)
875+
}
876+
resolved, err := recursivelyResolveStruct(obj, providedInputs)
877+
if err != nil {
878+
return nil, err
879+
}
880+
switch v := resolved.(type) {
881+
case nil:
882+
return []string{}, nil
883+
case string:
884+
return []string{v}, nil
885+
case []string:
886+
return v, nil
887+
default:
888+
return nil, fmt.Errorf("unexpected resolved struct placeholder type %T for %q", v, arg)
889+
}
890+
}
891+
892+
return []string{arg}, nil
893+
}
894+
895+
// recursivelyResolveStruct handles nested IfPresent/Concat structures.
896+
func recursivelyResolveStruct(obj interface{}, providedInputs map[string]struct{}) (interface{}, error) {
897+
switch typed := obj.(type) {
898+
case string:
899+
return typed, nil
900+
case []interface{}:
901+
var parts []string
902+
for _, item := range typed {
903+
resolved, err := recursivelyResolveStruct(item, providedInputs)
904+
if err != nil {
905+
return nil, err
906+
}
907+
switch v := resolved.(type) {
908+
case nil:
909+
continue
910+
case string:
911+
parts = append(parts, v)
912+
case []string:
913+
parts = append(parts, v...)
914+
default:
915+
return nil, fmt.Errorf("unexpected list item type %T in struct placeholder", v)
916+
}
917+
}
918+
return parts, nil
919+
case map[string]interface{}:
920+
if len(typed) != 1 {
921+
return nil, fmt.Errorf("invalid struct placeholder: %v", typed)
922+
}
923+
for key, value := range typed {
924+
switch key {
925+
case "Concat":
926+
items, ok := value.([]interface{})
927+
if !ok {
928+
return nil, fmt.Errorf("Concat value must be a list, got %T", value)
929+
}
930+
var parts []string
931+
for _, item := range items {
932+
resolved, err := recursivelyResolveStruct(item, providedInputs)
933+
if err != nil {
934+
return nil, err
935+
}
936+
switch v := resolved.(type) {
937+
case nil:
938+
continue
939+
case string:
940+
parts = append(parts, v)
941+
case []string:
942+
parts = append(parts, v...)
943+
default:
944+
return nil, fmt.Errorf("unexpected Concat item type %T", v)
945+
}
946+
}
947+
return strings.Join(parts, ""), nil
948+
case "IfPresent":
949+
inner, ok := value.(map[string]interface{})
950+
if !ok {
951+
return nil, fmt.Errorf("IfPresent value must be an object, got %T", value)
952+
}
953+
inputName, ok := inner["InputName"].(string)
954+
if !ok {
955+
return nil, fmt.Errorf("IfPresent.InputName must be a string, got %T", inner["InputName"])
956+
}
957+
_, exists := providedInputs[inputName]
958+
var branch interface{}
959+
if exists {
960+
branch = inner["Then"]
961+
} else {
962+
branch = inner["Else"]
963+
}
964+
if branch == nil {
965+
return nil, nil
966+
}
967+
return recursivelyResolveStruct(branch, providedInputs)
968+
default:
969+
return nil, fmt.Errorf("unsupported struct placeholder key %q", key)
970+
}
971+
}
972+
default:
973+
return nil, fmt.Errorf("unexpected struct placeholder type %T", typed)
974+
}
975+
976+
return nil, nil
977+
}
978+
844979
// Add executor input placeholders to provided map.
845980
func getPlaceholders(executorInput *pipelinespec.ExecutorInput) (placeholders map[string]string, err error) {
846981
defer func() {

backend/src/v2/component/launcher_v2_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,50 @@ func Test_executorInput_compileCmdAndArgs(t *testing.T) {
269269
assert.Equal(t, "9312", config["sphinx_port"])
270270
}
271271

272+
func Test_compileCmdAndArgs_structPlaceholders(t *testing.T) {
273+
executorInput := &pipelinespec.ExecutorInput{
274+
Inputs: &pipelinespec.ExecutorInput_Inputs{
275+
ParameterValues: map[string]*structpb.Value{
276+
"file": structpb.NewStringValue("/etc/hosts"),
277+
"line_number": structpb.NewBoolValue(true),
278+
"flag_value": structpb.NewStringValue("foo"),
279+
},
280+
},
281+
}
282+
283+
cmd := "cat"
284+
args := []string{
285+
"{{$.inputs.parameters['file']}}",
286+
`{"IfPresent": {"InputName": "line_number", "Then": ["-n"]}}`,
287+
`{"Concat": ["--flag=", "{{$.inputs.parameters['flag_value']}}"]}`,
288+
}
289+
290+
compiledCmd, compiledArgs, err := compileCmdAndArgs(executorInput, cmd, args)
291+
assert.NoError(t, err)
292+
assert.Equal(t, "cat", compiledCmd)
293+
assert.Equal(t, []string{"/etc/hosts", "-n", "--flag=foo"}, compiledArgs)
294+
}
295+
296+
func Test_compileCmdAndArgs_structPlaceholders_Omitted(t *testing.T) {
297+
executorInput := &pipelinespec.ExecutorInput{
298+
Inputs: &pipelinespec.ExecutorInput_Inputs{
299+
ParameterValues: map[string]*structpb.Value{
300+
"file": structpb.NewStringValue("/etc/hosts"),
301+
},
302+
},
303+
}
304+
305+
cmd := "cat"
306+
args := []string{
307+
"{{$.inputs.parameters['file']}}",
308+
`{"IfPresent": {"InputName": "line_number", "Then": ["-n"]}}`,
309+
}
310+
311+
_, compiledArgs, err := compileCmdAndArgs(executorInput, cmd, args)
312+
assert.NoError(t, err)
313+
assert.Equal(t, []string{"/etc/hosts"}, compiledArgs)
314+
}
315+
272316
func Test_get_log_Writer(t *testing.T) {
273317
old := osCreateFunc
274318
defer func() { osCreateFunc = old }()

0 commit comments

Comments
 (0)