Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 132 additions & 3 deletions crates/cli/src/util/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@ use contender_testfile::TestConfig;
use nu_ansi_term::{AnsiGenericString, Style as ANSIStyle};
use rand::Rng;
use std::{str::FromStr, sync::Arc, time::Duration};
use tokio::sync::Semaphore;
use tracing::{debug, info, warn};

/// Maximum number of concurrent funding tasks to avoid overwhelming the RPC with connections.
const FUNDING_CONCURRENCY_LIMIT: usize = 25;

pub const DEFAULT_PRV_KEYS: [&str; 10] = [
"0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80",
"0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d",
Expand Down Expand Up @@ -205,23 +209,46 @@ pub async fn fund_accounts(
insufficient_balances.len()
);
}

let semaphore = Arc::new(Semaphore::new(FUNDING_CONCURRENCY_LIMIT));

for (idx, (address, _)) in insufficient_balances.into_iter().enumerate() {
let fund_amount = min_balance;
let fund_with = fund_with.to_owned();
let sender = sender_pending_tx.clone();
let rpc_client = rpc_client.clone();
let sem = semaphore.clone();

fund_handles.push(tokio::task::spawn(async move {
let res = fund_account(
let _permit = sem.acquire().await.expect("semaphore closed");
match fund_account(
&fund_with,
address,
fund_amount,
&rpc_client,
Some(admin_nonce + idx as u64),
tx_type,
)
.await?;
sender.send(res).await.expect("failed to handle pending tx");
.await
{
Ok(res) => {
// If the receiver is already closed (e.g. an earlier task failed
// and the function returned), just drop the result.
let _ = sender.send(res).await;
}
Err(UtilError::Rpc(e))
if [
"already known",
"replacement transaction underpriced",
"transaction already imported",
]
.iter()
.any(|em| e.to_string().to_lowercase().contains(em)) =>
{
warn!("funding tx for {address} already in mempool, skipping");
}
Err(e) => return Err(e.into()),
}

Ok::<_, CliError>(())
}));
Expand Down Expand Up @@ -482,8 +509,10 @@ pub fn human_readable_duration(duration: Duration) -> String {

#[cfg(test)]
mod test {
use crate::commands::common::EngineParams;
use crate::error::CliError;
use crate::util::error::UtilError;
use crate::util::fund_account;
use crate::util::human_readable_duration;
use crate::util::utils::human_readable_gas;

Expand All @@ -498,13 +527,25 @@ mod test {
providers::{DynProvider, Provider, ProviderBuilder},
signers::local::PrivateKeySigner,
};
use contender_core::util::default_signers;
use std::str::FromStr;
use std::time::Duration;

pub fn spawn_anvil() -> AnvilInstance {
Anvil::new().block_time_f64(0.25).spawn()
}

pub fn spawn_anvil_no_mining() -> AnvilInstance {
Anvil::new().args(["--no-mining"]).spawn()
}

pub fn init_tracing() {
let _ = tracing_subscriber::fmt()
.with_env_filter("contender=debug,info")
.with_test_writer()
.try_init();
}

#[test]
fn it_parses_durations() {
let test_duration = |s: &str, d: Duration| {
Expand Down Expand Up @@ -688,6 +729,94 @@ mod test {
))
}

#[tokio::test]
async fn fund_account_returns_rpc_error_on_duplicate_tx() {
init_tracing();
let anvil = spawn_anvil_no_mining();
let rpc_client = DynProvider::new(
ProviderBuilder::new()
.network::<AnyNetwork>()
.connect_http(anvil.endpoint_url()),
);
let signer = PrivateKeySigner::from_str(super::DEFAULT_PRV_KEYS[0]).unwrap();
let recipient: Address = "0x0000000000000000000000000000000000000013"
.parse()
.unwrap();
let min_balance = U256::from(ETH_TO_WEI);
let tx_type = alloy::consensus::TxType::Eip1559;
// send eth to the recipient
let res = fund_account(&signer, recipient, min_balance, &rpc_client, None, tx_type).await;
assert!(res.is_ok(), "initial funding should succeed");
println!("initial funding tx sent, attempting duplicate... {res:?}");
// attempt to send the same transaction again, which should result in an error because the nonce is the same and the first transaction is still pending
let res = fund_account(&signer, recipient, min_balance, &rpc_client, None, tx_type).await;
assert!(
res.is_err(),
"duplicate transaction should result in an error"
);
println!("error as expected: {res:?}");
assert!(
matches!(res.unwrap_err(), UtilError::Rpc(e) if e.to_string().to_lowercase().contains("transaction already imported")),
"error should be a util RPC error indicating the transaction is already known or underpriced"
);
}

#[tokio::test]
async fn fund_accounts_ignores_duplicate_transactions() {
init_tracing();
let anvil = spawn_anvil_no_mining();
println!("Anvil endpoint: {}", anvil.endpoint_url());
let provider = ProviderBuilder::new()
.network::<AnyNetwork>()
.connect_http(anvil.endpoint_url());
let rpc_client = DynProvider::new(provider);
let recipients = [Address::random(), Address::random()];
let signers = default_signers();
let admin_signer = signers.first().unwrap();

// call fund_accounts, which will stall waiting for transactions to confirm, but since we set no-mining, they will never confirm
// time out quickly so we can call it again
tokio::select! {
// time out after 3 seconds
_ = tokio::time::sleep(Duration::from_secs(3)) => {
// at this point we'll have sent all our transactions, now to the next stage
println!("cancelling fund_accounts call, retrying...");
}
res = fund_accounts(
&recipients,
admin_signer,
&rpc_client,
U256::from(ETH_TO_WEI),
alloy::consensus::TxType::Eip1559,
&EngineParams {
engine_provider: None,
call_fcu: false,
},
) => {
// this won't happen because we set no-mining, but if it does, we want to know about it
res.unwrap();
}
}

// call fund_accounts again with the same recipients, which will attempt to send the same transactions again, but since they are already in the mempool, it should skip them and not error
let res = fund_accounts(
&recipients,
admin_signer,
&rpc_client,
U256::from(ETH_TO_WEI),
alloy::consensus::TxType::Eip1559,
&EngineParams {
engine_provider: None,
call_fcu: false,
},
)
.await;
assert!(
res.is_ok(),
"fund_accounts should ignore duplicate transactions in mempool"
);
}

#[test]
fn human_readable_gas_works() {
assert_eq!(human_readable_gas(500), "500 gas");
Expand Down
Loading