Skip to content

Commit b3c1adf

Browse files
committed
Add idle timeout for WebWorkerPool with per-slot RefCell
Workers with no pending tasks are automatically terminated after a configurable idle_timeout_ms duration and transparently recreated when new tasks arrive. Uses per-slot RefCell<WorkerSlot> instead of a single RefCell around the whole Vec, allowing independent borrowing so the idle checker and task execution never conflict.
1 parent 13e90e1 commit b3c1adf

7 files changed

Lines changed: 267 additions & 40 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/pool/mod.rs

Lines changed: 166 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1-
use std::borrow::Borrow;
1+
use std::{borrow::Borrow, cell::RefCell, rc::Rc};
22

33
use futures::future::join_all;
44
use js_sys::wasm_bindgen::{prelude::wasm_bindgen, UnwrapThrowExt};
55
use scheduler::Scheduler;
66
pub use scheduler::Strategy;
77
use serde::{Deserialize, Serialize};
8+
use wasm_bindgen::prelude::Closure;
9+
use wasm_bindgen::JsCast;
810

911
use wasm_bindgen_futures::JsFuture;
1012
use web_sys::window;
@@ -46,6 +48,10 @@ pub struct WorkerPoolOptions {
4648
/// Whether to precompile and share the WASM module across workers for bandwidth optimization.
4749
/// This reduces the number of WASM fetches from N (one per worker) to 1 (shared across all workers).
4850
pub precompile_wasm: Option<bool>,
51+
/// Idle timeout in milliseconds. Workers with no pending tasks will be terminated
52+
/// after being idle for this duration. They are transparently recreated when new tasks arrive.
53+
/// Default: `None` (no timeout, workers live for the pool's lifetime).
54+
pub idle_timeout_ms: Option<u32>,
4955
/// Pre-compiled WASM module to share across workers. Internal use only.
5056
pub(crate) wasm_module: Option<js_sys::WebAssembly::Module>,
5157
}
@@ -109,14 +115,45 @@ impl WorkerPoolOptions {
109115
/// # }
110116
/// # fn main() {}
111117
/// ```
118+
/// The state of a single worker slot in the pool.
119+
enum WorkerSlot {
120+
/// Worker is active and can accept tasks.
121+
Active(WebWorker),
122+
/// Worker is being created (prevents duplicate creation during async init).
123+
Creating,
124+
/// Worker was terminated by idle timeout and can be recreated.
125+
Empty,
126+
}
127+
112128
pub struct WebWorkerPool {
113-
/// The workers that have been spawned.
114-
workers: Vec<WebWorker>,
129+
/// The worker slots (per-slot RefCell for independent borrowing).
130+
slots: Rc<Vec<RefCell<WorkerSlot>>>,
131+
/// The total number of slots (pool capacity).
132+
num_slots: usize,
115133
/// The internal scheduler that is used to distribute the tasks.
116134
scheduler: Scheduler,
117135
/// Pre-compiled WASM module shared across workers (kept alive to prevent dropping)
118136
#[allow(dead_code)]
119137
wasm_module: Option<js_sys::WebAssembly::Module>,
138+
/// Config retained for worker re-creation.
139+
pool_path: Option<String>,
140+
pool_path_bg: Option<String>,
141+
/// Idle checker setInterval closure (prevent GC).
142+
_idle_checker_cb: Option<Closure<dyn FnMut()>>,
143+
/// Idle checker interval ID (for clearInterval on Drop).
144+
_idle_checker_id: Option<i32>,
145+
/// Notify waiting tasks when a worker becomes available after creation.
146+
worker_ready: tokio::sync::Notify,
147+
}
148+
149+
impl Drop for WebWorkerPool {
150+
fn drop(&mut self) {
151+
if let Some(id) = self._idle_checker_id {
152+
if let Some(w) = web_sys::window() {
153+
w.clear_interval_with_handle(id);
154+
}
155+
}
156+
}
120157
}
121158

122159
impl WebWorkerPool {
@@ -172,7 +209,8 @@ impl WebWorkerPool {
172209
options.wasm_module.take()
173210
};
174211

175-
let worker_inits = (0..options.num_workers()).map(|_| {
212+
let num_slots = options.num_workers();
213+
let worker_inits = (0..num_slots).map(|_| {
176214
// Do not impose a task limit.
177215
WebWorker::with_path_and_module(
178216
options.path(),
@@ -184,10 +222,51 @@ impl WebWorkerPool {
184222
let workers = join_all(worker_inits).await;
185223
let workers = workers.into_iter().collect::<Result<Vec<_>, _>>()?;
186224

225+
let slots: Rc<Vec<RefCell<WorkerSlot>>> = Rc::new(
226+
workers
227+
.into_iter()
228+
.map(|w| RefCell::new(WorkerSlot::Active(w)))
229+
.collect(),
230+
);
231+
232+
// Set up idle timeout checker if configured.
233+
let (idle_checker_cb, idle_checker_id) = if let Some(timeout) = options.idle_timeout_ms {
234+
let slots_clone = Rc::clone(&slots);
235+
let cb = Closure::<dyn FnMut()>::new(move || {
236+
let now = js_sys::Date::now();
237+
for i in 0..slots_clone.len() {
238+
let should_terminate = {
239+
let s = slots_clone[i].borrow();
240+
matches!(&*s, WorkerSlot::Active(ref w)
241+
if w.current_load() == 0 && (now - w.last_active()) >= timeout as f64)
242+
};
243+
if should_terminate {
244+
*slots_clone[i].borrow_mut() = WorkerSlot::Empty;
245+
}
246+
}
247+
});
248+
let id = window()
249+
.expect_throw("Window missing")
250+
.set_interval_with_callback_and_timeout_and_arguments_0(
251+
cb.as_ref().unchecked_ref(),
252+
(timeout / 2) as i32,
253+
)
254+
.expect_throw("Could not set interval");
255+
(Some(cb), Some(id))
256+
} else {
257+
(None, None)
258+
};
259+
187260
Ok(Self {
188-
workers,
261+
slots,
262+
num_slots,
189263
scheduler: Scheduler::new(options.strategy()),
190264
wasm_module,
265+
pool_path: options.path.clone(),
266+
pool_path_bg: options.path_bg.clone(),
267+
_idle_checker_cb: idle_checker_cb,
268+
_idle_checker_id: idle_checker_id,
269+
worker_ready: tokio::sync::Notify::new(),
191270
})
192271
}
193272

@@ -256,22 +335,77 @@ impl WebWorkerPool {
256335
self.run_internal(func, arg).await
257336
}
258337

338+
/// Acquires an active worker slot, recreating a terminated worker if needed.
339+
async fn acquire_worker(&self) -> usize {
340+
loop {
341+
let loads = self.compute_loads();
342+
if let Some(id) = self.scheduler.schedule(&loads) {
343+
return id;
344+
}
345+
346+
// No active workers. Find first Empty slot and recreate.
347+
let empty_slot = self
348+
.slots
349+
.iter()
350+
.position(|slot| matches!(&*slot.borrow(), WorkerSlot::Empty));
351+
if let Some(i) = empty_slot {
352+
*self.slots[i].borrow_mut() = WorkerSlot::Creating;
353+
}
354+
355+
if let Some(slot_id) = empty_slot {
356+
let worker = WebWorker::with_path_and_module(
357+
self.pool_path.as_deref(),
358+
self.pool_path_bg.as_deref(),
359+
None,
360+
self.wasm_module.clone(),
361+
)
362+
.await
363+
.expect_throw("Couldn't recreate worker");
364+
*self.slots[slot_id].borrow_mut() = WorkerSlot::Active(worker);
365+
self.worker_ready.notify_waiters();
366+
return slot_id;
367+
}
368+
369+
// All slots are Creating — wait for one to finish.
370+
self.worker_ready.notified().await;
371+
}
372+
}
373+
374+
/// Compute per-slot loads for the scheduler.
375+
fn compute_loads(&self) -> Vec<Option<usize>> {
376+
self.slots
377+
.iter()
378+
.map(|slot| match &*slot.borrow() {
379+
WorkerSlot::Active(w) => Some(w.current_load()),
380+
_ => None,
381+
})
382+
.collect()
383+
}
384+
259385
/// Determines the worker to run a simple task on using the scheduler
260386
/// and runs the task.
387+
// Per-slot RefCell: the shared borrow is deliberately held across the await. The idle
388+
// checker only replaces slots whose load is zero; this assumes an in-flight task always keeps the load above zero.
389+
#[allow(clippy::await_holding_refcell_ref)]
261390
pub(crate) async fn run_internal<T, R, A>(&self, func: WebWorkerFn<T, R>, arg: A) -> R
262391
where
263392
A: Borrow<T>,
264393
T: Serialize + for<'de> Deserialize<'de>,
265394
R: Serialize + for<'de> Deserialize<'de>,
266395
{
267-
let worker_id = self.scheduler.schedule(self);
268-
self.workers[worker_id]
269-
.run_internal(func, arg.borrow())
270-
.await
396+
let worker_id = self.acquire_worker().await;
397+
let slot = self.slots[worker_id].borrow();
398+
match &*slot {
399+
WorkerSlot::Active(worker) => worker.run_internal(func, arg.borrow()).await,
400+
_ => unreachable!("acquire_worker guarantees Active slot"),
401+
}
271402
}
272403

273404
/// Determines the worker to run a channel task on using the scheduler
274405
/// and runs the task.
406+
// Per-slot RefCell: the shared borrow is deliberately held across the await. The idle
407+
// checker only replaces slots whose load is zero; this assumes an in-flight task always keeps the load above zero.
408+
#[allow(clippy::await_holding_refcell_ref)]
275409
pub(crate) async fn run_channel_internal<T, R>(
276410
&self,
277411
func: WebWorkerChannelFn<T, R>,
@@ -281,20 +415,36 @@ impl WebWorkerPool {
281415
T: Serialize + for<'de> Deserialize<'de>,
282416
R: Serialize + for<'de> Deserialize<'de>,
283417
{
284-
let worker_id = self.scheduler.schedule(self);
285-
self.workers[worker_id]
286-
.run_channel_internal(func, arg)
287-
.await
418+
let worker_id = self.acquire_worker().await;
419+
let slot = self.slots[worker_id].borrow();
420+
match &*slot {
421+
WorkerSlot::Active(worker) => worker.run_channel_internal(func, arg).await,
422+
_ => unreachable!("acquire_worker guarantees Active slot"),
423+
}
288424
}
289425

290426
/// Return the number of tasks currently queued to this worker pool.
291427
pub fn current_load(&self) -> usize {
292-
self.workers.iter().map(WebWorker::current_load).sum()
428+
self.slots
429+
.iter()
430+
.map(|slot| match &*slot.borrow() {
431+
WorkerSlot::Active(w) => w.current_load(),
432+
_ => 0,
433+
})
434+
.sum()
293435
}
294436

295-
/// Return the number of workers in the pool.
437+
/// Return the total number of worker slots in the pool (pool capacity).
296438
pub fn num_workers(&self) -> usize {
297-
self.workers.len()
439+
self.num_slots
440+
}
441+
442+
/// Return the number of currently active (non-terminated) workers.
443+
pub fn num_active_workers(&self) -> usize {
444+
self.slots
445+
.iter()
446+
.filter(|s| matches!(&*RefCell::borrow(s), WorkerSlot::Active(_)))
447+
.count()
298448
}
299449

300450
/// Create a worker pool with a pre-compiled WASM module for optimal bandwidth usage.

src/pool/scheduler.rs

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
use std::cell::Cell;
22

3-
use wasm_bindgen::{prelude::wasm_bindgen, UnwrapThrowExt};
4-
5-
use super::WebWorkerPool;
3+
use wasm_bindgen::prelude::wasm_bindgen;
64

75
/// This enumeration contains the supported strategies for distributing
86
/// tasks within the worker pool.
@@ -46,26 +44,30 @@ impl Scheduler {
4644
}
4745
}
4846

49-
/// Given the pool, apply the strategy and determine which worker
50-
/// should receive the next task.
51-
pub(super) fn schedule(&self, pool: &WebWorkerPool) -> usize {
47+
/// Given per-slot loads, apply the strategy and determine which worker
48+
/// should receive the next task. Returns `None` if no active workers exist.
49+
///
50+
/// Each entry in `loads` is `Some(current_load)` for active workers,
51+
/// or `None` for terminated/creating slots.
52+
pub(super) fn schedule(&self, loads: &[Option<usize>]) -> Option<usize> {
5253
match self.strategy {
5354
Strategy::RoundRobin => {
54-
// Simply return the current worker and increment.
55-
let worker_id = self.current_worker.get();
56-
self.current_worker
57-
.set((worker_id + 1) % pool.num_workers());
58-
worker_id
59-
}
60-
Strategy::LoadBased => {
61-
// Choose the worker with the minimum work load.
62-
pool.workers
63-
.iter()
64-
.enumerate()
65-
.min_by_key(|(_id, worker)| worker.current_load())
66-
.expect_throw("WorkerPool does not have workers")
67-
.0
55+
let num = loads.len();
56+
for _ in 0..num {
57+
let id = self.current_worker.get();
58+
self.current_worker.set((id + 1) % num);
59+
if loads[id].is_some() {
60+
return Some(id);
61+
}
62+
}
63+
None
6864
}
65+
Strategy::LoadBased => loads
66+
.iter()
67+
.enumerate()
68+
.filter_map(|(i, load)| load.map(|l| (i, l)))
69+
.min_by_key(|(_i, load)| *load)
70+
.map(|(i, _)| i),
6971
}
7072
}
7173
}

src/webworker/worker.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::{
2-
cell::RefCell,
2+
cell::{Cell, RefCell},
33
collections::HashMap,
44
rc::Rc,
55
sync::atomic::{AtomicU32, Ordering},
@@ -60,6 +60,8 @@ pub struct WebWorker {
6060
open_tasks: Rc<RefCell<HashMap<u32, oneshot::Sender<Response>>>>,
6161
/// The callback handle for the worker.
6262
_callback: Closure<Callback>,
63+
/// Timestamp (ms since epoch) of worker creation or of the most recent message handled by the `onmessage` callback, used for idle timeout tracking.
64+
last_active: Rc<Cell<f64>>,
6365
}
6466

6567
impl WebWorker {
@@ -182,8 +184,9 @@ impl WebWorker {
182184
}
183185

184186
let tasks = Rc::new(RefCell::new(HashMap::new()));
187+
let last_active = Rc::new(Cell::new(js_sys::Date::now()));
185188

186-
let callback_handle = Self::callback(Rc::clone(&tasks));
189+
let callback_handle = Self::callback(Rc::clone(&tasks), Rc::clone(&last_active));
187190
worker.set_onmessage(Some(callback_handle.as_ref().unchecked_ref()));
188191

189192
Ok(WebWorker {
@@ -192,12 +195,14 @@ impl WebWorker {
192195
current_task: AtomicU32::new(0),
193196
open_tasks: tasks,
194197
_callback: callback_handle,
198+
last_active,
195199
})
196200
}
197201

198202
/// Function to be called when a result is ready.
199203
fn callback(
200204
tasks: Rc<RefCell<HashMap<u32, oneshot::Sender<Response>>>>,
205+
last_active: Rc<Cell<f64>>,
201206
) -> Closure<dyn FnMut(MessageEvent)> {
202207
Closure::new(move |event: MessageEvent| {
203208
let data = event.data();
@@ -210,6 +215,9 @@ impl WebWorker {
210215
// Ignore if receiver is already closed.
211216
let _ = channel.send(response);
212217
}
218+
219+
// Update idle tracking timestamp.
220+
last_active.set(js_sys::Date::now());
213221
})
214222
}
215223

@@ -516,6 +524,12 @@ impl WebWorker {
516524
pub fn current_load(&self) -> usize {
517525
self.open_tasks.borrow().len()
518526
}
527+
528+
/// Return the timestamp (ms since epoch) of the worker's last activity: its creation, or the most recent message handled from the worker.
529+
/// Used for idle timeout tracking.
530+
pub fn last_active(&self) -> f64 {
531+
self.last_active.get()
532+
}
519533
}
520534

521535
impl Drop for WebWorker {

0 commit comments

Comments
 (0)