Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ fn update_wallet(node: &Node, wallet: &Wallet) -> Result<(), Error> {
let addresses = wallet.get_addresses()?;
let unconfirmed_utxos =
node.get_unconfirmed_utxos_by_addresses(&addresses)?;
let utxos = node.get_utxos_by_addresses(&addresses)?;
let utxos = node.get_utxos_by_addresses(&addresses, 0)?;
let confirmed_outpoints: Vec<_> = wallet.get_utxos()?.into_keys().collect();
let confirmed_spent = node
.get_spent_utxos(&confirmed_outpoints)?
Expand Down
145 changes: 143 additions & 2 deletions lib/archive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,24 @@ use std::{
use bitcoin::{self, hashes::Hash as _};
use fallible_iterator::{FallibleIterator, IteratorExt};
use heed::types::SerdeBincode;
use serde::{Deserialize, Serialize};
use sneed::{
DatabaseUnique, EnvError, RoTxn, RwTxn, RwTxnError, UnitKey,
db::{self, error::Error as DbError},
env, rwtxn,
};

use crate::types::{
Block, BlockHash, BmmResult, Body, Header, Tip, Txid, VERSION, Version,
Address, AddressTxidKey, Block, BlockHash, BmmResult, Body,
FilledTransaction, Header, MerkleProof, Tip, Txid, VERSION, Version,
proto::mainchain,
};

#[allow(clippy::duplicated_attributes)]
#[derive(thiserror::Error, transitive::Transitive, Debug)]
#[transitive(from(db::error::Delete, DbError))]
#[transitive(from(db::error::IterInit, DbError))]
#[transitive(from(db::error::IterItem, DbError))]
#[transitive(from(db::error::Put, DbError))]
#[transitive(from(db::error::TryGet, DbError))]
#[transitive(from(env::error::CreateDb, EnvError))]
Expand Down Expand Up @@ -73,6 +78,19 @@ pub enum Error {
NoMainHeight(bitcoin::BlockHash),
#[error("no tx with txid {0}")]
NoTx(Txid),
#[error(
"txdb filled transaction length mismatch: expected {expected}, actual {actual}"
)]
TxDbFilledTxLenMismatch { expected: usize, actual: usize },
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct FilledTxEntry {
pub tx: FilledTransaction,
pub merkle_proof: MerkleProof,
pub block_hash: BlockHash,
pub block_height: u32,
pub unix_stamp: u64,
}

#[derive(Clone)]
Expand Down Expand Up @@ -148,11 +166,13 @@ pub struct Archive {
SerdeBincode<Txid>,
SerdeBincode<BTreeMap<BlockHash, u32>>,
>,
/// Capped tx cache, keyed by (address, txid).
txdb: DatabaseUnique<AddressTxidKey, SerdeBincode<FilledTxEntry>>,
_version: DatabaseUnique<UnitKey, SerdeBincode<Version>>,
}

impl Archive {
pub const NUM_DBS: u32 = 14;
pub const NUM_DBS: u32 = 15;

pub fn new(env: &sneed::Env) -> Result<Self, Error> {
let mut rwtxn = env.write_txn()?;
Expand Down Expand Up @@ -224,6 +244,7 @@ impl Archive {
let total_work = DatabaseUnique::create(env, &mut rwtxn, "total_work")?;
let txid_to_inclusions =
DatabaseUnique::create(env, &mut rwtxn, "txid_to_inclusions")?;
let txdb = DatabaseUnique::create(env, &mut rwtxn, "txdb")?;
rwtxn.commit()?;
Ok(Self {
block_hash_to_height,
Expand All @@ -239,6 +260,7 @@ impl Archive {
successors,
total_work,
txid_to_inclusions,
txdb,
_version: version,
})
}
Expand Down Expand Up @@ -672,6 +694,125 @@ impl Archive {
Ok(inclusions)
}

pub fn prune_txdb_block(
&self,
rwtxn: &mut RwTxn,
block_hash: BlockHash,
) -> Result<usize, Error> {
let keys = {
let mut keys = Vec::new();
let mut iter = self.txdb.iter(rwtxn).map_err(DbError::from)?;
while let Some((key, entry)) = iter.next().map_err(DbError::from)? {
if entry.block_hash == block_hash {
keys.push(key);
}
}
keys
};
for key in &keys {
let _ = self.txdb.delete(rwtxn, key)?;
}
Ok(keys.len())
}

pub fn prune_txdb_older_than(
&self,
rwtxn: &mut RwTxn,
cutoff_unix: u64,
) -> Result<usize, Error> {
let keys = {
let mut keys = Vec::new();
let mut iter = self.txdb.iter(rwtxn).map_err(DbError::from)?;
while let Some((key, entry)) = iter.next().map_err(DbError::from)? {
if entry.unix_stamp < cutoff_unix {
keys.push(key);
}
}
keys
};
for key in &keys {
let _ = self.txdb.delete(rwtxn, key)?;
}
Ok(keys.len())
}

pub fn put_txdb_for_connected_block(
&self,
rwtxn: &mut RwTxn,
block_hash: BlockHash,
block_height: u32,
body: &Body,
filled_txs: &[FilledTransaction],
unix_stamp: u64,
) -> Result<(), Error> {
if body.transactions.len() != filled_txs.len() {
return Err(Error::TxDbFilledTxLenMismatch {
expected: body.transactions.len(),
actual: filled_txs.len(),
});
}
for (tx_index, filled_tx) in filled_txs.iter().enumerate() {
let txid = filled_tx.txid();
debug_assert_eq!(body.transactions[tx_index].txid(), txid);
let merkle_proof = Body::compute_tx_merkle_proof(
&body.coinbase,
&body.transactions,
tx_index,
);
debug_assert_eq!(merkle_proof.leaf_index, tx_index + 1);

let mut addresses = HashSet::new();
addresses.extend(
filled_tx.spent_utxos.iter().map(|output| output.address),
);
addresses.extend(
filled_tx
.transaction
.outputs
.iter()
.map(|output| output.address),
);

for address in addresses {
let key = AddressTxidKey::new(address, txid);
let entry = FilledTxEntry {
tx: filled_tx.clone(),
merkle_proof: merkle_proof.clone(),
block_hash,
block_height,
unix_stamp,
};
self.txdb.put(rwtxn, &key, &entry)?;
}
}
Ok(())
}

pub fn get_txdb_entries_by_address(
&self,
rotxn: &RoTxn,
addresses: &HashSet<Address>,
height_threshold: u32,
) -> Result<Vec<FilledTxEntry>, Error> {
let mut entries = Vec::new();
for address in addresses {
let start = AddressTxidKey::start(*address);
let end = AddressTxidKey::end(*address);
let mut iter = self
.txdb
.range(rotxn, &(start..=end))
.map_err(DbError::from)?;
while let Some((_key, entry)) =
iter.next().map_err(DbError::from)?
{
if entry.block_height >= height_threshold {
entries.push(entry);
}
}
}
Ok(entries)
}

/// Store a block body. The header must already exist.
pub fn put_body(
&self,
Expand Down
6 changes: 3 additions & 3 deletions lib/authorization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use serde::{Deserialize, Serialize};
use utoipa::ToSchema;

use crate::types::{
Address, AuthorizedTransaction, Body, GetAddress, Transaction, Verify,
VerifyingKey,
ADDRESS_SIZE, Address, AuthorizedTransaction, Body, GetAddress,
Transaction, Verify, VerifyingKey,
};

pub use ed25519_dalek::{SignatureError, Signer, SigningKey, Verifier};
Expand Down Expand Up @@ -147,7 +147,7 @@ impl Verify for Authorization {
pub fn get_address(verifying_key: &VerifyingKey) -> Address {
let mut hasher = blake3::Hasher::new();
let mut reader = hasher.update(&verifying_key.to_bytes()).finalize_xof();
let mut output: [u8; 20] = [0; 20];
let mut output: [u8; ADDRESS_SIZE] = [0; ADDRESS_SIZE];
reader.fill(&mut output);
Address(output)
}
Expand Down
9 changes: 7 additions & 2 deletions lib/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ where
.try_get(&rotxn, &outpoint_key)
.map_err(state::Error::from)?
{
spent.push((*outpoint, output));
spent.push((*outpoint, output.output));
}
}
Ok(spent)
Expand Down Expand Up @@ -564,9 +564,14 @@ where
pub fn get_utxos_by_addresses(
&self,
addresses: &HashSet<Address>,
height_threshold: u32,
) -> Result<HashMap<OutPoint, FilledOutput>, Error> {
let rotxn = self.env.read_txn()?;
let utxos = self.state.get_utxos_by_addresses(&rotxn, addresses)?;
let utxos = self.state.get_utxos_by_addresses(
&rotxn,
addresses,
height_threshold,
)?;
Ok(utxos)
}

Expand Down
38 changes: 30 additions & 8 deletions lib/node/net_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
collections::{HashMap, HashSet},
net::SocketAddr,
sync::Arc,
time::Duration,
time::{Duration, SystemTime, UNIX_EPOCH},
};

use error_fatality::{Nested as _, Split};
Expand Down Expand Up @@ -129,6 +129,15 @@ impl From<net::Error> for Error {
}
}

const TXDB_PRUNE_INTERVAL_BLOCKS: u32 = 144 * 7;
const TXDB_RETENTION_SECS: u64 = 28 * 24 * 60 * 60;

fn unix_now() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_or(0, |duration| duration.as_secs())
}

fn connect_tip_(
rwtxn: &mut RwTxn<'_>,
archive: &Archive,
Expand All @@ -139,18 +148,30 @@ fn connect_tip_(
two_way_peg_data: &mainchain::TwoWayPegData,
) -> Result<(), Error> {
let block_hash = header.hash();
let prevalidated = state.prevalidate_block(rwtxn, header, body)?;
let block_height = prevalidated.next_height;
let merkle_root = prevalidated.computed_merkle_root;
let filled_txs = prevalidated.filled_transactions.clone();
state.connect_prevalidated_block(rwtxn, header, body, prevalidated)?;
if tracing::enabled!(tracing::Level::DEBUG) {
let merkle_root =
Body::compute_merkle_root(&body.coinbase, &body.transactions);
let height = state.try_get_height(rwtxn)?;
state.apply_block(rwtxn, header, body)?;
tracing::debug!(?height, %merkle_root, %block_hash, "connected body")
} else {
state.apply_block(rwtxn, header, body)?;
tracing::debug!(height = block_height, %merkle_root, %block_hash, "connected body")
}
let () = state.connect_two_way_peg_data(rwtxn, two_way_peg_data)?;
let () = archive.put_header(rwtxn, header)?;
let () = archive.put_body(rwtxn, block_hash, body)?;
let unix_stamp = unix_now();
let () = archive.put_txdb_for_connected_block(
rwtxn,
block_hash,
block_height,
body,
&filled_txs,
unix_stamp,
)?;
if block_height > 0 && block_height % TXDB_PRUNE_INTERVAL_BLOCKS == 0 {
let cutoff_unix = unix_stamp.saturating_sub(TXDB_RETENTION_SECS);
let _ = archive.prune_txdb_older_than(rwtxn, cutoff_unix)?;
}
for transaction in &body.transactions {
let () = mempool.delete(rwtxn, transaction.txid())?;
}
Expand Down Expand Up @@ -250,6 +271,7 @@ fn disconnect_tip_(
}
};
let () = state.disconnect_two_way_peg_data(rwtxn, &two_way_peg_data)?;
let _ = archive.prune_txdb_block(rwtxn, tip_block_hash)?;
let () = state.disconnect_tip(rwtxn, &tip_header, &tip_body)?;
for transaction in tip_body.authorized_transactions().iter().rev() {
mempool.put(rwtxn, transaction)?;
Expand Down
Loading