diff --git a/src/production/connection_optimized.rs b/src/production/connection_optimized.rs index 6c49b6f..cb969dd 100644 --- a/src/production/connection_optimized.rs +++ b/src/production/connection_optimized.rs @@ -84,6 +84,9 @@ pub struct OptimizedConnectionHandler { transaction_errors: bool, /// Watched keys with their values at WATCH time (for optimistic locking) watched_keys: Vec<(String, RespValue)>, + /// Timestamp for the current read batch — amortizes clock_gettime syscalls + /// across all commands in a pipeline batch instead of 2 syscalls per command. + batch_start: Instant, } impl OptimizedConnectionHandler @@ -168,6 +171,7 @@ where transaction_queue: Vec::new(), transaction_errors: false, watched_keys: Vec::new(), + batch_start: Instant::now(), } } @@ -191,6 +195,8 @@ where break; } Ok(n) => { + self.batch_start = Instant::now(); + if self.buffer.len() + n > self.config.max_buffer_size { error!( "Buffer overflow from {}, closing connection", @@ -220,9 +226,8 @@ where if get_count >= batch_threshold { // Batch execute multiple GETs concurrently - let start = Instant::now(); let results = self.state.fast_batch_get_pipeline(get_keys).await; - let duration_ms = start.elapsed().as_secs_f64() * 1000.0; + let duration_ms = self.batch_start.elapsed().as_secs_f64() * 1000.0; for response in &results { let success = !matches!(response, RespValue::Error(_)); @@ -242,10 +247,9 @@ where if set_count >= batch_threshold { // Batch execute multiple SETs concurrently - let start = Instant::now(); let results = self.state.fast_batch_set_pipeline(set_pairs).await; - let duration_ms = start.elapsed().as_secs_f64() * 1000.0; + let duration_ms = self.batch_start.elapsed().as_secs_f64() * 1000.0; for response in &results { let success = !matches!(response, RespValue::Error(_)); @@ -337,7 +341,6 @@ where Ok(Some(resp_value)) => match Command::from_resp_zero_copy(&resp_value) { Ok(cmd) => { let cmd_name = cmd.name(); - let start = Instant::now(); // Handle connection-level transaction state let response = if self.in_transaction { @@ -535,7 +538,7 @@ where } }; - let duration_ms = start.elapsed().as_secs_f64() * 1000.0; + let duration_ms = self.batch_start.elapsed().as_secs_f64() * 1000.0; let success = !matches!(&response, RespValue::Error(_)); self.metrics.record_command(cmd_name, duration_ms, success); @@ -1156,12 +1159,10 @@ where break; // Need more data } - // Extract key - let key = bytes::Bytes::copy_from_slice(&buf[key_start..key_start + key_len]); + // Zero-copy: split consumed bytes from buffer, freeze to Bytes, then slice the key + let consumed = self.buffer.split_to(total_needed).freeze(); + let key = consumed.slice(key_start..key_start + key_len); keys.push(key); - - // Consume this GET from buffer - let _ = self.buffer.split_to(total_needed); } let count = keys.len(); @@ -1242,13 +1243,11 @@ where break; // Need more data } - // Extract key and value - let key = bytes::Bytes::copy_from_slice(&buf[key_start..key_end]); - let value = bytes::Bytes::copy_from_slice(&buf[val_start..val_start + val_len]); + // Zero-copy: split consumed bytes, freeze, then slice key and value + let consumed = self.buffer.split_to(total_needed).freeze(); + let key = consumed.slice(key_start..key_end); + let value = consumed.slice(val_start..val_start + val_len); pairs.push((key, value)); - - // Consume this SET from buffer - let _ = self.buffer.split_to(total_needed); } let count = pairs.len(); @@ -1320,16 +1319,13 @@ where return FastPathResult::NeedMoreData; } - // Extract key as bytes::Bytes (zero-copy from buffer) - let key = bytes::Bytes::copy_from_slice(&buf[key_start..key_start + key_len]); - - // Consume the parsed bytes from buffer - let _ = self.buffer.split_to(total_needed); + // Zero-copy: split consumed bytes, freeze, then slice key + let consumed = self.buffer.split_to(total_needed).freeze(); + let key = consumed.slice(key_start..key_start + key_len); // Execute fast GET using pooled response slot (avoids oneshot allocation) - let start = Instant::now(); let response = self.state.pooled_fast_get(key).await; - let duration_ms = start.elapsed().as_secs_f64() * 1000.0; + let duration_ms = self.batch_start.elapsed().as_secs_f64() * 1000.0; let success = !matches!(&response, RespValue::Error(_)); self.metrics.record_command("GET", duration_ms, success); @@ -1398,17 +1394,14 @@ where return FastPathResult::NeedMoreData; } - // Extract key and value as bytes::Bytes - let key = bytes::Bytes::copy_from_slice(&buf[key_start..key_end]); - let value = bytes::Bytes::copy_from_slice(&buf[val_start..val_start + val_len]); - - // Consume the parsed bytes - let _ = self.buffer.split_to(total_needed); + // Zero-copy: split consumed bytes, freeze, then slice key and value + let consumed = self.buffer.split_to(total_needed).freeze(); + let key = consumed.slice(key_start..key_end); + let value = consumed.slice(val_start..val_start + val_len); // Execute fast SET using pooled response slot (avoids oneshot allocation) - let start = Instant::now(); let response = self.state.pooled_fast_set(key, value).await; - let duration_ms = start.elapsed().as_secs_f64() * 1000.0; + let duration_ms = self.batch_start.elapsed().as_secs_f64() * 1000.0; let success = !matches!(&response, RespValue::Error(_)); self.metrics.record_command("SET", duration_ms, success); diff --git a/src/production/perf_config.rs b/src/production/perf_config.rs index 6023997..1e4746e 100644 --- a/src/production/perf_config.rs +++ b/src/production/perf_config.rs @@ -25,6 +25,10 @@ pub struct PerformanceConfig { /// Batching configuration #[serde(default)] pub batching: BatchingConfig, + + /// Connection pool configuration + #[serde(default)] + pub connection_pool: ConnectionPoolConfig, } /// Response pool parameters for reducing channel allocation overhead @@ -63,6 +67,18 @@ pub struct BatchingConfig { pub batch_threshold: usize, } +/// Connection pool parameters +#[derive(Debug, Clone, Deserialize)] +pub struct ConnectionPoolConfig { + /// Maximum concurrent connections (default: 10000) + #[serde(default = "default_max_connections")] + pub max_connections: usize, + + /// Number of pre-allocated I/O buffers in the pool (default: 64) + #[serde(default = "default_buffer_pool_size")] + pub buffer_pool_size: usize, +} + // Default value functions for serde fn default_num_shards() -> usize { 16 @@ -85,6 +101,12 @@ fn default_min_pipeline_buffer() -> usize { fn default_batch_threshold() -> usize { 2 } +fn default_max_connections() -> usize { + 10000 +} +fn default_buffer_pool_size() -> usize { + 64 +} impl Default for PerformanceConfig { fn default() -> Self { @@ -93,6 +115,16 @@ impl Default for PerformanceConfig { response_pool: ResponsePoolConfig::default(), buffers: BufferConfig::default(), batching: BatchingConfig::default(), + connection_pool: ConnectionPoolConfig::default(), + } + } +} + +impl Default for ConnectionPoolConfig { + fn default() -> Self { + Self { + max_connections: default_max_connections(), + buffer_pool_size: default_buffer_pool_size(), } } } diff --git a/src/production/server_optimized.rs b/src/production/server_optimized.rs index d446b54..ed7977f 100644 --- a/src/production/server_optimized.rs +++ b/src/production/server_optimized.rs @@ -31,12 +31,14 @@ impl OptimizedRedisServer { } info!( - "Performance config: shards={}, pool_capacity={}, pool_prewarm={}, read_buffer={}, min_pipeline={}", + "Performance config: shards={}, pool_capacity={}, pool_prewarm={}, read_buffer={}, min_pipeline={}, max_conns={}, buffer_pool={}", perf_config.num_shards, perf_config.response_pool.capacity, perf_config.response_pool.prewarm, perf_config.buffers.read_size, perf_config.batching.min_pipeline_buffer, + perf_config.connection_pool.max_connections, + perf_config.connection_pool.buffer_pool_size, ); // Load security configuration @@ -78,7 +80,10 @@ impl OptimizedRedisServer { let acl_manager = Arc::new(RwLock::new(acl_manager)); let state = ShardedActorState::with_perf_config(&perf_config); - let connection_pool = Arc::new(ConnectionPool::new(10000, 512)); + let connection_pool = Arc::new(ConnectionPool::new( + perf_config.connection_pool.max_connections, + perf_config.connection_pool.buffer_pool_size, + )); // Create connection config from performance config let conn_config = diff --git a/src/production/sharded_actor.rs b/src/production/sharded_actor.rs index 7612db2..72627c0 100644 --- a/src/production/sharded_actor.rs +++ b/src/production/sharded_actor.rs @@ -183,78 +183,90 @@ impl ShardActor { async fn run(mut self) { while let Some(msg) = self.rx.recv().await { - match msg { - ShardMessage::Command { - cmd, - virtual_time, - response_tx, - } => { - self.executor.set_time(virtual_time); - let response = self.executor.execute(&cmd); - let _ = response_tx.send(response); - } - ShardMessage::BatchCommand { cmd, virtual_time } => { - // Fire-and-forget: execute without sending response - self.executor.set_time(virtual_time); - let _ = self.executor.execute(&cmd); - } - ShardMessage::EvictExpired { - virtual_time, - response_tx, - } => { - let evicted = self.executor.evict_expired_direct(virtual_time); - let _ = response_tx.send(evicted); - } - ShardMessage::FastGet { key, response_tx } => { - // Fast path: direct GET without Command enum overhead - let key_str = unsafe { std::str::from_utf8_unchecked(&key) }; - let response = self.executor.get_direct(key_str); - let _ = response_tx.send(response); - } - ShardMessage::FastSet { - key, - value, - response_tx, - } => { - // Fast path: direct SET without Command enum overhead - let key_str = unsafe { std::str::from_utf8_unchecked(&key) }; - let response = self.executor.set_direct(key_str, &value); - let _ = response_tx.send(response); - } - ShardMessage::FastBatchGet { keys, response_tx } => { - // Batch GET: process multiple keys in single message - let mut results = Vec::with_capacity(keys.len()); - for key in keys { - let key_str = unsafe { std::str::from_utf8_unchecked(&key) }; - results.push(self.executor.get_direct(key_str)); - } - let _ = response_tx.send(results); - } - ShardMessage::FastBatchSet { pairs, response_tx } => { - // Batch SET: process multiple key-value pairs in single message - let mut results = Vec::with_capacity(pairs.len()); - for (key, value) in pairs { - let key_str = unsafe { std::str::from_utf8_unchecked(&key) }; - results.push(self.executor.set_direct(key_str, &value)); - } - let _ = response_tx.send(results); - } - ShardMessage::PooledFastGet { key, response_slot } => { - // Pooled fast GET: uses response slot instead of oneshot + self.process_message(msg); + // FoundationDB actor loop pattern: drain all pending messages without + // yielding back to the tokio scheduler. This reduces context switches + // (24% of CPU in profiling) by batching work within a single scheduler turn. + while let Ok(msg) = self.rx.try_recv() { + self.process_message(msg); + } + } + } + + /// Process a single shard message. Extracted to support the try_recv drain pattern. + #[inline] + fn process_message(&mut self, msg: ShardMessage) { + match msg { + ShardMessage::Command { + cmd, + virtual_time, + response_tx, + } => { + self.executor.set_time(virtual_time); + let response = self.executor.execute(&cmd); + let _ = response_tx.send(response); + } + ShardMessage::BatchCommand { cmd, virtual_time } => { + // Fire-and-forget: execute without sending response + self.executor.set_time(virtual_time); + let _ = self.executor.execute(&cmd); + } + ShardMessage::EvictExpired { + virtual_time, + response_tx, + } => { + let evicted = self.executor.evict_expired_direct(virtual_time); + let _ = response_tx.send(evicted); + } + ShardMessage::FastGet { key, response_tx } => { + // Fast path: direct GET without Command enum overhead + let key_str = unsafe { std::str::from_utf8_unchecked(&key) }; + let response = self.executor.get_direct(key_str); + let _ = response_tx.send(response); + } + ShardMessage::FastSet { + key, + value, + response_tx, + } => { + // Fast path: direct SET without Command enum overhead + let key_str = unsafe { std::str::from_utf8_unchecked(&key) }; + let response = self.executor.set_direct(key_str, &value); + let _ = response_tx.send(response); + } + ShardMessage::FastBatchGet { keys, response_tx } => { + // Batch GET: process multiple keys in single message + let mut results = Vec::with_capacity(keys.len()); + for key in keys { let key_str = unsafe { std::str::from_utf8_unchecked(&key) }; - let response = self.executor.get_direct(key_str); - response_slot.send(response); + results.push(self.executor.get_direct(key_str)); } - ShardMessage::PooledFastSet { - key, - value, - response_slot, - } => { - // Pooled fast SET: uses response slot instead of oneshot + let _ = response_tx.send(results); + } + ShardMessage::FastBatchSet { pairs, response_tx } => { + // Batch SET: process multiple key-value pairs in single message + let mut results = Vec::with_capacity(pairs.len()); + for (key, value) in pairs { let key_str = unsafe { std::str::from_utf8_unchecked(&key) }; - let response = self.executor.set_direct(key_str, &value); - response_slot.send(response); + results.push(self.executor.set_direct(key_str, &value)); } + let _ = response_tx.send(results); + } + ShardMessage::PooledFastGet { key, response_slot } => { + // Pooled fast GET: uses response slot instead of oneshot + let key_str = unsafe { std::str::from_utf8_unchecked(&key) }; + let response = self.executor.get_direct(key_str); + response_slot.send(response); + } + ShardMessage::PooledFastSet { + key, + value, + response_slot, + } => { + // Pooled fast SET: uses response slot instead of oneshot + let key_str = unsafe { std::str::from_utf8_unchecked(&key) }; + let response = self.executor.set_direct(key_str, &value); + response_slot.send(response); } } } diff --git a/src/redis/executor/bitmap_ops.rs b/src/redis/executor/bitmap_ops.rs index d80318d..ad0ea13 100644 --- a/src/redis/executor/bitmap_ops.rs +++ b/src/redis/executor/bitmap_ops.rs @@ -47,8 +47,6 @@ impl CommandExecutor { if need_create { let sds = SDS::new(vec![0u8; required_len]); self.data.insert(key.to_string(), Value::String(sds)); - self.access_times - .insert(key.to_string(), self.current_time); } // Get mutable reference to the string diff --git a/src/redis/executor/hash_ops.rs b/src/redis/executor/hash_ops.rs index 043db18..22d5bcc 100644 --- a/src/redis/executor/hash_ops.rs +++ b/src/redis/executor/hash_ops.rs @@ -16,7 +16,6 @@ impl CommandExecutor { .data .entry(key.to_string()) .or_insert_with(|| Value::Hash(RedisHash::new())); - self.access_times.insert(key.to_string(), self.current_time); match hash { Value::Hash(h) => { let mut new_fields = 0i64; @@ -92,7 +91,6 @@ impl CommandExecutor { if matches!(self.data.get(key), Some(Value::Hash(h)) if h.is_empty()) { self.data.remove(key); self.expirations.remove(key); - self.access_times.remove(key); } result } @@ -205,7 +203,6 @@ impl CommandExecutor { if self.is_expired(key) { self.data.remove(key); self.expirations.remove(key); - self.access_times.remove(key); } // Check if key exists and is wrong type before inserting @@ -221,7 +218,6 @@ impl CommandExecutor { .data .entry(key.to_string()) .or_insert_with(|| Value::Hash(RedisHash::new())); - self.access_times.insert(key.to_string(), self.current_time); match hash { Value::Hash(h) => { diff --git a/src/redis/executor/key_ops.rs b/src/redis/executor/key_ops.rs index bd5bb40..b1b5d73 100644 --- a/src/redis/executor/key_ops.rs +++ b/src/redis/executor/key_ops.rs @@ -5,10 +5,10 @@ //! //! # TigerStyle Invariants //! -//! - DEL removes keys from data, expirations, AND access_times +//! - DEL removes keys from data and expirations //! - EXISTS count is always in range [0, keys.len()] //! - TTL/PTTL returns -2 (not exists), -1 (no expiry), or >= 0 (remaining) -//! - FLUSH clears all three maps completely +//! - FLUSH clears data and expirations completely use super::CommandExecutor; use crate::redis::data::Value; @@ -27,7 +27,6 @@ impl CommandExecutor { count += 1; } self.expirations.remove(key); - self.access_times.remove(key); } // TigerStyle: Postconditions @@ -97,7 +96,6 @@ impl CommandExecutor { pub(super) fn execute_flush(&mut self) -> RespValue { self.data.clear(); self.expirations.clear(); - self.access_times.clear(); // TigerStyle: Postconditions - all state must be cleared debug_assert!( @@ -108,10 +106,6 @@ impl CommandExecutor { self.expirations.is_empty(), "Postcondition violated: expirations must be empty after FLUSH" ); - debug_assert!( - self.access_times.is_empty(), - "Postcondition violated: access_times must be empty after FLUSH" - ); RespValue::simple("OK") } @@ -145,7 +139,6 @@ impl CommandExecutor { // Negative/zero TTL means delete immediately (skip flag checks for delete) self.data.remove(key); self.expirations.remove(key); - self.access_times.remove(key); return RespValue::Integer(1); } @@ -215,7 +208,6 @@ impl CommandExecutor { if milliseconds <= 0 { self.data.remove(key); self.expirations.remove(key); - self.access_times.remove(key); return RespValue::Integer(1); } @@ -294,12 +286,10 @@ impl CommandExecutor { if simulation_relative_ms <= 0 { self.data.remove(key); self.expirations.remove(key); - self.access_times.remove(key); RespValue::Integer(1) } else if (simulation_relative_ms as u64) <= self.current_time.as_millis() { self.data.remove(key); self.expirations.remove(key); - self.access_times.remove(key); RespValue::Integer(1) } else { let expiration = VirtualTime::from_millis(simulation_relative_ms as u64); @@ -319,12 +309,10 @@ impl CommandExecutor { if simulation_relative_millis <= 0 { self.data.remove(key); self.expirations.remove(key); - self.access_times.remove(key); RespValue::Integer(1) } else if (simulation_relative_millis as u64) <= self.current_time.as_millis() { self.data.remove(key); self.expirations.remove(key); - self.access_times.remove(key); RespValue::Integer(1) } else { let expiration = VirtualTime::from_millis(simulation_relative_millis as u64); diff --git a/src/redis/executor/list_ops.rs b/src/redis/executor/list_ops.rs index 8743622..8e2a5ca 100644 --- a/src/redis/executor/list_ops.rs +++ b/src/redis/executor/list_ops.rs @@ -25,7 +25,6 @@ impl CommandExecutor { .data .entry(key.to_string()) .or_insert_with(|| Value::List(RedisList::new())); - self.access_times.insert(key.to_string(), self.current_time); match list { Value::List(l) => { #[cfg(debug_assertions)] @@ -61,7 +60,6 @@ impl CommandExecutor { .data .entry(key.to_string()) .or_insert_with(|| Value::List(RedisList::new())); - self.access_times.insert(key.to_string(), self.current_time); match list { Value::List(l) => { #[cfg(debug_assertions)] @@ -103,7 +101,6 @@ impl CommandExecutor { if matches!(self.data.get(key), Some(Value::List(l)) if l.is_empty()) { self.data.remove(key); self.expirations.remove(key); - self.access_times.remove(key); } #[cfg(debug_assertions)] if matches!(self.data.get(key), Some(Value::List(l)) if l.is_empty()) { @@ -127,7 +124,6 @@ impl CommandExecutor { if matches!(self.data.get(key), Some(Value::List(l)) if l.is_empty()) { self.data.remove(key); self.expirations.remove(key); - self.access_times.remove(key); } #[cfg(debug_assertions)] if matches!(self.data.get(key), Some(Value::List(l)) if l.is_empty()) { @@ -216,7 +212,6 @@ impl CommandExecutor { match self.data.get_mut(key) { Some(Value::List(list)) => match list.set(index, value.clone()) { Ok(()) => { - self.access_times.insert(key.to_string(), self.current_time); let resp = RespValue::simple("OK"); debug_assert!(matches!(&resp, RespValue::SimpleString(s) if s == "OK"), "Postcondition: LSET success must return OK"); resp @@ -238,11 +233,9 @@ impl CommandExecutor { match self.data.get_mut(key) { Some(Value::List(list)) => { list.trim(start, stop); - self.access_times.insert(key.to_string(), self.current_time); // Remove key if list becomes empty if list.is_empty() { self.data.remove(key); - self.access_times.remove(key); self.expirations.remove(key); } #[cfg(debug_assertions)] @@ -280,7 +273,6 @@ impl CommandExecutor { if let Some(Value::List(list)) = self.data.get(source) { if list.is_empty() { self.data.remove(source); - self.access_times.remove(source); self.expirations.remove(source); } } @@ -297,8 +289,6 @@ impl CommandExecutor { match dest_list { Value::List(list) => { list.lpush(value.clone()); - self.access_times - .insert(dest.to_string(), self.current_time); #[cfg(debug_assertions)] debug_assert!(self.data.contains_key(dest), "Postcondition: RPOPLPUSH dest must exist after push"); RespValue::BulkString(Some(value.as_bytes().to_vec())) @@ -346,7 +336,6 @@ impl CommandExecutor { if let Some(Value::List(list)) = self.data.get(source) { if list.is_empty() { self.data.remove(source); - self.access_times.remove(source); self.expirations.remove(source); } } @@ -367,8 +356,6 @@ impl CommandExecutor { } else { list.rpush(value.clone()); } - self.access_times - .insert(dest.to_string(), self.current_time); #[cfg(debug_assertions)] debug_assert!(self.data.contains_key(dest), "Postcondition: LMOVE dest must exist after push"); RespValue::BulkString(Some(value.as_bytes().to_vec())) diff --git a/src/redis/executor/mod.rs b/src/redis/executor/mod.rs index 6f2238a..869fd2b 100644 --- a/src/redis/executor/mod.rs +++ b/src/redis/executor/mod.rs @@ -45,7 +45,6 @@ pub struct CommandExecutor { pub(crate) data: AHashMap, pub(crate) expirations: AHashMap, pub(crate) current_time: VirtualTime, - pub(crate) access_times: AHashMap, #[allow(dead_code)] pub(crate) key_count: usize, pub(crate) commands_processed: usize, @@ -70,7 +69,6 @@ impl CommandExecutor { data: AHashMap::new(), expirations: AHashMap::new(), current_time: VirtualTime::from_millis(0), - access_times: AHashMap::new(), key_count: 0, commands_processed: 0, simulation_start_epoch: 0, @@ -90,7 +88,6 @@ impl CommandExecutor { data: AHashMap::new(), expirations: AHashMap::new(), current_time: VirtualTime::from_millis(0), - access_times: AHashMap::new(), key_count: 0, commands_processed: 0, simulation_start_epoch: 0, @@ -167,7 +164,6 @@ impl CommandExecutor { self.data .insert(key_owned.clone(), Value::String(SDS::new(value.to_vec()))); self.expirations.remove(key); - self.access_times.insert(key_owned, self.current_time); } #[cfg(not(feature = "opt-single-key-alloc"))] @@ -175,7 +171,6 @@ impl CommandExecutor { self.data .insert(key.to_string(), Value::String(SDS::new(value.to_vec()))); self.expirations.remove(key); - self.access_times.insert(key.to_string(), self.current_time); } #[cfg(debug_assertions)] @@ -196,18 +191,20 @@ impl CommandExecutor { self.current_time = current_time; - let expired_keys: Vec = self - .expirations - .iter() - .filter(|(_, &exp_time)| exp_time <= self.current_time) - .map(|(k, _)| k.clone()) - .collect(); + // Single-pass: retain unexpired keys, collect expired ones for data removal + let mut expired_keys = Vec::new(); + self.expirations.retain(|k, &mut exp_time| { + if exp_time <= self.current_time { + expired_keys.push(k.clone()); + false + } else { + true + } + }); let count = expired_keys.len(); - for key in expired_keys { - self.data.remove(&key); - self.expirations.remove(&key); - self.access_times.remove(&key); + for key in &expired_keys { + self.data.remove(key); } #[cfg(debug_assertions)] @@ -228,17 +225,18 @@ impl CommandExecutor { } pub(crate) fn evict_expired_keys(&mut self) { - let expired_keys: Vec = self - .expirations - .iter() - .filter(|(_, &exp_time)| exp_time <= self.current_time) - .map(|(k, _)| k.clone()) - .collect(); - - for key in expired_keys { - self.data.remove(&key); - self.expirations.remove(&key); - self.access_times.remove(&key); + let mut expired_keys = Vec::new(); + self.expirations.retain(|k, &mut exp_time| { + if exp_time <= self.current_time { + expired_keys.push(k.clone()); + false + } else { + true + } + }); + + for key in &expired_keys { + self.data.remove(key); } } @@ -246,10 +244,8 @@ impl CommandExecutor { if self.is_expired(key) { self.data.remove(key); self.expirations.remove(key); - self.access_times.remove(key); None } else { - self.access_times.insert(key.to_string(), self.current_time); self.data.get(key) } } @@ -258,10 +254,8 @@ impl CommandExecutor { if self.is_expired(key) { self.data.remove(key); self.expirations.remove(key); - self.access_times.remove(key); None } else { - self.access_times.insert(key.to_string(), self.current_time); self.data.get_mut(key) } } @@ -721,8 +715,6 @@ impl CommandExecutor { } else { self.expirations.remove(dst); } - self.access_times.remove(src); - self.access_times.insert(dst.clone(), self.current_time); #[cfg(debug_assertions)] { debug_assert!(self.data.contains_key(dst.as_str()), "Postcondition: RENAME dst must exist"); @@ -745,8 +737,6 @@ impl CommandExecutor { } else { self.expirations.remove(dst); } - self.access_times.remove(src); - self.access_times.insert(dst.clone(), self.current_time); #[cfg(debug_assertions)] { debug_assert!(self.data.contains_key(dst.as_str()), "Postcondition: RENAMENX dst must exist"); diff --git a/src/redis/executor/set_ops.rs b/src/redis/executor/set_ops.rs index 9981eee..98d9609 100644 --- a/src/redis/executor/set_ops.rs +++ b/src/redis/executor/set_ops.rs @@ -16,7 +16,6 @@ impl CommandExecutor { .data .entry(key.to_string()) .or_insert_with(|| Value::Set(RedisSet::new())); - self.access_times.insert(key.to_string(), self.current_time); match set { Value::Set(s) => { let mut added = 0; @@ -78,7 +77,6 @@ impl CommandExecutor { if matches!(self.data.get(key), Some(Value::Set(s)) if s.is_empty()) { self.data.remove(key); self.expirations.remove(key); - self.access_times.remove(key); } result } @@ -166,7 +164,6 @@ impl CommandExecutor { if matches!(self.data.get(key), Some(Value::Set(s)) if s.is_empty()) { self.data.remove(key); self.expirations.remove(key); - self.access_times.remove(key); } #[cfg(debug_assertions)] if matches!(self.data.get(key), Some(Value::Set(s)) if s.is_empty()) { diff --git a/src/redis/executor/sorted_set_ops.rs b/src/redis/executor/sorted_set_ops.rs index 39fcb44..a8f6dec 100644 --- a/src/redis/executor/sorted_set_ops.rs +++ b/src/redis/executor/sorted_set_ops.rs @@ -25,7 +25,6 @@ impl CommandExecutor { .data .entry(key.to_string()) .or_insert_with(|| Value::SortedSet(RedisSortedSet::new())); - self.access_times.insert(key.to_string(), self.current_time); match zset { Value::SortedSet(zs) => { let mut added = 0i64; @@ -141,7 +140,6 @@ impl CommandExecutor { if matches!(self.data.get(key), Some(Value::SortedSet(zs)) if zs.len() == 0) { self.data.remove(key); self.expirations.remove(key); - self.access_times.remove(key); } result } diff --git a/src/redis/executor/string_ops.rs b/src/redis/executor/string_ops.rs index edbdfc8..84baf03 100644 --- a/src/redis/executor/string_ops.rs +++ b/src/redis/executor/string_ops.rs @@ -27,7 +27,6 @@ impl CommandExecutor { // Key doesn't exist - insert and return 1 self.data .insert(key.to_string(), Value::String(value.clone())); - self.access_times.insert(key.to_string(), self.current_time); self.expirations.remove(key); #[cfg(debug_assertions)] debug_assert!( @@ -108,7 +107,6 @@ impl CommandExecutor { // Set the value self.data .insert(key.to_string(), Value::String(value.clone())); - self.access_times.insert(key.to_string(), self.current_time); // Handle expiration if let Some(seconds) = ex { @@ -126,7 +124,6 @@ impl CommandExecutor { if simulation_relative_ms <= 0 { self.data.remove(key); self.expirations.remove(key); - self.access_times.remove(key); } else { let expiration = crate::simulator::VirtualTime::from_millis(simulation_relative_ms as u64); self.expirations.insert(key.to_string(), expiration); @@ -137,7 +134,6 @@ impl CommandExecutor { if simulation_relative_ms <= 0 { self.data.remove(key); self.expirations.remove(key); - self.access_times.remove(key); } else { let expiration = crate::simulator::VirtualTime::from_millis(simulation_relative_ms as u64); @@ -183,7 +179,6 @@ impl CommandExecutor { let len = value.len(); self.data .insert(key.to_string(), Value::String(value.clone())); - self.access_times.insert(key.to_string(), self.current_time); RespValue::Integer(len as i64) } } @@ -201,7 +196,6 @@ impl CommandExecutor { }; self.data .insert(key.to_string(), Value::String(value.clone())); - self.access_times.insert(key.to_string(), self.current_time); #[cfg(debug_assertions)] debug_assert!( matches!(self.data.get(key), Some(Value::String(v)) if v == value), @@ -250,7 +244,6 @@ impl CommandExecutor { pub(super) fn execute_mset(&mut self, pairs: &[(String, SDS)]) -> RespValue { for (key, value) in pairs { self.data.insert(key.clone(), Value::String(value.clone())); - self.access_times.insert(key.clone(), self.current_time); } // TigerStyle: Postcondition - last value for each key is stored @@ -284,7 +277,6 @@ impl CommandExecutor { // All keys are new — set them all for (key, value) in pairs { self.data.insert(key.clone(), Value::String(value.clone())); - self.access_times.insert(key.clone(), self.current_time); } #[cfg(debug_assertions)] { @@ -302,7 +294,6 @@ impl CommandExecutor { // Optimized batch set - all keys are guaranteed to be on this shard for (key, value) in pairs { self.data.insert(key.clone(), Value::String(value.clone())); - self.access_times.insert(key.clone(), self.current_time); } #[cfg(debug_assertions)] { @@ -407,7 +398,6 @@ impl CommandExecutor { bytes[offset..needed].copy_from_slice(val_bytes); let new_len = bytes.len() as i64; self.data.insert(key.to_string(), Value::String(SDS::new(bytes))); - self.access_times.insert(key.to_string(), self.current_time); #[cfg(debug_assertions)] debug_assert!( self.data.contains_key(key), @@ -460,7 +450,6 @@ impl CommandExecutor { if simulation_relative_ms <= 0 { self.data.remove(key); self.expirations.remove(key); - self.access_times.remove(key); return result; } let expiration = crate::simulator::VirtualTime::from_millis(simulation_relative_ms as u64); @@ -470,7 +459,6 @@ impl CommandExecutor { if simulation_relative_ms <= 0 { self.data.remove(key); self.expirations.remove(key); - self.access_times.remove(key); return result; } let expiration = crate::simulator::VirtualTime::from_millis(simulation_relative_ms as u64); @@ -488,7 +476,6 @@ impl CommandExecutor { let result = RespValue::BulkString(Some(s.as_bytes().to_vec())); self.data.remove(key); self.expirations.remove(key); - self.access_times.remove(key); #[cfg(debug_assertions)] { debug_assert!(!self.data.contains_key(key), "Postcondition: GETDEL must remove key"); @@ -541,7 +528,6 @@ impl CommandExecutor { let new_str = format_float(new_value); let sds = SDS::from_str(&new_str); self.data.insert(key.to_string(), Value::String(sds)); - self.access_times.insert(key.to_string(), self.current_time); #[cfg(debug_assertions)] if let Some(Value::String(s)) = self.data.get(key) { debug_assert!( @@ -580,7 +566,6 @@ impl CommandExecutor { key.to_string(), Value::String(SDS::from_str(&increment.to_string())), ); - self.access_times.insert(key.to_string(), self.current_time); RespValue::Integer(increment) } };