@@ -3,6 +3,7 @@ package cmd
33import (
44 "context"
55 "fmt"
6+ "os"
67 "strconv"
78 "strings"
89 "sync/atomic"
@@ -36,6 +37,7 @@ func initRunbookCommands() {
3637
3738 runbookGlobalCmd .AddCommand (initRunbookShowCommands ())
3839 runbookGlobalCmd .AddCommand (initRunbookRunCommands ())
40+ runbookGlobalCmd .AddCommand (initRunbookRunScriptCmd ())
3941 runbookGlobalCmd .AddCommand (initRunbookDevCommands ())
4042 runbookGlobalCmd .AddCommand (initRunbookValidateCommands ())
4143}
@@ -262,185 +264,230 @@ func initRunbookRunCommands() *cobra.Command {
262264 Long : runbookAction .Description .Long ,
263265 Short : runbookAction .Description .Short ,
264266 RunE : func (cmd * cobra.Command , args []string ) error {
265- numSteps := len (runbookAction .RawCommands )
267+ return processRunBookCommands (runbook .Name , runbookStringArguments , runbookAction , maxConcurrency , execTimeoutInSeconds )
268+ },
269+ }
270+ processRunbookVariablesOnCommand (runbookActionRunActionCommand , runbookStringArguments , runbookAction .Variables , true )
266271
267- parentCtx := clictx .Ctx
272+ runbookRunRunbookCmd .AddCommand (runbookActionRunActionCommand )
273+ }
274+ }
268275
269- ctx , cancelFunc := context .WithCancel (parentCtx )
276+ return runbookRunCommand
277+ }
270278
271- concurrentRunSemaphore := semaphore .NewWeighted (int64 (* maxConcurrency ))
272- factory := pool .NewPooledObjectFactorySimple (
273- func (ctx2 context.Context ) (interface {}, error ) {
274- return generateRunbookCmd (), nil
275- })
279+ func initRunbookRunScriptCmd () * cobra.Command {
280+ cmd := & cobra.Command {
281+ Use : "exec-script" ,
282+ Short : "Execute a YAML script file containing epcc commands" ,
283+ Args : cobra .ExactArgs (1 ),
284+ SilenceUsage : true ,
285+ }
276286
277- objectPool := pool .NewObjectPool (ctx , factory , & pool.ObjectPoolConfig {
278- MaxTotal : * maxConcurrency ,
279- MaxIdle : * maxConcurrency ,
280- })
287+ execTimeoutInSeconds := cmd .Flags ().Int64 ("execution-timeout" , 900 , "How long should the script take to execute before timing out" )
288+ maxConcurrency := cmd .Flags ().Int ("max-concurrency" , 20 , "Maximum number of commands that can run simultaneously" )
281289
282- rawCmds := runbookAction .RawCommands
283- for stepIdx := 0 ; stepIdx < len (rawCmds ); stepIdx ++ {
290+ cmd .ValidArgsFunction = func (cmd * cobra.Command , args []string , toComplete string ) ([]string , cobra.ShellCompDirective ) {
291+ return []string {"yaml" , "yml" }, cobra .ShellCompDirectiveFilterFileExt
292+ }
284293
285- origIndex := & stepIdx
286- // Create a copy of loop variables
287- stepIdx := stepIdx
288- rawCmd := rawCmds [stepIdx ]
294+ cmd .PersistentPreRunE = func (cmd * cobra.Command , args []string ) error {
295+ return RootCmd .PersistentPreRunE (RootCmd , args )
296+ }
289297
290- templateName := fmt .Sprintf ("Runbook: %s Action: %s Step: %d" , runbook .Name , runbookAction .Name , stepIdx )
291- rawCmdLines , err := runbooks .RenderTemplates (templateName , rawCmd , runbookStringArguments , runbookAction .Variables )
298+ cmd .RunE = func (cmd * cobra.Command , args []string ) error {
299+ data , err := os .ReadFile (args [0 ])
300+ if err != nil {
301+ return fmt .Errorf ("could not read file %s: %v" , args [0 ], err )
302+ }
292303
293- if err != nil {
294- cancelFunc ()
295- return err
296- }
304+ var commands []string
305+ err = yaml .Unmarshal (data , & commands )
306+ if err != nil {
307+ return fmt .Errorf ("could not parse YAML file %s: %v" , args [0 ], err )
308+ }
297309
298- joinedString := strings .Join (rawCmdLines , "\n " )
299- renderedCmd := []string {}
310+ runbookAction := & runbooks.RunbookAction {
311+ RawCommands : commands ,
312+ }
300313
301- err = yaml .Unmarshal ([]byte (joinedString ), & renderedCmd )
314+ return processRunBookCommands ("exec-script" , map [string ]* string {}, runbookAction , maxConcurrency , execTimeoutInSeconds )
315+ }
302316
303- if err == nil {
304- log .Tracef ("Line %d is a Yaml array %s, inserting into stack" , stepIdx , joinedString )
305- newCmds := make ([]string , 0 , len (rawCmds )+ len (renderedCmd )- 1 )
306- newCmds = append (newCmds , rawCmds [0 :stepIdx ]... )
307- newCmds = append (newCmds , renderedCmd ... )
308- newCmds = append (newCmds , rawCmds [stepIdx + 1 :]... )
309- rawCmds = newCmds
310- * origIndex --
311- continue
312- }
317+ return cmd
318+ }
313319
314- log .Infof ("Executing> %s" , rawCmd )
315- resultChan := make (chan * commandResult , * maxConcurrency * 2 )
316- funcs := make ([]func (), 0 , len (rawCmdLines ))
320+ func processRunBookCommands (runbookName string , runbookStringArguments map [string ]* string , runbookAction * runbooks.RunbookAction , maxConcurrency * int , execTimeoutInSeconds * int64 ) error {
321+ numSteps := len (runbookAction .RawCommands )
317322
318- for commandIdx , rawCmdLine := range rawCmdLines {
323+ parentCtx := clictx . Ctx
319324
320- commandIdx := commandIdx
321- rawCmdLine := strings .Trim (rawCmdLine , " \n " )
325+ ctx , cancelFunc := context .WithCancel (parentCtx )
322326
323- if rawCmdLine == "" {
324- // Allow blank lines
325- continue
326- }
327+ concurrentRunSemaphore := semaphore .NewWeighted (int64 (* maxConcurrency ))
328+ factory := pool .NewPooledObjectFactorySimple (
329+ func (ctx2 context.Context ) (interface {}, error ) {
330+ return generateRunbookCmd (), nil
331+ })
327332
328- if ! strings .HasPrefix (rawCmdLine , "epcc " ) {
329- // Some commands like sleep don't have prefix
330- // This hack allows them to run
331- rawCmdLine = "epcc " + rawCmdLine
332- }
333- rawCmdArguments , err := shellwords .SplitPosix (strings .Trim (rawCmdLine , " \n " ))
333+ objectPool := pool .NewObjectPool (ctx , factory , & pool.ObjectPoolConfig {
334+ MaxTotal : * maxConcurrency ,
335+ MaxIdle : * maxConcurrency ,
336+ })
334337
335- if err != nil {
336- cancelFunc ()
337- return err
338- }
338+ rawCmds := runbookAction .RawCommands
339+ for stepIdx := 0 ; stepIdx < len (rawCmds ); stepIdx ++ {
339340
340- funcs = append (funcs , func () {
341+ origIndex := & stepIdx
342+ // Create a copy of loop variables
343+ stepIdx := stepIdx
344+ rawCmd := rawCmds [stepIdx ]
341345
342- log .Tracef ("(Step %d/%d Command %d/%d) Building Commmand" , stepIdx + 1 , numSteps , commandIdx + 1 , len (funcs ))
346+ templateName := fmt .Sprintf ("Runbook: %s Action: %s Step: %d" , runbookName , runbookAction .Name , stepIdx )
347+ rawCmdLines , err := runbooks .RenderTemplates (templateName , rawCmd , runbookStringArguments , runbookAction .Variables )
343348
344- stepCmdObject , err := objectPool .BorrowObject (ctx )
345- defer objectPool .ReturnObject (ctx , stepCmdObject )
349+ if err != nil {
350+ cancelFunc ()
351+ return err
352+ }
346353
347- if err == nil {
348- commandAndResetFunc := stepCmdObject .(* CommandAndReset )
349- commandAndResetFunc .reset ()
350- stepCmd := commandAndResetFunc .cmd
354+ joinedString := strings .Join (rawCmdLines , "\n " )
355+ renderedCmd := []string {}
351356
352- tweakedArguments := misc .AddImplicitDoubleDash (rawCmdArguments )
353- stepCmd .SetArgs (tweakedArguments [1 :])
357+ err = yaml .Unmarshal ([]byte (joinedString ), & renderedCmd )
354358
355- stepCmd .SilenceErrors = true
356- log .Tracef ("(Step %d/%d Command %d/%d) Starting Command" , stepIdx + 1 , numSteps , commandIdx + 1 , len (funcs ))
359+ if err == nil {
360+ log .Tracef ("Line %d is a Yaml array %s, inserting into stack" , stepIdx , joinedString )
361+ newCmds := make ([]string , 0 , len (rawCmds )+ len (renderedCmd )- 1 )
362+ newCmds = append (newCmds , rawCmds [0 :stepIdx ]... )
363+ newCmds = append (newCmds , renderedCmd ... )
364+ newCmds = append (newCmds , rawCmds [stepIdx + 1 :]... )
365+ rawCmds = newCmds
366+ * origIndex --
367+ continue
368+ }
357369
358- stepCmd .ResetFlags ()
359- err = stepCmd .ExecuteContext (ctx )
360- log .Tracef ("(Step %d/%d Command %d/%d) Complete Command" , stepIdx + 1 , numSteps , commandIdx + 1 , len (funcs ))
361- }
370+ log .Infof ("Executing> %s" , rawCmd )
371+ resultChan := make (chan * commandResult , * maxConcurrency * 2 )
372+ funcs := make ([]func (), 0 , len (rawCmdLines ))
362373
363- commandResult := & commandResult {
364- stepIdx : stepIdx ,
365- commandIdx : commandIdx ,
366- commandLine : rawCmdLine ,
367- error : err ,
368- }
374+ for commandIdx , rawCmdLine := range rawCmdLines {
369375
370- resultChan <- commandResult
376+ commandIdx := commandIdx
377+ rawCmdLine := strings .Trim (rawCmdLine , " \n " )
371378
372- })
379+ if rawCmdLine == "" {
380+ // Allow blank lines
381+ continue
382+ }
373383
374- }
384+ if ! strings .HasPrefix (rawCmdLine , "epcc " ) {
385+ // Some commands like sleep don't have prefix
386+ // This hack allows them to run
387+ rawCmdLine = "epcc " + rawCmdLine
388+ }
389+ rawCmdArguments , err := shellwords .SplitPosix (strings .Trim (rawCmdLine , " \n " ))
375390
376- if len (funcs ) > 1 {
377- log .Debugf ("Running %d commands" , len (funcs ))
378- }
391+ if err != nil {
392+ cancelFunc ()
393+ return err
394+ }
379395
380- // Start processing all the functions
381- go func () {
382- for idx , fn := range funcs {
383- idx := idx
384- if shutdown .ShutdownFlag .Load () {
385- log .Infof ("Aborting runbook execution, after %d scheduled executions" , idx )
386- cancelFunc ()
387- break
388- }
396+ funcs = append (funcs , func () {
389397
390- fn := fn
391- log .Tracef ("Run %d is waiting on semaphore" , idx )
392- if err := concurrentRunSemaphore .Acquire (ctx , 1 ); err == nil {
393- go func () {
394- log .Tracef ("Run %d is starting" , idx )
395- defer concurrentRunSemaphore .Release (1 )
396- fn ()
397- }()
398- } else {
399- log .Warnf ("Run %d failed to get semaphore %v" , idx , err )
400- }
401- }
402- }()
403-
404- errorCount := 0
405- for i := 0 ; i < len (funcs ); i ++ {
406- select {
407- case result := <- resultChan :
408- if ! shutdown .ShutdownFlag .Load () {
409- if result .error != nil {
410- log .Warnf ("(Step %d/%d Command %d/%d) %v" , result .stepIdx + 1 , numSteps , result .commandIdx + 1 , len (funcs ), fmt .Errorf ("error processing command [%s], %w" , result .commandLine , result .error ))
411- errorCount ++
412- } else {
413- log .Debugf ("(Step %d/%d Command %d/%d) finished successfully " , result .stepIdx + 1 , numSteps , result .commandIdx + 1 , len (funcs ))
414- }
415- } else {
416- log .Tracef ("Shutdown flag enabled, completion result %v" , result )
417- cancelFunc ()
418- }
419- case <- time .After (time .Duration (* execTimeoutInSeconds ) * time .Second ):
420- return fmt .Errorf ("timeout of %d seconds reached, only %d of %d commands finished of step %d/%d" , * execTimeoutInSeconds , i + 1 , len (funcs ), stepIdx + 1 , numSteps )
398+ log .Tracef ("(Step %d/%d Command %d/%d) Building Commmand" , stepIdx + 1 , numSteps , commandIdx + 1 , len (funcs ))
421399
422- }
423- }
400+ stepCmdObject , err := objectPool . BorrowObject ( ctx )
401+ defer objectPool . ReturnObject ( ctx , stepCmdObject )
424402
425- if len (funcs ) > 1 {
426- log .Debugf ("Running %d commands complete" , len (funcs ))
427- }
403+ if err == nil {
404+ commandAndResetFunc := stepCmdObject .(* CommandAndReset )
405+ commandAndResetFunc .reset ()
406+ stepCmd := commandAndResetFunc .cmd
428407
429- if ! runbookAction .IgnoreErrors && errorCount > 0 {
430- return fmt .Errorf ("error occurred while processing script aborting" )
431- }
408+ tweakedArguments := misc .AddImplicitDoubleDash (rawCmdArguments )
409+ stepCmd .SetArgs (tweakedArguments [1 :])
410+
411+ stepCmd .SilenceErrors = true
412+ log .Tracef ("(Step %d/%d Command %d/%d) Starting Command" , stepIdx + 1 , numSteps , commandIdx + 1 , len (funcs ))
413+
414+ stepCmd .ResetFlags ()
415+ err = stepCmd .ExecuteContext (ctx )
416+ log .Tracef ("(Step %d/%d Command %d/%d) Complete Command" , stepIdx + 1 , numSteps , commandIdx + 1 , len (funcs ))
417+ }
418+
419+ commandResult := & commandResult {
420+ stepIdx : stepIdx ,
421+ commandIdx : commandIdx ,
422+ commandLine : rawCmdLine ,
423+ error : err ,
424+ }
425+
426+ resultChan <- commandResult
427+
428+ })
429+
430+ }
431+
432+ if len (funcs ) > 1 {
433+ log .Debugf ("Running %d commands" , len (funcs ))
434+ }
435+
436+ // Start processing all the functions
437+ go func () {
438+ for idx , fn := range funcs {
439+ idx := idx
440+ if shutdown .ShutdownFlag .Load () {
441+ log .Infof ("Aborting runbook execution, after %d scheduled executions" , idx )
442+ cancelFunc ()
443+ break
444+ }
445+
446+ fn := fn
447+ log .Tracef ("Run %d is waiting on semaphore" , idx )
448+ if err := concurrentRunSemaphore .Acquire (ctx , 1 ); err == nil {
449+ go func () {
450+ log .Tracef ("Run %d is starting" , idx )
451+ defer concurrentRunSemaphore .Release (1 )
452+ fn ()
453+ }()
454+ } else {
455+ log .Warnf ("Run %d failed to get semaphore %v" , idx , err )
456+ }
457+ }
458+ }()
459+
460+ errorCount := 0
461+ for i := 0 ; i < len (funcs ); i ++ {
462+ select {
463+ case result := <- resultChan :
464+ if ! shutdown .ShutdownFlag .Load () {
465+ if result .error != nil {
466+ log .Warnf ("(Step %d/%d Command %d/%d) %v" , result .stepIdx + 1 , numSteps , result .commandIdx + 1 , len (funcs ), fmt .Errorf ("error processing command [%s], %w" , result .commandLine , result .error ))
467+ errorCount ++
468+ } else {
469+ log .Debugf ("(Step %d/%d Command %d/%d) finished successfully " , result .stepIdx + 1 , numSteps , result .commandIdx + 1 , len (funcs ))
432470 }
433- defer cancelFunc ()
434- return nil
435- },
471+ } else {
472+ log .Tracef ("Shutdown flag enabled, completion result %v" , result )
473+ cancelFunc ()
474+ }
475+ case <- time .After (time .Duration (* execTimeoutInSeconds ) * time .Second ):
476+ return fmt .Errorf ("timeout of %d seconds reached, only %d of %d commands finished of step %d/%d" , * execTimeoutInSeconds , i + 1 , len (funcs ), stepIdx + 1 , numSteps )
477+
436478 }
437- processRunbookVariablesOnCommand ( runbookActionRunActionCommand , runbookStringArguments , runbookAction . Variables , true )
479+ }
438480
439- runbookRunRunbookCmd .AddCommand (runbookActionRunActionCommand )
481+ if len (funcs ) > 1 {
482+ log .Debugf ("Running %d commands complete" , len (funcs ))
440483 }
441- }
442484
443- return runbookRunCommand
485+ if ! runbookAction .IgnoreErrors && errorCount > 0 {
486+ return fmt .Errorf ("error occurred while processing script aborting" )
487+ }
488+ }
489+ defer cancelFunc ()
490+ return nil
444491}
445492
446493func processRunbookVariablesOnCommand (runbookActionRunActionCommand * cobra.Command , runbookStringArguments map [string ]* string , variables map [string ]runbooks.Variable , enableRequiredVars bool ) {
0 commit comments