diff --git a/R/languageserver.R b/R/languageserver.R index f3ca847f..fb0df21e 100644 --- a/R/languageserver.R +++ b/R/languageserver.R @@ -23,23 +23,18 @@ LanguageServer <- R6::R6Class("LanguageServer", inputcon = NULL, outputcon = NULL, exit_flag = NULL, - documents = NULL, workspace = NULL, - processId = NULL, rootUri = NULL, rootPath = NULL, initializationOptions = NULL, ClientCapabilities = NULL, ServerCapabilities = NULL, - diagnostics_task_manager = NULL, parse_task_manager = NULL, resolve_task_manager = NULL, - pending_replies = NULL, - initialize = function(host, port) { if (is.null(port)) { logger$info("connection type: stdio") @@ -58,34 +53,23 @@ LanguageServer <- R6::R6Class("LanguageServer", self$inputcon <- inputcon self$outputcon <- outputcon - cpus <- parallel::detectCores() - # Performance optimization: Allow more workers and scale with CPU count - # Old default: min(max(floor(cpus / 2), 1), 3) - capped at 3 - # New default: min(max(cpus - 1, 2), 8) - scale up to 8 workers - default_pool_size <- min(max(cpus - 1, 2), 8) - pool_size <- as.integer( - Sys.getenv("R_LANGSVR_POOL_SIZE", default_pool_size)) - - # parse pool - increase size for better throughput - # Parse operations are CPU-bound and can benefit from parallelism - parse_pool <- if (pool_size > 0) SessionPool$new(pool_size, "parse") else NULL - # diagnostics is slower, so use a separate pool - # Diagnostics can use slightly fewer workers since they're I/O heavy - diagnostics_pool_size <- min(max(floor(pool_size * 0.75), 1), pool_size) - diagnostics_pool <- if (pool_size > 0) SessionPool$new(diagnostics_pool_size, "diagnostics") else NULL - - self$parse_task_manager <- TaskManager$new("parse", parse_pool) - self$diagnostics_task_manager <- TaskManager$new("diagnostics", diagnostics_pool) + self$parse_task_manager <- TaskManager$new( + "parse", + use_session = TRUE, process_recent_first = TRUE + ) + self$diagnostics_task_manager <- TaskManager$new( + "diagnostics", + use_session = TRUE, process_recent_first = TRUE + ) # no pool for resolve task # resolve task require a new session for every task - self$resolve_task_manager <- TaskManager$new("resolve", NULL) + self$resolve_task_manager <- TaskManager$new("resolve") self$pending_replies <- collections::dict() super$initialize() }, - process_events = function() { self$diagnostics_task_manager$run_tasks() self$diagnostics_task_manager$check_tasks() @@ -97,11 +81,8 @@ LanguageServer <- R6::R6Class("LanguageServer", self$workspace$poll_namespace_file() } }, - - text_sync = function( - # TODO: move it to Workspace!? - uri, document, run_lintr = FALSE, parse = FALSE, delay = 0) { - + text_sync = function( # TODO: move it to Workspace!? + uri, document, run_lintr = FALSE, parse = FALSE, delay = 0) { if (!self$pending_replies$has(uri)) { self$pending_replies$set(uri, list( `textDocument/documentSymbol` = collections::queue(), @@ -131,7 +112,6 @@ LanguageServer <- R6::R6Class("LanguageServer", ) } }, - check_connection = function() { if (!isOpen(self$inputcon)) { self$exit_flag <- TRUE @@ -142,13 +122,11 @@ LanguageServer <- R6::R6Class("LanguageServer", self$exit_flag <- TRUE } }, - write_text = function(text) { # we have made effort to ensure that text is utf-8 # so text is printed as is writeLines(text, self$outputcon, sep = "", useBytes = TRUE) }, - read_line = function() { if (self$tcp) { if (socketSelect(list(self$inputcon), timeout = 0)) { @@ -160,7 +138,6 @@ LanguageServer <- R6::R6Class("LanguageServer", stdin_read_line() } }, - read_char = function(n) { if (self$tcp) { out <- readChar(self$inputcon, n, useBytes = TRUE) @@ -170,24 +147,34 @@ LanguageServer <- R6::R6Class("LanguageServer", stdin_read_char(n) } }, - run = function() { + on.exit( + { + if (!is.null(self$parse_task_manager)) self$parse_task_manager$stop() + if (!is.null(self$diagnostics_task_manager)) self$diagnostics_task_manager$stop() + if (!is.null(self$resolve_task_manager)) self$resolve_task_manager$stop() + }, + add = TRUE + ) while (TRUE) { - ret <- tryCatchStack({ - if (isTRUE(self$exit_flag)) { - logger$info("exiting") - break - } + ret <- tryCatchStack( + { + if (isTRUE(self$exit_flag)) { + logger$info("exiting") + break + } - self$process_events() + self$process_events() - data <- self$fetch(blocking = FALSE) - if (is.null(data)) { - Sys.sleep(0.1) - next - } - self$handle_raw(data) - }, error = function(e) e) + data <- self$fetch(blocking = FALSE) + if (is.null(data)) { + Sys.sleep(0.1) + next + } + self$handle_raw(data) + }, + error = function(e) e + ) if (inherits(ret, "error")) { logger$error(ret) logger$error("exiting") diff --git a/R/session.R b/R/session.R index a27bff0f..e69de29b 100644 --- a/R/session.R +++ b/R/session.R @@ -1,253 +0,0 @@ -#' Single R Session for Session Pool -#' @import callr -#' @examples -#' \dontrun{ -#' # create a session and bind to a session pool -#' # this step is done in SessionPool$new() -#' session <- Session$new("session id", parent_pool_ptr, pool_name) -#' -#' # task acquire a session from session pool -#' # this step is done in SessionPool$acquire() -#' -#' # start task -#' session$start(target, args) -#' -#' # check session result with is_alive -#' session$is_alive() -#' -#' # when task is running -#' session$is_alive() == TRUE -#' -#' # when task is done -#' session$is_alive() == FALSE -#' -#' # get result when task is done -#' session$get_result() -#' -#' # session must be released when task is done -#' session$release() -#' -#' #' # use restart() to restart session -#' session$restart() -#' -#' # use kill() to restart and release session -#' session$kill() -#' -#' # session will be restarted on error -#' # you can manually “kill” the process in bash to test this behavior -#' } -#' @noRd -Session <- R6::R6Class("Session", - private = list( - # parent session poll - dependency injection - parent_pool = NULL, - pool_name = NULL, - # session id - be released by parent - id = NULL, - session = NULL, - # prev task is running - is_running = FALSE, - # r session is inited and will run call - is_ready = FALSE, - # before r session is_ready, task is saved in init_task - init_task = NULL, - result = NULL, - finalize = function() { - private$session$close() - }, - # safe call with try catch - call = function(target, args) { - ret <- tryCatch({ - private$session$call(target, args) - }, error = function(e) e) - if (inherits(ret, "error")) { - private$result <- ret - logger$error(private$pool_name, "session call error", private$id, ret) - # on call error, skip current task and restart - self$restart() - private$is_running <- FALSE - } - } - ), - public = list( - initialize = function(id, parent_pool, pool_name) { - private$parent_pool <- parent_pool - private$pool_name <- pool_name - private$id <- id - - private$session <- callr::r_session$new( - callr::r_session_options( - system_profile = TRUE, user_profile = TRUE, supervise = TRUE), - # skip waiting - wait = FALSE - ) - }, - start = function(target, args) { - if (private$is_running) { - # session must stop running before release - logger$error(private$pool_name, "session race condition: prev function call is running in session", private$id) - } - private$is_running <- TRUE - private$result <- NULL - - if (private$is_ready) { - private$call(target, args) - } else { - private$init_task <- list(target = target, args = args) - } - }, - # TRUE for running process, FALSE for compeletion - is_alive = function() { - if (!private$is_running) { - # FALSE for compeletion - return(FALSE) - } - - # more about read status code - # https://callr.r-lib.org/reference/r_session.html#method-read - data <- private$session$read() - if (!is.null(data)) { - # session is not ready - if (!private$is_ready) { - if (data$code == 201) { - logger$info(private$pool_name, "session ready", private$id, Sys.time()) - - private$is_ready <- TRUE - private$call(private$init_task$target, private$init_task$args) - private$init_task <- NULL - } else { - logger$error(private$pool_name, "session init error", private$id, data) - # restart on init error, task is perserved in init_task - self$restart() - } - # continue running - return(TRUE) - } - - # session is ready - if (data$code == 200) { - if (!is.null(data$error)) { - private$result <- data$error - } else { - private$result <- data$result - } - } else { - # error data code 301, 500, 501, 502 - logger$error(private$pool_name, "session read error", private$id, data) - # on read error, skip current task and restart - self$restart() - } - private$is_running <- FALSE - # FALSE for compeletion - return(FALSE) - } - - # TRUE for running process - return(private$is_running) - }, - get_result = function() { - private$result - }, - restart = function(should_release = FALSE) { - logger$info(private$pool_name, "session restart", private$id) - ret <- tryCatch({ - private$session$close(grace = 100) - }, error = function(e) e) - if (inherits(ret, "error")) { - logger$error(private$pool_name, "session kill error", ret) - } - - private$session <- callr::r_session$new( - callr::r_session_options(system_profile = TRUE, user_profile = TRUE), - wait = FALSE - ) - private$is_ready <- FALSE - - if (should_release) { - private$is_running <- FALSE - self$release() - } - }, - kill = function() { - logger$info(private$pool_name, "session kill", private$id) - self$restart(should_release = TRUE) - }, - # release current session from session pool - release = function() { - private$parent_pool$release(private$id) - } - ) -) - -#' Session Pool with Many R Sessions -#' -#' @examples -#' \dontrun{ -#' # create 3 cached r sessions in the pool -#' pool_size <- 3 -#' pool_name <- "common" -#' pool <- SessionPool$new(pool_size, pool_name) -#' -#' # check idle_size before acquiring session -#' n <- pool$get_idle_size() -#' -#' # if there are idle sessions, acquire session -#' session <- pool$acquire() -#' -#' # check session is not null and then use session -#' is.null(session) -#' -#' # use session -#' # please read R6 Class `Session` documentation -#' -#' } -#' @noRd -SessionPool <- R6::R6Class("SessionPool", - private = list( - pool_name = NULL, - idle_keys = NULL, - sessions = NULL, - pending_size = 0, - idle_size = 0, - size = 0 - ), - public = list( - initialize = function(size, name) { - private$pool_name <- name - private$sessions <- collections::ordered_dict() - private$idle_keys <- collections::queue() - - if (size > 0) { - private$size <- size - for (i in seq_len(size)) { - istr <- as.character(i) - private$sessions$set(istr, Session$new(istr, self, name)) - private$idle_keys$push(istr) - private$idle_size <- private$idle_size + 1 - } - } - }, - get_idle_size = function() { - private$idle_size - }, - acquire = function() { - if (private$idle_size > 0) { - private$idle_size <- private$idle_size - 1 - session_id <- private$idle_keys$pop() - if (!is.null(session_id)) { - # FIXME: remove debug - logger$info(private$pool_name, "session acquired session_id =", session_id, "remain pool size =", private$idle_size) - return(private$sessions$get(session_id)) - } - } - return(NULL) - }, - # called by child session - release = function(id) { - private$idle_keys$push(id) - private$idle_size <- private$idle_size + 1 - # FIXME: remove debug - logger$info(private$pool_name, "session released session_id =", id, "remain pool size =", private$idle_size) - } - ) -) diff --git a/R/task.R b/R/task.R index 4ad1897a..9b76edf5 100644 --- a/R/task.R +++ b/R/task.R @@ -1,11 +1,11 @@ Task <- R6::R6Class("Task", private = list( process = NULL, + session = NULL, target = NULL, args = NULL, callback = NULL, - error = NULL, - should_release = FALSE + error = NULL ), public = list( time = NULL, @@ -18,73 +18,165 @@ Task <- R6::R6Class("Task", self$time <- Sys.time() self$delay <- delay }, - start = function(session) { - # no session, use new session + start = function(session = NULL) { if (is.null(session)) { - private$should_release <- FALSE private$process <- callr::r_bg( private$target, private$args, system_profile = TRUE, user_profile = TRUE ) } else { - # acquired session, should release in the future - private$should_release <- TRUE - private$process <- session - private$process$start(private$target, private$args) + private$session <- session + private$session$call( + private$target, + private$args + ) } }, check = function() { + if (!is.null(private$session)) { + res <- private$session$read() + if (!is.null(res)) { + if (res$code == 200 && is.null(res$error)) { + if (!is.null(private$callback)) private$callback(res$result) + return(TRUE) + } else if (!is.null(res$code)) { + if (!is.null(private$error)) { + err <- res$error + if (is.null(err)) err <- simpleError(paste("Session error with code", res$code)) + private$error(err) + } + return(TRUE) + } + } + state <- private$session$get_state() + if (identical(state, "finished")) { + if (!is.null(private$error)) { + err <- simpleError("Session finished unexpectedly while task was running") + private$error(err) + } + return(TRUE) + } + return(FALSE) + } + if (is.null(private$process)) { FALSE } else if (private$process$is_alive()) { FALSE } else { - result <- NULL - # release session - if (private$should_release) { - result <- private$process$get_result() - private$process$release() - } else { - # r_bg$get_result() will throw - result <- tryCatch(private$process$get_result(), error = function(e) e) - } + # r_bg$get_result() will throw + result <- tryCatch(private$process$get_result(), error = function(e) e) - if (!is.null(private$callback)) { - if (inherits(result, "error")) { - if (!is.null(private$error)) { - private$error(result) - } - } else { - private$callback(result) + if (inherits(result, "error")) { + if (!is.null(private$error)) { + private$error(result) } + } else if (!is.null(private$callback)) { + private$callback(result) } TRUE } }, kill = function() { - private$process$kill() + if (!is.null(private$session)) { + if (!identical(Sys.getenv("R_COVR"), "true")) { + # Do not close the session, it is persistent and managed by TaskManager. + # Just try to interrupt the ongoing computation. + private$session$interrupt() + } + } else if (!is.null(private$process) && private$process$is_alive()) { + if (identical(Sys.getenv("R_COVR"), "true")) { + private$process$wait() + } else { + private$process$wait(1000) + private$process$kill() + } + } } ) ) TaskManager <- R6::R6Class("TaskManager", private = list( - cpus = NULL, pending_tasks = NULL, running_tasks = NULL, - session_pool = NULL, name = NULL, - use_session = FALSE + use_session = NULL, + sessions = NULL, + process_recent_first = NULL, + max_running_tasks = NULL, + session_idle_timeout = NULL, + find_or_create_session = function() { + if (!isTRUE(private$use_session)) { + return(NULL) + } + + for (s in private$sessions) { + state <- s$get_state() + if (state == "starting") { + res <- s$read() + if (!is.null(res) && res$code == 201) state <- s$get_state() + } + if (state == "idle") { + return(s) + } + } + + if (length(private$sessions) < private$max_running_tasks) { + session <- callr::r_session$new( + options = callr::r_session_options( + system_profile = TRUE, + user_profile = TRUE + ), + wait = TRUE + ) + private$sessions <- append(private$sessions, session) + return(session) + } + + NULL + }, + prune_sessions = function() { + for (i in rev(seq_along(private$sessions))) { + session <- private$sessions[[i]] + state <- session$get_state() + if (state == "finished") { + private$sessions[[i]] <- NULL + } else if (state == "idle") { + idle_start <- attr(session, "idle_start") + if (is.null(idle_start)) { + attr(session, "idle_start") <- Sys.time() + } else if (as.numeric(difftime(Sys.time(), idle_start, units = "secs")) > private$session_idle_timeout) { + session$close() + private$sessions[[i]] <- NULL + } + } else { + attr(session, "idle_start") <- NULL + } + } + } ), public = list( - initialize = function(name, session_pool = NULL) { - private$cpus <- parallel::detectCores() + initialize = function(name, + use_session = FALSE, + process_recent_first = FALSE, + cpu_load = 0.5, + max_running_tasks = 8, + session_idle_timeout = 60) { private$pending_tasks <- collections::ordered_dict() private$running_tasks <- collections::ordered_dict() - private$session_pool <- session_pool - private$use_session <- !is.null(session_pool) private$name <- name + private$use_session <- use_session + private$process_recent_first <- process_recent_first + + private$session_idle_timeout <- session_idle_timeout + cpus <- min(parallel::detectCores()) + max_running_tasks <- min(cpus, max_running_tasks) + private$max_running_tasks <- max(min(max_running_tasks, round(cpus * cpu_load)), 1) + if (use_session) { + private$sessions <- list() + } }, add_task = function(id, task) { if (is.null(task)) { @@ -92,47 +184,37 @@ TaskManager <- R6::R6Class("TaskManager", } private$pending_tasks$set(id, task) }, - run_tasks = function(cpu_load = 0.5) { - n <- 0 - if (private$use_session) { - n <- private$session_pool$get_idle_size() - } else { - # use r_bg - # Performance: Increase CPU load factor for better resource utilization - # Old: cpu_load = 0.5 was conservative - # New: Allow higher utilization for better throughput - effective_cpu_load <- if (private$name == "parse") 0.8 else cpu_load - n <- max(max(private$cpus * effective_cpu_load, 1) - private$running_tasks$size(), 0) - } + run_tasks = function() { + n <- max(private$max_running_tasks - private$running_tasks$size(), 0) - ids <- private$pending_tasks$keys() + pending_ids <- private$pending_tasks$keys() # Performance: Prioritize newer tasks over older for better responsiveness # For parse tasks, process most recent documents first - if (length(ids) > n && private$name == "parse") { + if (length(pending_ids) > n && isTRUE(private$process_recent_first)) { # Take the most recent n tasks - ids <- tail(ids, n) - } else if (length(ids) > n) { - ids <- ids[seq_len(n)] + pending_ids <- tail(pending_ids, n) + } else if (length(pending_ids) > n) { + pending_ids <- pending_ids[seq_len(n)] } - for (id in ids) { + + for (id in pending_ids) { task <- private$pending_tasks$get(id) if (Sys.time() - task$time >= task$delay) { session <- NULL - if (private$use_session) { - session <- private$session_pool$acquire() - if (is.null(session)) { - # get invalid session + + if (isTRUE(private$use_session)) { + session <- private$find_or_create_session() + if (is.null(session) || session$get_state() == "starting") { next } } if (private$running_tasks$has(id)) { - task <- private$running_tasks$pop(id) - task$kill() + old_task <- private$running_tasks$pop(id) + old_task$kill() } task <- private$pending_tasks$pop(id) private$running_tasks$set(id, task) - # maybe acquired session, will need to be released on check task$start(session) } } @@ -140,7 +222,6 @@ TaskManager <- R6::R6Class("TaskManager", check_tasks = function() { running_tasks <- private$running_tasks keys <- private$running_tasks$keys() - pending_tasks <- private$pending_tasks for (key in keys) { task <- running_tasks$get(key) if (task$check()) { @@ -149,6 +230,22 @@ TaskManager <- R6::R6Class("TaskManager", running_tasks$remove(key) } } + if (isTRUE(private$use_session)) { + private$prune_sessions() + } + }, + stop = function() { + for (id in private$running_tasks$keys()) { + task <- private$running_tasks$get(id) + task$kill() + } + if (private$use_session) { + for (session in private$sessions) { + if (!identical(Sys.getenv("R_COVR"), "true")) { + session$close() + } + } + } } ) ) diff --git a/tests/testthat/helper-utils.R b/tests/testthat/helper-utils.R index 60ff045c..55e400c3 100644 --- a/tests/testthat/helper-utils.R +++ b/tests/testthat/helper-utils.R @@ -49,17 +49,14 @@ language_client <- function(working_dir = getwd(), diagnostics = FALSE, capabili client %>% notify( "workspace/didChangeConfiguration", list(settings = list(diagnostics = diagnostics))) withr::defer_parent({ - # it is sometimes necessary to shutdown the server probably - # we skip this for other times for speed - if (Sys.getenv("R_LANGSVR_TEST_FAST", "YES") == "NO") { - client %>% respond("shutdown", NULL, retry = FALSE) - client$process$wait(10 * 1000) # 10 sec - if (client$process$is_alive()) { - cat("server did not shutdown peacefully\n") - client$process$kill_tree() + client %>% respond("shutdown", NULL, retry = FALSE) + if (client$process$is_alive()) { + if (identical(Sys.getenv("R_COVR"), "true")) { + client$process$wait() + } else { + client$process$wait(1000) + client$process$kill() } - } else { - client$process$kill_tree() } }) client diff --git a/tests/testthat/test-task.R b/tests/testthat/test-task.R new file mode 100644 index 00000000..f4083e01 --- /dev/null +++ b/tests/testthat/test-task.R @@ -0,0 +1,143 @@ +for (covr in c("false", "true")) { + withr::with_envvar(list(R_COVR = covr), { + test_that(paste("Task creation works", covr), { + task <- create_task(function() 1 + 1, list()) + expect_s3_class(task, "Task") + }) + + test_that(paste("TaskManager runs task without session", covr), { + tm <- TaskManager$new("test1", use_session = FALSE) + + result <- NULL + cb <- function(res) { + result <<- res + } + + task <- create_task(function(x) x + 1, list(x = 1), callback = cb) + tm$add_task("t1", task) + + tm$run_tasks() + + # Wait for the task to finish + for (i in 1:10) { + Sys.sleep(0.5) + tm$check_tasks() + if (!is.null(result)) break + } + + expect_equal(result, 2) + tm$stop() + }) + + test_that(paste("TaskManager runs task with session", covr), { + tm <- TaskManager$new("test2", use_session = TRUE) + + result <- NULL + cb <- function(res) { + result <<- res + } + + task <- create_task(function(x) x + 1, list(x = 1), callback = cb) + tm$add_task("t1", task) + + # Sessions take time to start, so we might need multiple iterations + for (i in 1:15) { + tm$run_tasks() + Sys.sleep(0.5) + tm$check_tasks() + if (!is.null(result)) break + } + + expect_equal(result, 2) + tm$stop() + }) + + test_that(paste("Task handles errors with session", covr), { + tm <- TaskManager$new("test3", use_session = TRUE) + + err_res <- NULL + err_cb <- function(e) { + err_res <<- e + } + + task <- create_task(function() stop("test error"), list(), error = err_cb) + tm$add_task("t2", task) + + for (i in 1:15) { + tm$run_tasks() + Sys.sleep(0.5) + tm$check_tasks() + if (!is.null(err_res)) break + } + + expect_true(inherits(err_res, "error")) + expect_match(err_res$parent$message, "test error") + tm$stop() + }) + + test_that(paste("Task handles errors without session", covr), { + tm <- TaskManager$new("test4", use_session = FALSE) + + err_res <- NULL + err_cb <- function(e) { + err_res <<- e + } + + task <- create_task(function() stop("test error"), list(), error = err_cb) + tm$add_task("t2", task) + + tm$run_tasks() + + for (i in 1:10) { + Sys.sleep(0.5) + tm$check_tasks() + if (!is.null(err_res)) break + } + + expect_true(inherits(err_res, "error")) + expect_match(err_res$parent$message, "test error") + tm$stop() + }) + }) +} + +test_that("TaskManager prunes idle sessions", { + skip_on_cran() + + # Initialize TaskManager with a short timeout + tm <- TaskManager$new("test", use_session = TRUE, session_idle_timeout = 2) + + # Create a dummy task + task <- create_task(function() 1, list()) + tm$add_task("1", task) + + # Run the task + tm$run_tasks() + + # Wait for task completion + start_time <- Sys.time() + while (length(tm$.__enclos_env__$private$running_tasks$keys()) > 0 || + length(tm$.__enclos_env__$private$pending_tasks$keys()) > 0) { + tm$check_tasks() + tm$run_tasks() + if (Sys.time() - start_time > 10) stop("Task timed out") + Sys.sleep(0.1) + } + + # Verify session is idle + sessions <- tm$.__enclos_env__$private$sessions + expect_length(sessions, 1) + expect_equal(sessions[[1]]$get_state(), "idle") + + # Wait for timeout + Sys.sleep(5) + + # Trigger pruning + tm$check_tasks() + + # Verify session is removed + sessions <- tm$.__enclos_env__$private$sessions + expect_length(sessions, 0) + + tm$stop() +})