Skip to content

Commit e3916bc

Browse files
author
Piotr Stankiewicz
committed
Reload defunct runners
In case a runner becomes defunct, e.g. as a result of a backend crash, it would be neat to be able to reload it. So, if the loader finds a runner, have it check if the runner is still alive, and create a new one if the runner is defunct. Signed-off-by: Piotr Stankiewicz <[email protected]>
1 parent 2ee0a36 commit e3916bc

File tree

2 files changed

+55
-15
lines changed

2 files changed

+55
-15
lines changed

pkg/inference/scheduling/loader.go

Lines changed: 39 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -154,15 +154,22 @@ func (l *loader) broadcast() {
154154
}
155155

156156
// evict evicts all unused runners from the loader. If idleOnly is true, then
157-
// only those unused runners which are considered "idle" (based on usage
158-
// timestamp) are evicted. The caller must hold the loader lock. It returns the
159-
// number of remaining runners.
157+
// only those unused, but functioning, runners which are considered "idle" (based
158+
// on usage timestamp) are evicted. Defunct (e.g. crashed) runners will be evicted
159+
// regardless of whether they are considered "idle". The caller must hold the loader
160+
// lock. It returns the number of remaining runners.
160161
func (l *loader) evict(idleOnly bool) int {
161162
now := time.Now()
162163
for r, slot := range l.runners {
163164
unused := l.references[slot] == 0
164165
idle := unused && now.Sub(l.timestamps[slot]) > runnerIdleTimeout
165-
if unused && (!idleOnly || idle) {
166+
defunct := false
167+
select {
168+
case <-l.slots[slot].done:
169+
defunct = true
170+
default:
171+
}
172+
if unused && (!idleOnly || idle || defunct) {
166173
l.log.Infof("Evicting %s backend runner with model %s in %s mode",
167174
r.backend, r.model, r.mode,
168175
)
@@ -179,11 +186,11 @@ func (l *loader) evict(idleOnly bool) int {
179186

180187
// evictRunner evicts a specific runner. The caller must hold the loader lock.
181188
// It returns the number of remaining runners.
182-
func (l *loader) evictRunner(backend, model string) int {
189+
func (l *loader) evictRunner(backend, model string, mode inference.BackendMode) int {
183190
allBackends := backend == ""
184191
for r, slot := range l.runners {
185192
unused := l.references[slot] == 0
186-
if unused && (allBackends || r.backend == backend) && r.model == model {
193+
if unused && (allBackends || r.backend == backend) && r.model == model && r.mode == mode {
187194
l.log.Infof("Evicting %s backend runner with model %s in %s mode",
188195
r.backend, r.model, r.mode,
189196
)
@@ -210,7 +217,10 @@ func (l *loader) Unload(ctx context.Context, unload UnloadRequest) int {
210217
return l.evict(false)
211218
} else {
212219
for _, model := range unload.Models {
213-
l.evictRunner(unload.Backend, model)
220+
// Evict both completion and embedding models. We should consider
221+
// accepting a mode parameter in unload requests.
222+
l.evictRunner(unload.Backend, model, inference.BackendModeCompletion)
223+
l.evictRunner(unload.Backend, model, inference.BackendModeEmbedding)
214224
}
215225
return len(l.runners)
216226
}
@@ -364,6 +374,8 @@ func (l *loader) load(ctx context.Context, backendName, model string, mode infer
364374

365375
// Loop until we can satisfy the request or an error occurs.
366376
for {
377+
slot := -1
378+
367379
// If loads are disabled, then there's nothing we can do.
368380
if !l.loadsEnabled {
369381
return nil, errLoadsDisabled
@@ -372,9 +384,15 @@ func (l *loader) load(ctx context.Context, backendName, model string, mode infer
372384
// See if we can satisfy the request with an existing runner.
373385
existing, ok := l.runners[runnerKey{backendName, model, mode}]
374386
if ok {
375-
l.references[existing] += 1
376-
l.timestamps[existing] = time.Time{}
377-
return l.slots[existing], nil
387+
select {
388+
case <-l.slots[existing].done:
389+
l.log.Warnf("%s runner for %s is defunct. Waiting for it to be evicted.", backendName, model)
390+
goto WaitForChange
391+
default:
392+
l.references[existing] += 1
393+
l.timestamps[existing] = time.Time{}
394+
return l.slots[existing], nil
395+
}
378396
}
379397

380398
// If there's not sufficient memory or all slots are full, then try
@@ -384,7 +402,6 @@ func (l *loader) load(ctx context.Context, backendName, model string, mode infer
384402
}
385403

386404
// If there's sufficient memory and a free slot, then find the slot.
387-
slot := -1
388405
if memory <= l.availableMemory && len(l.runners) < len(l.slots) {
389406
for s, runner := range l.slots {
390407
if runner == nil {
@@ -432,6 +449,7 @@ func (l *loader) load(ctx context.Context, backendName, model string, mode infer
432449
// Wait for something to change. Note that we always re-lock with
433450
// context.Background() because we need to ensure we hold the lock by
434451
// the time we return.
452+
WaitForChange:
435453
l.unlock()
436454
select {
437455
case <-ctx.Done():
@@ -455,13 +473,19 @@ func (l *loader) release(runner *runner) {
455473
// Decrement the runner's reference count.
456474
l.references[slot] -= 1
457475

458-
// If the runner's reference count is now zero, then record now as its idle
459-
// start time and signal the idle checker.
476+
// If the runner's reference count is now zero, then check if it is still
477+
// active, and record now as its idle start time and signal the idle
478+
// checker.
460479
if l.references[slot] == 0 {
461-
l.timestamps[slot] = time.Now()
462480
select {
463-
case l.idleCheck <- struct{}{}:
481+
case <-runner.done:
482+
l.evictRunner(runner.backend.Name(), runner.model, runner.mode)
464483
default:
484+
l.timestamps[slot] = time.Now()
485+
select {
486+
case l.idleCheck <- struct{}{}:
487+
default:
488+
}
465489
}
466490
}
467491

pkg/inference/scheduling/runner.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,22 @@ func run(
134134
proxyLog: proxyLog,
135135
}
136136

137+
proxy.ErrorHandler = func(w http.ResponseWriter, req *http.Request, err error) {
138+
// If the error is EOF, the underlying runner likely bailed, and closed its socket
139+
// unexpectedly. Wait for the runner process to complete, but time out in case
140+
// the runner process only killed its comms and is stuck.
141+
if errors.Is(err, io.EOF) {
142+
w.WriteHeader(http.StatusInternalServerError)
143+
select {
144+
case <-r.done:
145+
return
146+
case <-time.After(30 * time.Second):
147+
}
148+
} else {
149+
w.WriteHeader(http.StatusBadGateway)
150+
}
151+
}
152+
137153
// Start the backend run loop.
138154
go func() {
139155
if err := backend.Run(runCtx, socket, model, mode); err != nil {

0 commit comments

Comments
 (0)