Commit 62b83fe

chunked logs
1 parent 2b9b799 commit 62b83fe

6 files changed: +153, -65 lines

sandbox-sidecar/src/runners/e2bRunner.ts

Lines changed: 0 additions & 2 deletions
@@ -68,8 +68,6 @@ export class E2BSandboxRunner implements SandboxRunner {
       sandbox,
       workDir,
       ["show", "-json", "tfplan.binary"],
-      logs,
-      streamLog,
     );
 
     const planJSON = showResult.stdout;

taco/internal/tfe/apply.go

Lines changed: 35 additions & 12 deletions
@@ -101,16 +101,29 @@ func (h *TfeHandler) GetApplyLogs(c echo.Context) error {
         return c.JSON(http.StatusNotFound, map[string]string{"error": "apply not found"})
     }
 
-    // Read apply logs from chunked S3 objects
+    // Read apply logs from chunked S3 objects (fixed 2KB chunks)
     // Chunks are stored as applies/{applyID}/chunks/00000001.log, 00000002.log, etc.
+    const chunkSize = 2 * 1024 // Must match writer's chunk size
+
+    // Determine which chunk contains the requested offset to avoid re-downloading
+    // data the client already has.
+    startChunk := 1
+    if offsetInt > 1 { // offset includes STX byte at offset 0
+        logOffset := offsetInt - 1
+        startChunk = int(logOffset/chunkSize) + 1
+    }
+
+    // Number of bytes before the first chunk we fetch (used to map offsets)
+    bytesBefore := int64(chunkSize * (startChunk - 1))
+
     var logText string
-    chunkIndex := 1
+    chunkIndex := startChunk
     var fullLogs strings.Builder
 
     for {
         chunkKey := fmt.Sprintf("applies/%s/chunks/%08d.log", run.ID, chunkIndex)
         logData, err := h.blobStore.DownloadBlob(ctx, chunkKey)
 
         if err != nil {
             // Chunk doesn't exist - check if apply is still running
             if run.Status == "applied" || run.Status == "errored" {
@@ -120,15 +133,19 @@ func (h *TfeHandler) GetApplyLogs(c echo.Context) error {
             // Apply still running, this chunk doesn't exist yet
             break
         }
 
+        // Keep chunks at full 2048 bytes (don't trim nulls) for correct offset math
         fullLogs.Write(logData)
         chunkIndex++
     }
 
     logText = fullLogs.String()
 
+    // NOW trim all null bytes from the result (after offset calculation is done)
+    logText = strings.TrimRight(logText, "\x00")
+
     // If no chunks exist yet, generate default message based on status
-    if logText == "" {
+    if logText == "" && offsetInt == 0 {
         if run.Status == "applying" || run.Status == "apply_queued" {
             logText = "Waiting for apply to start...\n"
         } else {
@@ -146,16 +163,22 @@ func (h *TfeHandler) GetApplyLogs(c echo.Context) error {
         fmt.Printf("📤 APPLY LOGS at offset=0: STX + %d bytes of log text\n", len(logText))
     } else {
         // Client already received STX (1 byte at offset 0)
-        // Map stream offset to logText offset: streamOffset=1 → logText[0]
-        logOffset := offsetInt - 1
+        // Map stream offset to logText offset:
+        // - stream offset 0 = STX
+        // - stream offset 1 = first byte of full logs
+        // We only fetched chunks starting at startChunk, so subtract bytesBefore.
+        logOffset := offsetInt - 1 - bytesBefore
+        if logOffset < 0 {
+            logOffset = 0
+        }
 
         if logOffset < int64(len(logText)) {
             // Send remaining log text
             responseData = []byte(logText[logOffset:])
             fmt.Printf("📤 APPLY LOGS at offset=%d: sending %d bytes (logText[%d:])\n",
                 offsetInt, len(responseData), logOffset)
-        } else if logOffset == int64(len(logText)) && run.Status == "applied" {
-            // All logs sent, send ETX
+        } else if run.Status == "applied" || run.Status == "errored" {
+            // All logs sent (or client offset beyond end), send ETX
             responseData = []byte{0x03}
             fmt.Printf("📤 Sending ETX (End of Text) for apply %s - logs complete\n", applyID)
         } else {
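
To make the new offset math concrete, here is a minimal standalone sketch of the chunk-selection arithmetic. The chunkForOffset helper and the example numbers are illustrative, not part of the commit; offsetInt and chunkSize follow the names used in the diff:

package main

import "fmt"

const chunkSize = 2 * 1024 // 2KB, matching the writer's fixed chunk size

// chunkForOffset mirrors the reader's arithmetic: stream offset 0 is the STX
// byte, so log byte N lives at stream offset N+1, and chunks are numbered
// starting at 1.
func chunkForOffset(offsetInt int64) (startChunk int, bytesBefore int64) {
    startChunk = 1
    if offsetInt > 1 { // offset 0 is STX, offset 1 is the first log byte
        logOffset := offsetInt - 1
        startChunk = int(logOffset/chunkSize) + 1
    }
    bytesBefore = int64(chunkSize * (startChunk - 1))
    return startChunk, bytesBefore
}

func main() {
    // A client that has consumed STX plus 4096 log bytes polls at offset 4097:
    // logOffset = 4096, startChunk = 4096/2048 + 1 = 3, bytesBefore = 4096.
    // Chunks 1 and 2 (bytes the client already has) are never re-downloaded.
    fmt.Println(chunkForOffset(4097)) // 3 4096
}

Fixed-size chunks are what make this lookup constant-time: with variable-length chunks the reader would have to download every chunk from the start just to locate the byte the client asked for.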

taco/internal/tfe/apply_executor.go

Lines changed: 36 additions & 11 deletions
@@ -149,34 +149,57 @@ func (e *ApplyExecutor) ExecuteApply(ctx context.Context, runID string) error {
 
     // Chunked logging to prevent memory bloat
     // Upload log chunks as separate S3 objects and clear buffer after each upload
+    // Fixed-size 2KB chunks enable offset-based chunk selection (reduces S3 re-downloads)
+    const chunkSize = 2 * 1024 // 2KB fixed size
     chunkIndex := 1
     var logBuffer bytes.Buffer
     var logMutex sync.Mutex
     lastLogFlush := time.Now()
 
-    // Flush helper - uploads current buffer as a chunk and clears it
+    // Flush helper - uploads current buffer as a padded 2KB chunk and clears it
     flushLogs := func() error {
        logMutex.Lock()
        if logBuffer.Len() == 0 {
            logMutex.Unlock()
            return nil
        }
-        // Copy buffer to avoid holding lock during upload
-        data := make([]byte, logBuffer.Len())
-        copy(data, logBuffer.Bytes())
+
+        // Extract at most chunkSize bytes (2KB)
+        dataLen := logBuffer.Len()
+        if dataLen > chunkSize {
+            dataLen = chunkSize
+        }
+        data := make([]byte, dataLen)
+        copy(data, logBuffer.Bytes()[:dataLen])
         currentChunk := chunkIndex
+        chunkIndex++ // Increment NOW before unlock to reserve this chunk number atomically
+
+        // Copy remainder BEFORE resetting (crucial - remainder slice points to internal buffer)
+        var remainderCopy []byte
+        if logBuffer.Len() > dataLen {
+            remainder := logBuffer.Bytes()[dataLen:]
+            remainderCopy = make([]byte, len(remainder))
+            copy(remainderCopy, remainder)
+        }
+
+        // Now safe to reset and write remainder back
+        logBuffer.Reset()
+        if len(remainderCopy) > 0 {
+            logBuffer.Write(remainderCopy)
+        }
         logMutex.Unlock()
 
+        // Pad to fixed 2KB size (rest will be null bytes)
+        paddedData := make([]byte, chunkSize)
+        copy(paddedData, data)
+
         // Upload this chunk (key includes zero-padded chunk index)
         chunkKey := fmt.Sprintf("applies/%s/chunks/%08d.log", run.ID, currentChunk)
-        err := e.blobStore.UploadBlob(ctx, chunkKey, data)
+        err := e.blobStore.UploadBlob(ctx, chunkKey, paddedData)
 
         if err == nil {
            logMutex.Lock()
            lastLogFlush = time.Now()
-            // Clear buffer to free memory
-            logBuffer.Reset()
-            chunkIndex++
            logMutex.Unlock()
         }
         return err
@@ -186,8 +209,8 @@ func (e *ApplyExecutor) ExecuteApply(ctx context.Context, runID string) error {
        logMutex.Lock()
        logBuffer.WriteString(message)
        now := time.Now()
-        // Flush if buffer exceeds chunk size (256KB) or 1s has passed
-        shouldFlush := logBuffer.Len() > 256*1024 || now.Sub(lastLogFlush) > 1*time.Second
+        // Flush if buffer exceeds 2KB or 1s has passed
+        shouldFlush := logBuffer.Len() > chunkSize || now.Sub(lastLogFlush) > 1*time.Second
        logMutex.Unlock()
 
        if shouldFlush {
@@ -334,7 +357,9 @@ func (e *ApplyExecutor) ExecuteApply(ctx context.Context, runID string) error {
     }
 
     // Append the actual terraform output to the progress logs
-    appendLog("\n" + logs)
+    if !useSandbox {
+        appendLog("\n" + logs)
+    }
 
     // Store final status
     if applyErr != nil {
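
The copy-before-Reset sequence above is the subtle part of this change: bytes.Buffer.Bytes() returns a slice that aliases the buffer's internal storage, so the remainder must be copied out before Reset, or the write-back would read memory the buffer is about to reuse. A minimal sketch of the same pattern in isolation (flush and uploadChunk are hypothetical stand-ins for flushLogs and the S3 upload):

package main

import (
    "bytes"
    "fmt"
    "sync"
)

const chunkSize = 2 * 1024

func flush(buf *bytes.Buffer, mu *sync.Mutex, chunkIndex *int,
    uploadChunk func(index int, data []byte) error) error {
    mu.Lock()
    if buf.Len() == 0 {
        mu.Unlock()
        return nil
    }

    // Take at most one chunk's worth of bytes.
    dataLen := buf.Len()
    if dataLen > chunkSize {
        dataLen = chunkSize
    }
    data := make([]byte, dataLen)
    copy(data, buf.Bytes()[:dataLen])

    current := *chunkIndex
    (*chunkIndex)++ // reserve this chunk number while still holding the lock

    // Copy the remainder before Reset; buf.Bytes() aliases internal storage.
    var remainder []byte
    if buf.Len() > dataLen {
        remainder = append(remainder, buf.Bytes()[dataLen:]...)
    }
    buf.Reset()
    buf.Write(remainder)
    mu.Unlock()

    // Zero-pad to the fixed chunk size; the reader trims the padding later.
    padded := make([]byte, chunkSize)
    copy(padded, data)
    return uploadChunk(current, padded)
}

func main() {
    var buf bytes.Buffer
    var mu sync.Mutex
    idx := 1
    buf.WriteString("terraform apply output...\n")
    _ = flush(&buf, &mu, &idx, func(index int, data []byte) error {
        fmt.Printf("chunk %08d: %d bytes\n", index, len(data))
        return nil
    })
}

The cost of the fixed size is write amplification: a chunk holding a single log line is still uploaded at the full 2048 bytes, which is why the readers keep chunks padded while doing offset math and strip the trailing \x00 bytes only at the end.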

taco/internal/tfe/plan.go

Lines changed: 46 additions & 24 deletions
@@ -64,7 +64,7 @@ func (h *TfeHandler) GetPlan(c echo.Context) error {
             ID: plan.RunID,
         },
     }
-
+
     // Only include resource counts when plan is finished
     // If we send HasChanges:false before the plan completes, Terraform CLI
     // will think there's nothing to apply and won't prompt for confirmation!
@@ -105,16 +105,29 @@ func (h *TfeHandler) GetPlanLogs(c echo.Context) error {
         return c.JSON(http.StatusNotFound, map[string]string{"error": "plan not found"})
     }
 
-    // Read logs from chunked S3 objects
+    // Read logs from chunked S3 objects (fixed 2KB chunks)
     // Chunks are stored as plans/{planID}/chunks/00000001.log, 00000002.log, etc.
+    const chunkSize = 2 * 1024 // Must match writer's chunk size
+
+    // Determine which chunk contains the requested offset to avoid re-downloading
+    // data the client already has.
+    startChunk := 1
+    if offsetInt > 1 { // offset includes STX byte at offset 0
+        logOffset := offsetInt - 1
+        startChunk = int(logOffset/chunkSize) + 1
+    }
+
+    // Number of bytes before the first chunk we fetch (used to map offsets)
+    bytesBefore := int64(chunkSize * (startChunk - 1))
+
     var logText string
-    chunkIndex := 1
+    chunkIndex := startChunk
     var fullLogs strings.Builder
 
     for {
         chunkKey := fmt.Sprintf("plans/%s/chunks/%08d.log", planID, chunkIndex)
         logData, err := h.blobStore.DownloadBlob(ctx, chunkKey)
 
         if err != nil {
             // Chunk doesn't exist - check if plan is still running
             if plan.Status == "finished" || plan.Status == "errored" {
@@ -124,22 +137,26 @@ func (h *TfeHandler) GetPlanLogs(c echo.Context) error {
             // Plan still running, this chunk doesn't exist yet
             break
         }
 
+        // Keep chunks at full 2048 bytes (don't trim nulls) for correct offset math
         fullLogs.Write(logData)
         chunkIndex++
     }
 
     logText = fullLogs.String()
-
-    // If no chunks exist yet, generate default logs based on status
-    if logText == "" {
+
+    // NOW trim all null bytes from the result (after offset calculation is done)
+    logText = strings.TrimRight(logText, "\x00")
+
+    // If no chunks exist yet, generate default logs based on status (only on first request)
+    if logText == "" && offsetInt == 0 {
         logText = generateDefaultPlanLogs(plan)
     }
 
     // Handle offset for streaming with proper byte accounting
     // Stream format: [STX at offset 0][logText at offset 1+][ETX at offset 1+len(logText)]
     var responseData []byte
 
     if offsetInt == 0 {
         // First request: send STX + current logs
         responseData = append([]byte{0x02}, []byte(logText)...)
@@ -149,20 +166,26 @@ func (h *TfeHandler) GetPlanLogs(c echo.Context) error {
         }
     } else {
         // Client already received STX (1 byte at offset 0)
-        // Map stream offset to logText offset: streamOffset=1 → logText[0]
-        logOffset := offsetInt - 1
-
+        // Map stream offset to logText offset:
+        // - stream offset 0 = STX
+        // - stream offset 1 = first byte of full logs
+        // We only fetched chunks starting at startChunk, so subtract bytesBefore.
+        logOffset := offsetInt - 1 - bytesBefore
+        if logOffset < 0 {
+            logOffset = 0
+        }
+
         if logOffset < int64(len(logText)) {
             // Send remaining log text
             responseData = []byte(logText[logOffset:])
-            fmt.Printf("📤 PLAN LOGS at offset=%d: sending %d bytes (logText[%d:])\n",
+            fmt.Printf("📤 PLAN LOGS at offset=%d: sending %d bytes (logText[%d:])\n",
                 offsetInt, len(responseData), logOffset)
-        } else if logOffset == int64(len(logText)) && plan.Status == "finished" {
-            // All logs sent, send ETX
+        } else if plan.Status == "finished" || plan.Status == "errored" {
+            // All logs sent, send ETX to stop polling
             responseData = []byte{0x03}
             fmt.Printf("📤 Sending ETX (End of Text) for plan %s - logs complete\n", planID)
         } else {
-            // Waiting for more logs or already sent ETX
+            // Waiting for more logs
             responseData = []byte{}
             fmt.Printf("📤 PLAN LOGS at offset=%d: no new data (waiting or complete)\n", offsetInt)
         }
@@ -206,7 +229,7 @@ func (h *TfeHandler) GetPlanJSONOutput(c echo.Context) error {
     // Create dummy resource changes based on our counts
     // The CLI checks if this array has entries to decide whether to prompt
     resourceChanges := make([]interface{}, 0)
-
+
     // Add placeholder entries for additions
     for i := 0; i < plan.ResourceAdditions; i++ {
         resourceChanges = append(resourceChanges, map[string]interface{}{
@@ -215,7 +238,7 @@ func (h *TfeHandler) GetPlanJSONOutput(c echo.Context) error {
             },
         })
     }
-
+
     // Add placeholder entries for changes
     for i := 0; i < plan.ResourceChanges; i++ {
         resourceChanges = append(resourceChanges, map[string]interface{}{
@@ -224,7 +247,7 @@ func (h *TfeHandler) GetPlanJSONOutput(c echo.Context) error {
             },
         })
     }
-
+
     // Add placeholder entries for destructions
     for i := 0; i < plan.ResourceDestructions; i++ {
         resourceChanges = append(resourceChanges, map[string]interface{}{
@@ -233,7 +256,7 @@ func (h *TfeHandler) GetPlanJSONOutput(c echo.Context) error {
             },
         })
     }
-
+
     jsonPlan["resource_changes"] = resourceChanges
 }
 
@@ -246,7 +269,7 @@ func generateDefaultPlanLogs(plan *domain.TFEPlan) string {
     // Don't show resource counts in logs until plan is finished
     // Terraform CLI parses the logs to determine if changes exist!
     if plan.Status == "finished" {
-        return fmt.Sprintf(`Terraform used the selected providers to generate the following execution plan.
+        return fmt.Sprintf(`Terraform used the selected providers to generate the following execution plan.
 Resource actions are indicated with the following symbols:
   + create
   - destroy
@@ -260,4 +283,3 @@ Plan: %d to add, %d to change, %d to destroy.
     // The CLI will keep polling until it gets real content.
     return ""
 }
-
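
Both log endpoints frame their responses the same way: STX (0x02) at stream offset 0, raw log bytes from offset 1, and a lone ETX (0x03) once the run is finished, which tells the CLI to stop polling. A sketch of how a polling client would consume that framing (drain and fetch are illustrative; fetch stands in for a GET against the log URL with its offset parameter):

package main

import (
    "bytes"
    "fmt"
)

// drain polls fetch until it sees ETX, accumulating log bytes. Every byte on
// the stream, including STX and ETX, advances the offset.
func drain(fetch func(offset int64) []byte) string {
    var logs bytes.Buffer
    var offset int64
    for {
        data := fetch(offset)
        offset += int64(len(data))
        for _, b := range data {
            switch b {
            case 0x02: // STX: start-of-stream marker, not log content
            case 0x03: // ETX: run finished, stop polling
                return logs.String()
            default:
                logs.WriteByte(b)
            }
        }
        // An empty response means no new data yet; a real client would sleep.
    }
}

func main() {
    // Simulate two polls: STX plus some logs, then a lone ETX.
    responses := [][]byte{
        append([]byte{0x02}, "Plan: 1 to add, 0 to change, 0 to destroy.\n"...),
        {0x03},
    }
    i := 0
    fmt.Print(drain(func(offset int64) []byte {
        r := responses[i]
        i++
        return r
    }))
}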
