diff --git a/crates/cli/src/util/utils.rs b/crates/cli/src/util/utils.rs index 95fcb47a..561335eb 100644 --- a/crates/cli/src/util/utils.rs +++ b/crates/cli/src/util/utils.rs @@ -23,8 +23,12 @@ use contender_testfile::TestConfig; use nu_ansi_term::{AnsiGenericString, Style as ANSIStyle}; use rand::Rng; use std::{str::FromStr, sync::Arc, time::Duration}; +use tokio::sync::Semaphore; use tracing::{debug, info, warn}; +/// Maximum number of concurrent funding tasks to avoid overwhelming the RPC with connections. +const FUNDING_CONCURRENCY_LIMIT: usize = 25; + pub const DEFAULT_PRV_KEYS: [&str; 10] = [ "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80", "0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d", @@ -205,14 +209,19 @@ pub async fn fund_accounts( insufficient_balances.len() ); } + + let semaphore = Arc::new(Semaphore::new(FUNDING_CONCURRENCY_LIMIT)); + for (idx, (address, _)) in insufficient_balances.into_iter().enumerate() { let fund_amount = min_balance; let fund_with = fund_with.to_owned(); let sender = sender_pending_tx.clone(); let rpc_client = rpc_client.clone(); + let sem = semaphore.clone(); fund_handles.push(tokio::task::spawn(async move { - let res = fund_account( + let _permit = sem.acquire().await.expect("semaphore closed"); + match fund_account( &fund_with, address, fund_amount, @@ -220,8 +229,26 @@ pub async fn fund_accounts( Some(admin_nonce + idx as u64), tx_type, ) - .await?; - sender.send(res).await.expect("failed to handle pending tx"); + .await + { + Ok(res) => { + // If the receiver is already closed (e.g. an earlier task failed + // and the function returned), just drop the result. + let _ = sender.send(res).await; + } + Err(UtilError::Rpc(e)) + if [ + "already known", + "replacement transaction underpriced", + "transaction already imported", + ] + .iter() + .any(|em| e.to_string().to_lowercase().contains(em)) => + { + warn!("funding tx for {address} already in mempool, skipping"); + } + Err(e) => return Err(e.into()), + } Ok::<_, CliError>(()) })); @@ -482,8 +509,10 @@ pub fn human_readable_duration(duration: Duration) -> String { #[cfg(test)] mod test { + use crate::commands::common::EngineParams; use crate::error::CliError; use crate::util::error::UtilError; + use crate::util::fund_account; use crate::util::human_readable_duration; use crate::util::utils::human_readable_gas; @@ -498,6 +527,7 @@ mod test { providers::{DynProvider, Provider, ProviderBuilder}, signers::local::PrivateKeySigner, }; + use contender_core::util::default_signers; use std::str::FromStr; use std::time::Duration; @@ -505,6 +535,17 @@ mod test { Anvil::new().block_time_f64(0.25).spawn() } + pub fn spawn_anvil_no_mining() -> AnvilInstance { + Anvil::new().args(["--no-mining"]).spawn() + } + + pub fn init_tracing() { + let _ = tracing_subscriber::fmt() + .with_env_filter("contender=debug,info") + .with_test_writer() + .try_init(); + } + #[test] fn it_parses_durations() { let test_duration = |s: &str, d: Duration| { @@ -688,6 +729,94 @@ mod test { )) } + #[tokio::test] + async fn fund_account_returns_rpc_error_on_duplicate_tx() { + init_tracing(); + let anvil = spawn_anvil_no_mining(); + let rpc_client = DynProvider::new( + ProviderBuilder::new() + .network::() + .connect_http(anvil.endpoint_url()), + ); + let signer = PrivateKeySigner::from_str(super::DEFAULT_PRV_KEYS[0]).unwrap(); + let recipient: Address = "0x0000000000000000000000000000000000000013" + .parse() + .unwrap(); + let min_balance = U256::from(ETH_TO_WEI); + let tx_type = alloy::consensus::TxType::Eip1559; + // send eth to the recipient + let res = fund_account(&signer, recipient, min_balance, &rpc_client, None, tx_type).await; + assert!(res.is_ok(), "initial funding should succeed"); + println!("initial funding tx sent, attempting duplicate... {res:?}"); + // attempt to send the same transaction again, which should result in an error because the nonce is the same and the first transaction is still pending + let res = fund_account(&signer, recipient, min_balance, &rpc_client, None, tx_type).await; + assert!( + res.is_err(), + "duplicate transaction should result in an error" + ); + println!("error as expected: {res:?}"); + assert!( + matches!(res.unwrap_err(), UtilError::Rpc(e) if e.to_string().to_lowercase().contains("transaction already imported")), + "error should be a util RPC error indicating the transaction is already known or underpriced" + ); + } + + #[tokio::test] + async fn fund_accounts_ignores_duplicate_transactions() { + init_tracing(); + let anvil = spawn_anvil_no_mining(); + println!("Anvil endpoint: {}", anvil.endpoint_url()); + let provider = ProviderBuilder::new() + .network::() + .connect_http(anvil.endpoint_url()); + let rpc_client = DynProvider::new(provider); + let recipients = [Address::random(), Address::random()]; + let signers = default_signers(); + let admin_signer = signers.first().unwrap(); + + // call fund_accounts, which will stall waiting for transactions to confirm, but since we set no-mining, they will never confirm + // time out quickly so we can call it again + tokio::select! { + // time out after 3 seconds + _ = tokio::time::sleep(Duration::from_secs(3)) => { + // at this point we'll have sent all our transactions, now to the next stage + println!("cancelling fund_accounts call, retrying..."); + } + res = fund_accounts( + &recipients, + admin_signer, + &rpc_client, + U256::from(ETH_TO_WEI), + alloy::consensus::TxType::Eip1559, + &EngineParams { + engine_provider: None, + call_fcu: false, + }, + ) => { + // this won't happen because we set no-mining, but if it does, we want to know about it + res.unwrap(); + } + } + + // call fund_accounts again with the same recipients, which will attempt to send the same transactions again, but since they are already in the mempool, it should skip them and not error + let res = fund_accounts( + &recipients, + admin_signer, + &rpc_client, + U256::from(ETH_TO_WEI), + alloy::consensus::TxType::Eip1559, + &EngineParams { + engine_provider: None, + call_fcu: false, + }, + ) + .await; + assert!( + res.is_ok(), + "fund_accounts should ignore duplicate transactions in mempool" + ); + } + #[test] fn human_readable_gas_works() { assert_eq!(human_readable_gas(500), "500 gas");