
Commit 2b9b799

address the memory time bomb

1 parent d0de500
4 files changed: +104 −51 lines changed

taco/internal/tfe/apply.go

Lines changed: 27 additions & 7 deletions
@@ -5,6 +5,7 @@ import (
 	"net/http"
 	"os"
 	"strconv"
+	"strings"
 	"time"
 
 	"github.com/diggerhq/digger/opentaco/internal/auth"
@@ -100,15 +101,34 @@ func (h *TfeHandler) GetApplyLogs(c echo.Context) error {
 		return c.JSON(http.StatusNotFound, map[string]string{"error": "apply not found"})
 	}
 
-	// Try to get apply logs from blob storage
+	// Read apply logs from chunked S3 objects
+	// Chunks are stored as applies/{applyID}/chunks/00000001.log, 00000002.log, etc.
 	var logText string
-	applyLogBlobID := fmt.Sprintf("runs/%s/apply-logs.txt", run.ID)
+	chunkIndex := 1
+	var fullLogs strings.Builder
 
-	logData, err := h.blobStore.DownloadBlob(ctx, applyLogBlobID)
-	if err == nil {
-		logText = string(logData)
-	} else {
-		// If logs don't exist yet, return placeholder
+	for {
+		chunkKey := fmt.Sprintf("applies/%s/chunks/%08d.log", run.ID, chunkIndex)
+		logData, err := h.blobStore.DownloadBlob(ctx, chunkKey)
+
+		if err != nil {
+			// Chunk doesn't exist - check if apply is still running
+			if run.Status == "applied" || run.Status == "errored" {
+				// Apply is done, no more chunks coming
+				break
+			}
+			// Apply still running, this chunk doesn't exist yet
+			break
+		}
+
+		fullLogs.Write(logData)
+		chunkIndex++
+	}
+
+	logText = fullLogs.String()
+
+	// If no chunks exist yet, generate default message based on status
+	if logText == "" {
 		if run.Status == "applying" || run.Status == "apply_queued" {
 			logText = "Waiting for apply to start...\n"
 		} else {
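
The chunk keys use a zero-padded index (%08d) so that lexicographic key ordering in S3 matches numeric chunk order, which is what lets the reader walk 00000001.log, 00000002.log, … in sequence. A small standalone sketch of that property (illustrative only, not code from the commit):

package main

import (
	"fmt"
	"sort"
)

func main() {
	// Zero-padded keys sort lexicographically in numeric order;
	// unpadded keys would put "10" before "2".
	keys := []string{}
	for _, i := range []int{3, 10, 1, 2} {
		keys = append(keys, fmt.Sprintf("applies/run-abc/chunks/%08d.log", i))
	}
	sort.Strings(keys)
	for _, k := range keys {
		fmt.Println(k)
	}
	// Prints ...00000001.log, 00000002.log, 00000003.log, 00000010.log
}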

taco/internal/tfe/apply_executor.go

Lines changed: 24 additions & 10 deletions
@@ -147,23 +147,37 @@ func (e *ApplyExecutor) ExecuteApply(ctx context.Context, runID string) error {
 		}
 	}()
 
-	// Buffered logging to reduce blob storage roundtrips
-	applyLogBlobID := fmt.Sprintf("runs/%s/apply-logs.txt", run.ID)
+	// Chunked logging to prevent memory bloat
+	// Upload log chunks as separate S3 objects and clear buffer after each upload
+	chunkIndex := 1
 	var logBuffer bytes.Buffer
 	var logMutex sync.Mutex
 	lastLogFlush := time.Now()
-	lastFlushSize := 0
 
+	// Flush helper - uploads current buffer as a chunk and clears it
 	flushLogs := func() error {
 		logMutex.Lock()
-		defer logMutex.Unlock()
 		if logBuffer.Len() == 0 {
+			logMutex.Unlock()
 			return nil
 		}
-		err := e.blobStore.UploadBlob(ctx, applyLogBlobID, logBuffer.Bytes())
+		// Copy buffer to avoid holding lock during upload
+		data := make([]byte, logBuffer.Len())
+		copy(data, logBuffer.Bytes())
+		currentChunk := chunkIndex
+		logMutex.Unlock()
+
+		// Upload this chunk (key includes zero-padded chunk index)
+		chunkKey := fmt.Sprintf("applies/%s/chunks/%08d.log", run.ID, currentChunk)
+		err := e.blobStore.UploadBlob(ctx, chunkKey, data)
+
 		if err == nil {
+			logMutex.Lock()
 			lastLogFlush = time.Now()
-			lastFlushSize = logBuffer.Len()
+			// Clear buffer to free memory
+			logBuffer.Reset()
+			chunkIndex++
+			logMutex.Unlock()
 		}
 		return err
 	}
@@ -172,8 +186,8 @@ func (e *ApplyExecutor) ExecuteApply(ctx context.Context, runID string) error {
 		logMutex.Lock()
 		logBuffer.WriteString(message)
 		now := time.Now()
-		// Flush if we have >1KB of NEW data or if 1s has passed
-		shouldFlush := (logBuffer.Len()-lastFlushSize) > 1024 || now.Sub(lastLogFlush) > 1*time.Second
+		// Flush if buffer exceeds chunk size (256KB) or 1s has passed
+		shouldFlush := logBuffer.Len() > 256*1024 || now.Sub(lastLogFlush) > 1*time.Second
 		logMutex.Unlock()
 
 		if shouldFlush {
@@ -334,7 +348,7 @@ func (e *ApplyExecutor) ExecuteApply(ctx context.Context, runID string) error {
 	if applyErr != nil {
 		runStatus = "errored"
 		logs = logs + "\n\nError: " + applyErr.Error()
-		_ = e.blobStore.UploadBlob(ctx, applyLogBlobID, []byte(logs))
+		// Error already logged via appendLog in the executor
 		if updateErr := e.runRepo.UpdateRunError(ctx, run.ID, applyErr.Error()); updateErr != nil {
 			logger.Error("failed to update run error", slog.String("error", updateErr.Error()))
 		}
@@ -351,7 +365,7 @@ func (e *ApplyExecutor) ExecuteApply(ctx context.Context, runID string) error {
 		runStatus = "errored"
 		errMsg := fmt.Sprintf("Failed to upload state: %v", uploadErr)
 		logs = logs + "\n\nCritical Error: " + errMsg + "\n"
-		_ = e.blobStore.UploadBlob(ctx, applyLogBlobID, []byte(logs))
+		// Error already logged via appendLog in the executor
 		if updateErr := e.runRepo.UpdateRunError(ctx, run.ID, errMsg); updateErr != nil {
 			logger.Error("failed to update run error", slog.String("error", updateErr.Error()))
 		}
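
The flush helper snapshots the buffer under the lock and uploads outside it, so a slow S3 call never blocks writers. A minimal standalone sketch of that pattern (the chunkFlusher type and names are illustrative, not from this codebase; it assumes a single flushing goroutine); this variant discards only the bytes that were actually uploaded, so appends that race with the upload survive:

package main

import (
	"bytes"
	"fmt"
	"sync"
)

type chunkFlusher struct {
	mu     sync.Mutex
	buf    bytes.Buffer
	index  int // next chunk number, 1-based
	upload func(key string, data []byte) error
}

func (f *chunkFlusher) append(s string) {
	f.mu.Lock()
	f.buf.WriteString(s)
	f.mu.Unlock()
}

// flush snapshots the buffer under the lock, uploads outside it, and on
// success drops only the uploaded prefix so concurrent appends are kept.
func (f *chunkFlusher) flush(prefix string) error {
	f.mu.Lock()
	if f.buf.Len() == 0 {
		f.mu.Unlock()
		return nil
	}
	data := make([]byte, f.buf.Len())
	copy(data, f.buf.Bytes())
	chunk := f.index
	f.mu.Unlock()

	key := fmt.Sprintf("%s/chunks/%08d.log", prefix, chunk)
	if err := f.upload(key, data); err != nil {
		return err // keep buffered data; the next flush retries this chunk
	}

	f.mu.Lock()
	f.buf.Next(len(data)) // discard only what was uploaded
	f.index++
	f.mu.Unlock()
	return nil
}

func main() {
	f := &chunkFlusher{
		index: 1,
		upload: func(key string, data []byte) error {
			fmt.Printf("uploaded %s (%d bytes)\n", key, len(data))
			return nil
		},
	}
	f.append("terraform apply output...\n")
	_ = f.flush("applies/run-123")
}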

taco/internal/tfe/plan.go

Lines changed: 26 additions & 11 deletions
@@ -6,6 +6,7 @@ import (
 	"net/http"
 	"os"
 	"strconv"
+	"strings"
 	"time"
 
 	"github.com/diggerhq/digger/opentaco/internal/auth"
@@ -104,20 +105,34 @@ func (h *TfeHandler) GetPlanLogs(c echo.Context) error {
 		return c.JSON(http.StatusNotFound, map[string]string{"error": "plan not found"})
 	}
 
-	// Check if logs exist in blob storage
+	// Read logs from chunked S3 objects
+	// Chunks are stored as plans/{planID}/chunks/00000001.log, 00000002.log, etc.
 	var logText string
-	if plan.LogBlobID != nil {
-		// Try to get logs from blob storage
-		logData, err := h.blobStore.DownloadBlob(ctx, *plan.LogBlobID)
+	chunkIndex := 1
+	var fullLogs strings.Builder
+
+	for {
+		chunkKey := fmt.Sprintf("plans/%s/chunks/%08d.log", planID, chunkIndex)
+		logData, err := h.blobStore.DownloadBlob(ctx, chunkKey)
+
 		if err != nil {
-			fmt.Printf("Failed to get logs from blob storage: %v\n", err)
-			// Fall back to default logs
-			logText = generateDefaultPlanLogs(plan)
-		} else {
-			logText = string(logData)
+			// Chunk doesn't exist - check if plan is still running
+			if plan.Status == "finished" || plan.Status == "errored" {
+				// Plan is done, no more chunks coming
+				break
+			}
+			// Plan still running, this chunk doesn't exist yet
+			break
 		}
-	} else {
-		// Generate default logs based on plan status
+
+		fullLogs.Write(logData)
+		chunkIndex++
+	}
+
+	logText = fullLogs.String()
+
+	// If no chunks exist yet, generate default logs based on status
+	if logText == "" {
		logText = generateDefaultPlanLogs(plan)
	}
 
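
Both handlers now repeat the same read loop, which factors naturally into a small helper. A sketch under the assumption of a DownloadBlob-style store (the BlobStore interface and readChunks name are hypothetical, for illustration):

package logchunks

import (
	"context"
	"fmt"
	"strings"
)

// BlobStore is the minimal surface the read loop needs (hypothetical).
type BlobStore interface {
	DownloadBlob(ctx context.Context, key string) ([]byte, error)
}

// readChunks concatenates sequentially numbered chunk objects until the
// first missing one; a download error is treated as "no more chunks yet",
// matching the handlers above.
func readChunks(ctx context.Context, store BlobStore, prefix string) string {
	var sb strings.Builder
	for i := 1; ; i++ {
		data, err := store.DownloadBlob(ctx, fmt.Sprintf("%s/chunks/%08d.log", prefix, i))
		if err != nil {
			break
		}
		sb.Write(data)
	}
	return sb.String()
}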

taco/internal/tfe/plan_executor.go

Lines changed: 27 additions & 23 deletions
@@ -185,37 +185,50 @@ func (e *PlanExecutor) ExecutePlan(ctx context.Context, runID string) error {
 		}
 	}()
 
-	// Buffered logging to reduce blob storage roundtrips
-	// Instead of download-append-upload on each message, we accumulate in memory
-	// and flush periodically (every 1KB or 5 seconds)
-	logBlobID := fmt.Sprintf("plans/%s/logs.txt", *run.PlanID)
+	// Chunked logging to prevent memory bloat
+	// Upload log chunks as separate S3 objects and clear buffer after each upload
+	// This keeps memory usage bounded regardless of total log size
+	chunkIndex := 1
 	var logBuffer bytes.Buffer
 	var logMutex sync.Mutex
 	lastLogFlush := time.Now()
-	lastFlushSize := 0
 
-	// Flush helper - uploads current buffer to blob storage
+	// Flush helper - uploads current buffer as a chunk and clears it
 	flushLogs := func() error {
 		logMutex.Lock()
-		defer logMutex.Unlock()
 		if logBuffer.Len() == 0 {
+			logMutex.Unlock()
 			return nil
 		}
-		err := e.blobStore.UploadBlob(ctx, logBlobID, logBuffer.Bytes())
+		// Copy buffer to avoid holding lock during upload
+		data := make([]byte, logBuffer.Len())
+		copy(data, logBuffer.Bytes())
+		currentChunk := chunkIndex
+		logMutex.Unlock()
+
+		// Upload this chunk (key includes zero-padded chunk index)
+		chunkKey := fmt.Sprintf("plans/%s/chunks/%08d.log", *run.PlanID, currentChunk)
+		err := e.blobStore.UploadBlob(ctx, chunkKey, data)
+
 		if err == nil {
+			logMutex.Lock()
 			lastLogFlush = time.Now()
-			lastFlushSize = logBuffer.Len()
+			// Clear buffer to free memory (this is the key fix for memory bloat!)
+			logBuffer.Reset()
+			chunkIndex++
+			logMutex.Unlock()
 		}
 		return err
 	}
 
+
 	// Buffered append - only uploads when buffer is large or time has elapsed
 	appendLog := func(message string) {
 		logMutex.Lock()
 		logBuffer.WriteString(message)
 		now := time.Now()
-		// Flush if we have >1KB of NEW data or if 1s has passed
-		shouldFlush := (logBuffer.Len()-lastFlushSize) > 1024 || now.Sub(lastLogFlush) > 1*time.Second
+		// Flush if buffer exceeds chunk size (256KB) or 1s has passed
+		shouldFlush := logBuffer.Len() > 256*1024 || now.Sub(lastLogFlush) > 1*time.Second
 		logMutex.Unlock()
 
 		if shouldFlush {
@@ -235,17 +248,8 @@ func (e *PlanExecutor) ExecutePlan(ctx context.Context, runID string) error {
 	}
 	logger.Info("updated run status to planning")
 
-	// Update plan with LogBlobID immediately so API can stream logs
-	// This restores the domain pattern where the DB is the source of truth
-	if run.PlanID != nil {
-		planUpdates := &domain.TFEPlanUpdate{
-			LogBlobID: &logBlobID,
-		}
-		if err := e.planRepo.UpdatePlan(ctx, *run.PlanID, planUpdates); err != nil {
-			logger.Warn("failed to update plan with log blob ID", slog.String("error", err.Error()))
-			// Non-fatal, continue
-		}
-	}
+	// Note: We no longer set LogBlobID since we use chunked logging
+	// The API reads chunks directly from plans/{planID}/chunks/*.log
 
 	appendLog("Preparing terraform run...\n")
 
@@ -473,7 +477,7 @@ func (e *PlanExecutor) ExecutePlan(ctx context.Context, runID string) error {
 		ResourceChanges:      &changes,
 		ResourceDestructions: &destroys,
 		HasChanges:           &hasChanges,
-		LogBlobID:            &logBlobID,
+		// LogBlobID removed - we use chunked logging now
 		LogReadURL:           &logReadURL,
 	}
 	if len(planJSON) > 0 {
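
The size-or-time trigger is what makes the memory bound hold: once the buffer crosses 256 KB a flush runs, and on success Reset() returns it to zero, so resident log memory stays near one chunk no matter how much output a run produces. A toy standalone simulation of just that bookkeeping (no S3; numbers are illustrative):

package main

import (
	"bytes"
	"fmt"
	"time"
)

const chunkSize = 256 * 1024

func main() {
	var buf bytes.Buffer
	lastFlush := time.Now()
	maxResident := 0
	chunks := 0

	// Simulate ~10 MB of log output arriving in 4 KB writes.
	line := bytes.Repeat([]byte("x"), 4096)
	for i := 0; i < 2560; i++ {
		buf.Write(line)
		if buf.Len() > chunkSize || time.Since(lastFlush) > time.Second {
			if buf.Len() > maxResident {
				maxResident = buf.Len()
			}
			buf.Reset() // the chunk upload would happen here; on success the buffer is cleared
			chunks++
			lastFlush = time.Now()
		}
	}
	fmt.Printf("wrote ~10 MB, peak buffer %d KB across %d chunks\n", maxResident/1024, chunks)
}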
