diff --git a/apps/flowlord/handler.go b/apps/flowlord/handler.go
index 46603cb..38aca2e 100644
--- a/apps/flowlord/handler.go
+++ b/apps/flowlord/handler.go
@@ -5,6 +5,7 @@ import (
"embed"
"encoding/json"
"errors"
+ "fmt"
"html/template"
"io"
"io/fs"
@@ -118,7 +119,7 @@ func (tm *taskMaster) StartHandler() {
RunTime: gtools.PrintDuration(time.Since(tm.initTime)),
}
b, _ := json.Marshal(sts)
- if err := tm.slack.Notify(string(b), slack.OK); err != nil {
+ if err := tm.notify.Notify(string(b), slack.OK); err != nil {
w.Write([]byte(err.Error()))
}
})
@@ -140,7 +141,15 @@ func (tm *taskMaster) StartHandler() {
}
log.Printf("starting handler on :%v", tm.port)
- http.ListenAndServe(":"+strconv.Itoa(tm.port), router)
+ tm.httpServer = &http.Server{
+ Addr: ":" + strconv.Itoa(tm.port),
+ Handler: router,
+ }
+ go func() {
+ if err := tm.httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
+ log.Println("http server:", err)
+ }
+ }()
}
func (tm *taskMaster) Info(w http.ResponseWriter, r *http.Request) {
@@ -286,6 +295,10 @@ func (tm *taskMaster) refreshHandler(w http.ResponseWriter, _ *http.Request) {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
+ if err := tm.taskCache.Sync(); err != nil {
+ http.Error(w, "sqlite backup: "+err.Error(), http.StatusInternalServerError)
+ return
+ }
v := struct {
Files []string `json:",omitempty"`
Cache string
@@ -693,9 +706,9 @@ func (tm *taskMaster) aboutHTML() []byte {
"SchemaVersion": tm.taskCache.GetSchemaVersion(),
"Retention": gtools.PrintDuration(tm.taskCache.Retention),
"TaskTTL": gtools.PrintDuration(tm.taskCache.TaskTTL),
- "MinFrequency": gtools.PrintDuration(tm.slack.MinFrequency),
- "MaxFrequency": gtools.PrintDuration(tm.slack.MaxFrequency),
- "CurrentFrequency": gtools.PrintDuration(tm.slack.GetCurrentDuration()),
+ "MinFrequency": gtools.PrintDuration(tm.notify.MinFrequency),
+ "MaxFrequency": gtools.PrintDuration(tm.notify.MaxFrequency),
+ "CurrentFrequency": gtools.PrintDuration(tm.notify.GetAlertFrequency()),
"CurrentPage": "about",
"DateValue": "", // About page doesn't need date
"PageTitle": "System Information",
@@ -843,13 +856,31 @@ func (tm *taskMaster) Backloader(w http.ResponseWriter, r *http.Request) {
if req.Execute {
resp.Status = "Executed: " + resp.Status
+ attempted := len(resp.Tasks)
errs := appenderr.New()
+ failed := 0
for _, t := range resp.Tasks {
tm.taskCache.Add(t)
- errs.Add(tm.producer.Send(t.Type, t.JSONBytes()))
+ if err := tm.producer.Send(t.Type, t.JSONBytes()); err != nil {
+ failed++
+ errs.Add(err)
+ }
}
if errs.ErrOrNil() != nil {
- http.Error(w, "issue writing to producer "+errs.Error(), http.StatusInternalServerError)
+ // Sent vs attempted must come from the loop: appenderr merges duplicate messages,
+ // so it has no total count of failed sends.
+ sent := attempted - failed
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusInternalServerError)
+ _ = json.NewEncoder(w).Encode(map[string]interface{}{
+ "Status": fmt.Sprintf(
+ "%d/%d messages sent: %s",
+ sent,
+ attempted,
+ strings.TrimSpace(errs.Error()),
+ ),
+ })
+ return
}
} else {
resp.Status = "DRY RUN ONLY: " + resp.Status
diff --git a/apps/flowlord/handler/backload.tmpl b/apps/flowlord/handler/backload.tmpl
index 622bda3..ede34b2 100644
--- a/apps/flowlord/handler/backload.tmpl
+++ b/apps/flowlord/handler/backload.tmpl
@@ -139,7 +139,7 @@
-
Preview Results
+
Preview Results
diff --git a/apps/flowlord/handler/static/backload.js b/apps/flowlord/handler/static/backload.js
index 8644cdc..4ab0ca0 100644
--- a/apps/flowlord/handler/static/backload.js
+++ b/apps/flowlord/handler/static/backload.js
@@ -40,6 +40,7 @@
elements.executeBtn = document.getElementById('executeBtn');
elements.resetBtn = document.getElementById('resetBtn');
elements.previewSection = document.getElementById('previewSection');
+ elements.previewResultsHeading = document.getElementById('previewResultsHeading');
elements.previewStatus = document.getElementById('previewStatus');
elements.previewTableBody = document.getElementById('previewTableBody');
elements.previewCount = document.getElementById('previewCount');
@@ -549,12 +550,22 @@
}
}
+ // Parse API error body: JSON Status or raw text
+ function messageFromApiResponse(responseText, fallback) {
+ if (responseText == null || responseText === '') {
+ return fallback || 'Request failed';
+ }
+ try {
+ const j = JSON.parse(responseText);
+ if (j && typeof j.Status === 'string' && j.Status.length > 0) {
+ return j.Status;
+ }
+ } catch (e) { /* use raw */ }
+ return responseText;
+ }
+
// Execute button click handler
async function handleExecuteClick() {
- if (!confirm('Are you sure you want to execute this backload? This will create ' + previewTasks.length + ' tasks.')) {
- return;
- }
-
const request = buildRequest(true);
elements.requestBodyDisplay.textContent = JSON.stringify(request, null, 2);
@@ -568,31 +579,36 @@
});
const responseText = await response.text();
- let data;
-
+ let data = null;
try {
data = JSON.parse(responseText);
} catch (e) {
- throw new Error(responseText || 'Execution failed');
+ data = null;
}
if (!response.ok) {
- throw new Error(data.Status || responseText || 'Execution failed');
+ const msg = messageFromApiResponse(responseText, 'Execution failed');
+ elements.executionSection.style.display = 'block';
+ elements.executionStatus.className = 'execution-status error';
+ elements.executionStatus.textContent = msg;
+ return;
}
- elements.executionSection.style.display = 'block';
- elements.executionStatus.className = 'execution-status success';
- elements.executionStatus.innerHTML = `
- Success!
- ${escapeHtml(data.Status)}
- Created ${data.Count} tasks.
- `;
+ if (!data) {
+ elements.executionSection.style.display = 'block';
+ elements.executionStatus.className = 'execution-status error';
+ elements.executionStatus.textContent = responseText || 'Invalid JSON response';
+ return;
+ }
+
+ showExecutionResults(data);
+ elements.executionSection.style.display = 'none';
elements.executeBtn.style.display = 'none';
} catch (error) {
elements.executionSection.style.display = 'block';
elements.executionStatus.className = 'execution-status error';
- elements.executionStatus.textContent = 'Error: ' + error.message;
+ elements.executionStatus.textContent = error.message || String(error);
} finally {
setButtonLoading(elements.executeBtn, false, 'Execute Backload');
}
@@ -620,19 +636,16 @@
hideTemplateInfo();
initializeDates();
+ if (elements.previewResultsHeading) {
+ elements.previewResultsHeading.textContent = 'Preview Results';
+ }
updatePreviewButton();
}
- // Show preview results
- function showPreviewResults(data) {
- elements.previewSection.style.display = 'block';
- elements.previewStatus.className = 'preview-status info';
- elements.previewStatus.textContent = data.Status || 'Dry run complete';
-
+ function renderTasksIntoPreviewTable(tasks, emptyRowHtml) {
elements.previewTableBody.innerHTML = '';
-
- if (data.Tasks && data.Tasks.length > 0) {
- data.Tasks.forEach((task, index) => {
+ if (tasks && tasks.length > 0) {
+ tasks.forEach((task, index) => {
const row = document.createElement('tr');
row.innerHTML = `
${index + 1} |
@@ -643,17 +656,9 @@
`;
elements.previewTableBody.appendChild(row);
});
-
- elements.previewCount.textContent = `Total tasks to be created: ${data.Count}`;
- elements.executeBtn.style.display = 'inline-block';
- elements.executeBtn.disabled = false;
} else {
- elements.previewTableBody.innerHTML = '| No tasks would be created |
';
- elements.previewCount.textContent = '';
- elements.executeBtn.style.display = 'none';
+ elements.previewTableBody.innerHTML = emptyRowHtml || '| No tasks |
';
}
-
- // Add expand/collapse functionality to cells
document.querySelectorAll('#previewTableBody .expandable').forEach(cell => {
cell.addEventListener('click', function() {
this.classList.toggle('expanded');
@@ -661,6 +666,41 @@
});
}
+ // Show preview results
+ function showPreviewResults(data) {
+ if (elements.previewResultsHeading) {
+ elements.previewResultsHeading.textContent = 'Preview Results';
+ }
+ elements.previewSection.style.display = 'block';
+ elements.previewStatus.className = 'preview-status info';
+ elements.previewStatus.textContent = data.Status || 'Dry run complete';
+
+ renderTasksIntoPreviewTable(data.Tasks, '| No tasks would be created |
');
+
+ if (data.Tasks && data.Tasks.length > 0) {
+ elements.previewCount.textContent = `Total tasks to be created: ${data.Count}`;
+ elements.executeBtn.style.display = 'inline-block';
+ elements.executeBtn.disabled = false;
+ } else {
+ elements.previewCount.textContent = '';
+ elements.executeBtn.style.display = 'none';
+ }
+ }
+
+ function showExecutionResults(data) {
+ if (elements.previewResultsHeading) {
+ elements.previewResultsHeading.textContent = 'Execution results';
+ }
+ elements.previewSection.style.display = 'block';
+ elements.previewStatus.className = 'preview-status execution-success';
+ elements.previewStatus.innerHTML =
+ 'Executed (not a dry run)
' +
+ escapeHtml(data.Status || '');
+ renderTasksIntoPreviewTable(data.Tasks, '| No tasks were sent |
');
+ const n = typeof data.Count === 'number' ? data.Count : (data.Tasks && data.Tasks.length) || 0;
+ elements.previewCount.textContent = `Created ${n} task(s). Jobs were sent to the task bus.`;
+ }
+
// Initialize date inputs with today's date
function initializeDates() {
const today = new Date().toISOString().split('T')[0];
diff --git a/apps/flowlord/handler/static/style.css b/apps/flowlord/handler/static/style.css
index 2e4aeef..93d9d6a 100644
--- a/apps/flowlord/handler/static/style.css
+++ b/apps/flowlord/handler/static/style.css
@@ -1760,6 +1760,12 @@ select.form-control:disabled {
border: 1px solid #c3e6cb;
}
+.preview-status.execution-success {
+ background: #d4edda;
+ color: #155724;
+ border: 1px solid #c3e6cb;
+}
+
#previewTable {
margin-top: 16px;
}
diff --git a/apps/flowlord/handler_test.go b/apps/flowlord/handler_test.go
index be39aac..3862b25 100644
--- a/apps/flowlord/handler_test.go
+++ b/apps/flowlord/handler_test.go
@@ -652,7 +652,7 @@ func TestAboutHTML(t *testing.T) {
MinFrequency: 5 * time.Minute,
MaxFrequency: 30 * time.Minute,
}
- notification.currentDuration.Store(int64(10 * time.Minute))
+ notification.alertFrequency.Store(int64(10 * time.Minute))
// Create a mock taskMaster with test data
tm := &taskMaster{
@@ -660,7 +660,7 @@ func TestAboutHTML(t *testing.T) {
nextUpdate: time.Now().Add(30 * time.Minute), // 30 minutes from now
lastUpdate: time.Now().Add(-15 * time.Minute), // 15 minutes ago
taskCache: taskCache,
- slack: notification,
+ notify: notification,
}
// Generate HTML using the aboutHTML method
diff --git a/apps/flowlord/sqlite/sqlite.go b/apps/flowlord/sqlite/sqlite.go
index daec295..ad2ec84 100644
--- a/apps/flowlord/sqlite/sqlite.go
+++ b/apps/flowlord/sqlite/sqlite.go
@@ -247,25 +247,36 @@ func (o *SQLite) migrateSchema(currentVersion int) error {
// Close the DB connection and copy the current file to the backup location
func (o *SQLite) Close() error {
var errs []error
- if err := o.db.Close(); err != nil {
+ if err := o.Sync(); err != nil {
errs = append(errs, err)
}
- if o.BackupPath != "" {
- log.Printf("Backing up DB to %s", o.BackupPath)
- if err := o.Sync(); err != nil {
+ if o.db != nil {
+ if err := o.db.Close(); err != nil {
errs = append(errs, err)
}
}
+
if len(errs) > 0 {
return fmt.Errorf("close errors: %v", errs)
}
return nil
}
-// Sync the local DB to the backup location
+// Sync checkpoints WAL into the main DB file (so a plain file copy is consistent), then
+// copies LocalPath to BackupPath. Holds o.mu for the duration so backups do not interleave
+// with other cache operations that also take o.mu.
func (o *SQLite) Sync() error {
+ if o == nil || o.BackupPath == "" {
+ // no cache to backup
+ return nil
+ }
+ o.mu.Lock()
+ defer o.mu.Unlock()
+
+ if o.db != nil {
+ if _, err := o.db.Exec("PRAGMA wal_checkpoint(TRUNCATE);"); err != nil {
+ return fmt.Errorf("wal checkpoint: %w", err)
+ }
+ }
return copyFiles(o.LocalPath, o.BackupPath, o.fOpts)
}
-
-
-
diff --git a/apps/flowlord/taskmaster.go b/apps/flowlord/taskmaster.go
index 87d482d..6e8d67a 100644
--- a/apps/flowlord/taskmaster.go
+++ b/apps/flowlord/taskmaster.go
@@ -7,6 +7,7 @@ import (
"fmt"
"log"
"math/rand"
+ "net/http"
"net/url"
"regexp"
"strconv"
@@ -46,30 +47,88 @@ type taskMaster struct {
HostName string
port int
cron *cron.Cron
- slack *Notification
+ notify *Notification `toml:"slack"`
files []fileRule
- alerts chan task.Task
+ alerts chan task.Task
+ httpServer *http.Server
}
type Notification struct {
slack.Slack
//ReportPath string
- MinFrequency time.Duration
- MaxFrequency time.Duration
- currentDuration atomic.Int64 // Current notification duration (atomically updated)
+ MinFrequency time.Duration
+ MaxFrequency time.Duration
+ alertFrequency atomic.Int64 // summary backoff; same value as used in Tick (GetAlertFrequency)
+
+ // Alert loop state (owned by handleNotifications goroutine)
+ lastAlertTime time.Time // batch watermark: alerts with created_at after this are candidates
+ lastRun time.Time // watermark for new rows since last Tick (backoff churn)
file *file.Options
}
-// GetCurrentDuration returns the current notification duration
-func (n *Notification) GetCurrentDuration() time.Duration {
- return time.Duration(n.currentDuration.Load())
+// GetAlertFrequency returns the current notification duration
+func (n *Notification) GetAlertFrequency() time.Duration {
+ return time.Duration(n.alertFrequency.Load())
+}
+
+// setAlertFrequency atomically sets the current notification duration
+func (n *Notification) setAlertFrequency(d time.Duration) {
+ n.alertFrequency.Store(int64(d))
+}
+
+// initAlertLoopState resets watermarks for a new notification loop (local start-of-day batch anchor, tick anchor now).
+func (n *Notification) initAlertLoopState() {
+ now := time.Now()
+ n.lastAlertTime = time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
+ n.lastRun = time.Now()
+ n.setAlertFrequency(n.MinFrequency)
}
-// setCurrentDuration atomically sets the current notification duration
-func (n *Notification) setCurrentDuration(d time.Duration) {
- n.currentDuration.Store(int64(d))
+// Tick runs one poll: incomplete tasks, optional summary via sendSummary, then adjust backoff from new rows since lastRun.
+func (n *Notification) Tick(taskCache *sqlite.SQLite, sendSummary func([]sqlite.AlertRecord) error) {
+ taskCache.CheckIncompleteTasks()
+
+ pending, err := taskCache.GetAlertsAfterTime(n.lastAlertTime)
+ if err != nil {
+ log.Printf("failed to retrieve alerts: %v", err)
+ return
+ }
+ waitTime := n.GetAlertFrequency()
+ if len(pending) > 0 && time.Since(n.lastAlertTime) >= waitTime {
+ if err := sendSummary(pending); err != nil {
+ log.Println(err)
+ }
+ n.lastAlertTime = time.Now()
+ }
+
+ newAlerts, err := taskCache.GetAlertsAfterTime(n.lastRun)
+ if err != nil {
+ log.Printf("failed to retrieve alerts since last tick: %v", err)
+ n.lastRun = time.Now()
+ return
+ }
+
+ if len(newAlerts) == 0 && waitTime > n.MinFrequency {
+ next := waitTime / 2
+ if next < n.MinFrequency {
+ waitTime = n.MinFrequency
+ } else {
+ waitTime = next
+ }
+ log.Println("de-escalate waitTime ", waitTime)
+ } else if len(newAlerts) > 0 && waitTime < n.MaxFrequency {
+ next := waitTime * 2
+ if next > n.MaxFrequency {
+ waitTime = n.MaxFrequency
+ } else {
+ waitTime = next
+ }
+ log.Println("escalate waitTime ", waitTime)
+ }
+ n.setAlertFrequency(waitTime)
+ n.lastRun = time.Now()
}
type stats struct {
@@ -116,7 +175,7 @@ func New(opts *options) *taskMaster {
opts.Slack.file = opts.File
// Initialize current duration to MinFrequency
- opts.Slack.setCurrentDuration(opts.Slack.MinFrequency)
+ opts.Slack.setAlertFrequency(opts.Slack.MinFrequency)
tm := &taskMaster{
initTime: time.Now(),
taskCache: opts.DB,
@@ -130,7 +189,7 @@ func New(opts *options) *taskMaster {
HostName: opts.Host,
cron: cron.New(cron.WithParser(cronParser)),
dur: opts.Refresh,
- slack: opts.Slack,
+ notify: opts.Slack,
alerts: make(chan task.Task, 20),
}
if opts.FileTopic != "" {
@@ -197,16 +256,20 @@ func (tm *taskMaster) Run(ctx context.Context) (err error) {
if err != nil {
log.Fatal(err)
}
- go func() { // auto refresh cache after set duration
+ go func() {
workflowTick := time.NewTicker(tm.dur)
DBTick := time.NewTicker(24 * time.Hour)
+ defer workflowTick.Stop()
+ defer DBTick.Stop()
for {
select {
+ case <-ctx.Done():
+ return
case <-DBTick.C:
- if s, err := tm.taskCache.Recycle(time.Now().Add(-tm.taskCache.Retention)); err != nil {
- log.Println("task cache recycle:", err)
- } else {
- log.Println(s)
+ s, err := tm.taskCache.Recycle(time.Now().Add(-tm.taskCache.Retention))
+ log.Printf(" cache recycle:%s err:%v", s, err)
+ if err := tm.taskCache.Sync(); err != nil {
+ log.Println("DB sync", err)
}
case <-workflowTick.C:
if _, err := tm.refreshCache(); err != nil {
@@ -227,6 +290,14 @@ func (tm *taskMaster) Run(ctx context.Context) (err error) {
go tm.handleNotifications(tm.alerts, ctx)
<-ctx.Done()
log.Println("shutting down")
+ shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ if tm.httpServer != nil {
+ if err := tm.httpServer.Shutdown(shutdownCtx); err != nil {
+ log.Printf("http shutdown: %v", err)
+ }
+ }
+ tm.cron.Stop()
return tm.taskCache.Close()
}
@@ -473,57 +544,26 @@ func (tm *taskMaster) readFiles(ctx context.Context) {
}
// handleNotifications gathers all 'failed' tasks and incomplete tasks
-// sends a summary message every X minutes
-// It uses an exponential backoff to limit the number of messages
-// ie, (min) 5 -> 10 -> 20 -> 40 -> 80 -> 160 (max)
-// The backoff is cleared after no failed tasks occur within the window
+// and sends a Slack summary when there is pending work and waitTime has elapsed.
+// Polling runs at MinFrequency; waitTime is the exponential backoff between
+// summary sends (min → max: e.g. 5 → 10 → 20 …), descalated when no new alert
+// rows appear since lastRun, escalated when any do.
func (tm *taskMaster) handleNotifications(taskChan chan task.Task, ctx context.Context) {
- sendChan := make(chan struct{})
- var alerts []sqlite.AlertRecord
-
- // Initialize lastAlertTime to today at 00:00:00 (zero hour)
- now := time.Now()
- lastAlertTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
-
- go func() {
- dur := tm.slack.MinFrequency
- for ; ; time.Sleep(dur) {
- var err error
+ tm.notify.initAlertLoopState()
- // Check for incomplete tasks and add them to alerts
- tm.taskCache.CheckIncompleteTasks()
+ tm.notify.Tick(tm.taskCache, tm.sendAlertSummary)
+ ticker := time.NewTicker(tm.notify.MinFrequency)
+ defer ticker.Stop()
- // Get NEW alerts only - those after the last time we sent
- alerts, err = tm.taskCache.GetAlertsAfterTime(lastAlertTime)
- if err != nil {
- log.Printf("failed to retrieve alerts: %v", err)
- continue
- }
-
- if len(alerts) > 0 {
- sendChan <- struct{}{}
- // Update lastAlertTime to now (before we send, so we don't miss any)
- lastAlertTime = time.Now()
- if dur *= 2; dur > tm.slack.MaxFrequency {
- dur = tm.slack.MaxFrequency
- }
- tm.slack.setCurrentDuration(dur) // Update current duration atomically
- log.Println("wait time ", dur)
- } else if dur != tm.slack.MinFrequency {
- // No NEW alerts - reset to minimum frequency
- dur = tm.slack.MinFrequency
- tm.slack.setCurrentDuration(dur) // Update current duration atomically
- log.Println("Reset ", dur)
- }
- }
- }()
for {
select {
+ case <-ticker.C:
+ tm.notify.Tick(tm.taskCache, tm.sendAlertSummary)
case tsk := <-taskChan:
// if the task result is an alert result, send a slack notification now
if tsk.Result == task.AlertResult {
b, _ := json.MarshalIndent(tsk, "", " ")
- if err := tm.slack.Slack.Notify(string(b), slack.Critical); err != nil {
+ if err := tm.notify.Slack.Notify(string(b), slack.Critical); err != nil {
log.Println(err)
}
} else { // if the task result is not an alert result add to the tasks list summary
@@ -531,16 +571,10 @@ func (tm *taskMaster) handleNotifications(taskChan chan task.Task, ctx context.C
log.Printf("failed to store alert: %v", err)
}
}
- case <-sendChan:
- // prepare message
- if err := tm.sendAlertSummary(alerts); err != nil {
- log.Println(err)
- }
case <-ctx.Done():
return
}
}
-
}
// sendAlertSummary sends a formatted alert summary to Slack
@@ -553,7 +587,7 @@ func (tm *taskMaster) sendAlertSummary(alerts []sqlite.AlertRecord) error {
// build compact summary using existing logic
summary := sqlite.BuildCompactSummary(alerts)
- // format message similar to current Slack format
+ // format message similar to current Slack format
var message strings.Builder
message.WriteString(fmt.Sprintf("see report at %v:%d/web/alert?date=%s\n", tm.HostName, tm.port, time.Now().Format("2006-01-02")))
@@ -564,8 +598,8 @@ func (tm *taskMaster) sendAlertSummary(alerts []sqlite.AlertRecord) error {
// send to Slack if configured
log.Println(message.String())
- if tm.slack != nil {
- if err := tm.slack.Notify(message.String(), slack.Critical); err != nil {
+ if tm.notify != nil {
+ if err := tm.notify.Notify(message.String(), slack.Critical); err != nil {
return fmt.Errorf("failed to send alert summary to Slack: %w", err)
}
}
diff --git a/apps/flowlord/taskmaster_test.go b/apps/flowlord/taskmaster_test.go
index 5895ce1..8031937 100644
--- a/apps/flowlord/taskmaster_test.go
+++ b/apps/flowlord/taskmaster_test.go
@@ -36,7 +36,7 @@ func TestTaskMaster_Process(t *testing.T) {
fn := func(tsk task.Task) ([]task.Task, error) {
var alerts int64
- tm := taskMaster{doneConsumer: consumer, taskCache: taskCache, failedTopic: "failed-topic", alerts: make(chan task.Task), slack: &Notification{}}
+ tm := taskMaster{doneConsumer: consumer, taskCache: taskCache, failedTopic: "failed-topic", alerts: make(chan task.Task), notify: &Notification{}}
producer, _ := nop.NewProducer("")
tm.producer = producer
nop.FakeMsg = tsk.JSONBytes()
@@ -529,3 +529,104 @@ func TestIsReady(t *testing.T) {
}
trial.New(fn, cases).Test(t)
}
+
+func TestNotification_Tick(t *testing.T) {
+ // in is the full pre-tick state: frequencies, watermarks relative to a single
+ // time.Now() at the start of the test function, and an optional row inserted before Tick.
+ type in struct {
+ MinFrequency time.Duration
+ MaxFrequency time.Duration
+ InitFrequency time.Duration
+ // lastAlertTime = now.Add(LastAlertFromNow); lastRun = now.Add(LastRunFromNow)
+ LastAlertFromNow time.Duration
+ LastRunFromNow time.Duration
+ AddAlert *task.Task // if non-nil, insert this alert before Tick
+ }
+ type out struct {
+ Freq time.Duration
+ Alerted bool
+ }
+
+ fn := func(in in) (out, error) {
+ db := &sqlite.SQLite{LocalPath: ":memory:"}
+ if err := db.Open(base_test_path+"workflow", nil); err != nil {
+ return out{}, err
+ }
+ defer db.Close()
+
+ n := &Notification{
+ MinFrequency: in.MinFrequency,
+ MaxFrequency: in.MaxFrequency,
+ }
+ n.setAlertFrequency(in.InitFrequency)
+
+ now := time.Now()
+ n.lastAlertTime = now.Add(in.LastAlertFromNow)
+ n.lastRun = now.Add(in.LastRunFromNow)
+
+ if in.AddAlert != nil {
+ if err := db.AddAlert(*in.AddAlert, "err"); err != nil {
+ return out{}, err
+ }
+ }
+
+ alerted := false
+ sendSummary := func(alerts []sqlite.AlertRecord) error {
+ alerted = true
+ return nil
+ }
+
+ n.Tick(db, sendSummary)
+ return out{n.GetAlertFrequency(), alerted}, nil
+ }
+
+ cases := trial.Cases[in, out]{
+ "de-escalate halves when no new alerts": {
+ Input: in{
+ MinFrequency: 5 * time.Minute, MaxFrequency: 80 * time.Minute,
+ InitFrequency: 20 * time.Minute,
+ LastAlertFromNow: 0, LastRunFromNow: 0,
+ AddAlert: nil,
+ },
+ Expected: out{Freq: 10 * time.Minute},
+ },
+ "de-escalate floors at MinFrequency": {
+ Input: in{
+ MinFrequency: 5 * time.Minute, MaxFrequency: 80 * time.Minute,
+ InitFrequency: 8 * time.Minute,
+ LastAlertFromNow: 0, LastRunFromNow: 0,
+ AddAlert: nil,
+ },
+ Expected: out{Freq: 5 * time.Minute},
+ },
+ "escalate doubles when new alerts since lastRun": {
+ Input: in{
+ MinFrequency: 5 * time.Minute, MaxFrequency: 80 * time.Minute,
+ InitFrequency: 5 * time.Minute,
+ LastAlertFromNow: 0, LastRunFromNow: -10 * time.Minute,
+ AddAlert: &task.Task{ID: "esc", Type: "t1", Job: "j1"},
+ },
+ Expected: out{Freq: 10 * time.Minute},
+ },
+ "summary send then escalate from new rows": {
+ Input: in{
+ MinFrequency: 5 * time.Minute, MaxFrequency: 80 * time.Minute,
+ InitFrequency: 5 * time.Minute,
+ LastAlertFromNow: -1 * time.Hour, LastRunFromNow: -15 * time.Minute,
+ AddAlert: &task.Task{ID: "sum", Type: "t2", Job: "j2"},
+ },
+ Expected: out{Freq: 10 * time.Minute, Alerted: true},
+ },
+ "at MaxFrequency new alerts do not raise further": {
+ Input: in{
+ MinFrequency: 5 * time.Minute, MaxFrequency: 80 * time.Minute,
+ InitFrequency: 80 * time.Minute,
+ LastAlertFromNow: 0, LastRunFromNow: -10 * time.Minute,
+ AddAlert: &task.Task{ID: "max", Type: "t3", Job: "j3"},
+ },
+ Expected: out{Freq: 80 * time.Minute},
+ },
+ }
+
+ trial.New(fn, cases).SubTest(t)
+}
diff --git a/apps/go.mod b/apps/go.mod
index 9c19acf..8448bee 100644
--- a/apps/go.mod
+++ b/apps/go.mod
@@ -24,7 +24,7 @@ require (
github.com/json-iterator/go v1.1.12
github.com/lib/pq v1.10.9
github.com/pcelvng/task v0.8.0
- github.com/pcelvng/task-tools v0.32.0
+ github.com/pcelvng/task-tools v0.32.1
github.com/prometheus/client_golang v1.18.0
github.com/robfig/cron/v3 v3.0.1
github.com/stretchr/testify v1.11.1
@@ -41,7 +41,6 @@ require (
github.com/apache/arrow/go/v15 v15.0.2 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bitly/go-hostpool v0.1.0 // indirect
- github.com/buger/jsonparser v1.1.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-logr/logr v1.4.3 // indirect
diff --git a/apps/go.sum b/apps/go.sum
index 4b897d4..8be375e 100644
--- a/apps/go.sum
+++ b/apps/go.sum
@@ -78,8 +78,6 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bitly/go-hostpool v0.1.0 h1:XKmsF6k5el6xHG3WPJ8U0Ku/ye7njX7W81Ng7O2ioR0=
github.com/bitly/go-hostpool v0.1.0/go.mod h1:4gOCgp6+NZnVqlKyZ/iBZFTAJKembaVENUpMkpg42fw=
-github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs=
-github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
@@ -303,8 +301,8 @@ github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE=
github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
github.com/pcelvng/task v0.8.0 h1:YA0eXGV801IMt8zePwB15GE126R+pSmyGUeDco3f8dI=
github.com/pcelvng/task v0.8.0/go.mod h1:REM+jcZWlxD0b6nSlCow4d51FRLt0y164Ik3sR0Ahag=
-github.com/pcelvng/task-tools v0.32.0 h1:JuP7WHVkQTKMCWcsrikpPyWZ6j4oKBq0eiT5r4bnyos=
-github.com/pcelvng/task-tools v0.32.0/go.mod h1:YKkmKpMCFfFdMikPBzwi5kLVjTYvtvLxZrGCZ591YdI=
+github.com/pcelvng/task-tools v0.32.1 h1:jIJ9AumaQr0FrquYahI+lYSq8onaS9RrAJ+Nt0wgaW4=
+github.com/pcelvng/task-tools v0.32.1/go.mod h1:38nofr0jrgQ6H18tFDKTVMU52h1bbwE6jyyjTe293SM=
github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ=
github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=