diff --git a/src/crates/primitives/src/indecies.rs b/src/crates/primitives/src/indecies.rs index 5cc2c66..49e8c15 100644 --- a/src/crates/primitives/src/indecies.rs +++ b/src/crates/primitives/src/indecies.rs @@ -1,5 +1,5 @@ use std::fs::{File, OpenOptions}; -use std::io::{self, Seek, SeekFrom, Write}; +use std::io::{self, BufWriter, Seek, SeekFrom, Write}; use std::os::unix::fs::FileExt; use std::path::Path; @@ -11,6 +11,10 @@ const TXPTR_LEN_BYTES: usize = 28; const BLOCK_TX_END_LEN_BYTES: usize = 4; const LINK_LEN_BYTES: usize = 8; +/// Capacity of the [`BufWriter`] wrapping each index file. Chosen to amortize +/// the per-write syscall cost. Each index instance holds at most one such buffer. +const APPEND_BUF_CAP_BYTES: usize = 64 * 1024; + pub const OUTID_NONE: u64 = u64::MAX; pub const INID_NONE: u64 = u64::MAX; @@ -88,9 +92,15 @@ impl TxPtr { } } +/// Fixed-width append-mostly log of `N`-byte records. +/// +/// Appends are buffered via [`BufWriter`] and flushed to disk in batches. +/// `get_bytes` serves buffered tail records directly from the writer's buffer. +/// `set_bytes` flushes the buffer first, then uses a positional write so the +/// file cursor is undisturbed for subsequent appends. #[derive(Debug)] struct FixedWidthIndex { - file: File, + writer: BufWriter, len: u64, mmap: Option, } @@ -104,14 +114,14 @@ impl FixedWidthIndex { .truncate(true) .open(path)?; Ok(Self { - file, + writer: BufWriter::with_capacity(APPEND_BUF_CAP_BYTES, file), len: 0, mmap: None, }) } fn open(path: impl AsRef, len_error: &'static str) -> io::Result { - let file = OpenOptions::new().read(true).write(true).open(path)?; + let mut file = OpenOptions::new().read(true).write(true).open(path)?; let len_bytes = file.metadata()?.len(); if len_bytes % (N as u64) != 0 { return Err(io::Error::new(io::ErrorKind::InvalidData, len_error)); @@ -123,12 +133,17 @@ impl FixedWidthIndex { } else { None }; - Ok(Self { file, len, mmap }) + file.seek(SeekFrom::End(0))?; + Ok(Self { + writer: BufWriter::with_capacity(APPEND_BUF_CAP_BYTES, file), + len, + mmap, + }) } /// Open an existing file or create a new one without truncating existing content. fn open_or_create(path: impl AsRef, len_error: &'static str) -> io::Result { - let file = OpenOptions::new() + let mut file = OpenOptions::new() .read(true) .write(true) .truncate(false) @@ -138,20 +153,27 @@ impl FixedWidthIndex { if len_bytes % (N as u64) != 0 { return Err(io::Error::new(io::ErrorKind::InvalidData, len_error)); } + let len = len_bytes / (N as u64); + file.seek(SeekFrom::End(0))?; Ok(Self { - file, - len: len_bytes / (N as u64), + writer: BufWriter::with_capacity(APPEND_BUF_CAP_BYTES, file), + len, mmap: None, }) } + fn flush(&mut self) -> io::Result<()> { + self.writer.flush() + } + /// Remap the file for read access after all writes are complete. /// /// Must not be called while any concurrent writes to this file are in flight. fn remap(&mut self) -> io::Result<()> { + self.writer.flush()?; self.mmap = if self.len > 0 { // Safety: no more writes will occur on this handle after remap. - Some(unsafe { Mmap::map(&self.file)? }) + Some(unsafe { Mmap::map(self.writer.get_ref())? }) } else { None }; @@ -167,31 +189,46 @@ impl FixedWidthIndex { } fn append_bytes(&mut self, bytes: &[u8; N]) -> io::Result { - self.file.seek(SeekFrom::End(0))?; - self.file.write_all(bytes)?; + self.writer.write_all(bytes)?; self.len += 1; Ok(self.len - 1) } fn set_bytes(&mut self, index: u64, bytes: &[u8; N]) -> io::Result<()> { + if index >= self.len { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "set_bytes index out of range", + )); + } + // Flush so the record is on disk before the positional write. + self.writer.flush()?; let offset = index * (N as u64); - self.file.seek(SeekFrom::Start(offset))?; - self.file.write_all(bytes) + self.writer.get_ref().write_all_at(bytes, offset) } fn get_bytes(&self, index: u64) -> io::Result> { if index >= self.len { return Ok(None); } - let offset = (index * N as u64) as usize; + let buf = self.writer.buffer(); + let flushed_count = self.len - (buf.len() / N) as u64; + if index >= flushed_count { + let buf_offset = ((index - flushed_count) as usize) * N; + let mut out = [0u8; N]; + out.copy_from_slice(&buf[buf_offset..buf_offset + N]); + return Ok(Some(out)); + } + let offset = index * (N as u64); if let Some(mmap) = &self.mmap { - let mut buf = [0u8; N]; - buf.copy_from_slice(&mmap[offset..offset + N]); - return Ok(Some(buf)); + let offset = offset as usize; + let mut out = [0u8; N]; + out.copy_from_slice(&mmap[offset..offset + N]); + return Ok(Some(out)); } - let mut buf = [0u8; N]; - self.file.read_exact_at(&mut buf, offset as u64)?; - Ok(Some(buf)) + let mut out = [0u8; N]; + self.writer.get_ref().read_exact_at(&mut out, offset)?; + Ok(Some(out)) } } @@ -441,6 +478,17 @@ impl DenseIndexSet { }) } + /// Flush all buffered appends to disk. + /// + /// Must be called before any `set` on recently appended records so those + /// records are on disk for the positional write. + pub fn flush(&mut self) -> io::Result<()> { + self.txptr.inner.flush()?; + self.block_tx.inner.flush()?; + self.in_prevout.inner.flush()?; + self.out_spent.inner.flush() + } + /// Map all four index files into memory for zero-syscall reads. /// /// Call once after all writes are complete. diff --git a/src/crates/primitives/src/parser.rs b/src/crates/primitives/src/parser.rs index 80cf4bb..bc85b09 100644 --- a/src/crates/primitives/src/parser.rs +++ b/src/crates/primitives/src/parser.rs @@ -100,7 +100,6 @@ impl Parser { log::debug!("Starting to parse blocks in range: {:?}", range); let parse_start = std::time::Instant::now(); - let (mut tx_in_total, mut tx_out_total) = tx_io_totals(&indices.txptr); let mut tx_total = indices .block_tx .last() @@ -163,26 +162,64 @@ impl Parser { if global_height >= range.start { let block_slice = &bytes[block_start..block_end]; let block_start_in_file = block_start as u64; - let mut collector = TxIdCollector { - block_file: file_id, - block_start_in_file, - block_slice, - indices, - error: None, - tx_in_total: &mut tx_in_total, - tx_out_total: &mut tx_out_total, - tx_count: 0, - current_in: 0, - current_out: 0, - spk_db, - txids: &mut txids, + + // Collect all per-block writes into a batch. The collector is + // scoped here so its &ConfirmedTxPtrIndex borrow drops before + // the bulk flush below mutates indices. + let batch = { + let txptr_base = indices.txptr.len() as u32; + let tx_in_base = indices.in_prevout.len(); + let tx_out_base = indices.out_spent.len(); + let mut collector = TxIdCollector { + block_file: file_id, + block_start_in_file, + block_slice, + txids: &mut txids, + committed_txptr: &indices.txptr, + txptr_base, + batch: BlockBatch::default(), + tx_in_running: tx_in_base, + tx_out_running: tx_out_base, + tx_out_base, + current_in: 0, + current_out: 0, + spk_db, + error: None, + }; + bsl::Block::visit(block_slice, &mut collector) + .map_err(BlockFileError::Parse)?; + if let Some(error) = collector.error.take() { + return Err(error); + } + collector.batch }; - bsl::Block::visit(block_slice, &mut collector) - .map_err(BlockFileError::Parse)?; - if let Some(error) = collector.error.take() { - return Err(error); + + // Bulk-append all records from the block, then flush so the + // subsequent set() calls target on-disk data. + for ptr in &batch.txptrs { + indices.txptr.append(*ptr).map_err(BlockFileError::Io)?; + } + for out_id in &batch.in_prevouts { + indices + .in_prevout + .append(*out_id) + .map_err(BlockFileError::Io)?; + } + for in_id in &batch.out_spents { + indices + .out_spent + .append(*in_id) + .map_err(BlockFileError::Io)?; } - tx_total += collector.tx_count; + indices.flush().map_err(BlockFileError::Io)?; + for (out_id, in_id) in &batch.out_spent_updates { + indices + .out_spent + .set(*out_id, *in_id) + .map_err(BlockFileError::Io)?; + } + + tx_total += batch.txptrs.len() as u64; if tx_total > u32::MAX as u64 { return Err(BlockFileError::CorruptId()); } @@ -207,6 +244,17 @@ impl Parser { } } +/// Writes staged for a single block before being bulk-applied to the indices. +#[derive(Default)] +struct BlockBatch { + txptrs: Vec, + in_prevouts: Vec, + /// One entry per output, all initialised to `INID_NONE`. + out_spents: Vec, + /// `(out_id, in_id)` pairs for outputs spent within or before this block. + out_spent_updates: Vec<(u64, u64)>, +} + /// Visitor that collects TxIds (file + byte offset) for each transaction in a block. struct TxIdCollector<'a> { block_file: BlockFileId, @@ -215,26 +263,65 @@ struct TxIdCollector<'a> { // TODO: This is an unbounded map and will consume many GBs for mainnet. // The problem is we need to resolve txids to dense ids. txids: &'a mut HashMap<[u8; 32], TxId>, - indices: &'a mut DenseIndexSet, - error: Option, - tx_in_total: &'a mut u64, - tx_out_total: &'a mut u64, - tx_count: u64, + /// Read-only view of txptrs committed before this block. + committed_txptr: &'a ConfirmedTxPtrIndex, + /// `committed_txptr.len()` at the start of this block, used to distinguish + /// same-block txptrs (staged in `batch`) from committed ones. + txptr_base: u32, + batch: BlockBatch, + /// Running total of inputs through the end of the last completed tx. + tx_in_running: u64, + /// Running total of outputs through the end of the last completed tx. + tx_out_running: u64, + /// `out_spent.len()` at block start, used for the sequential-id sanity check. + tx_out_base: u64, current_in: u64, current_out: u64, spk_db: &'a mut SledScriptPubkeyDb, + error: Option, +} + +impl TxIdCollector<'_> { + fn tx_out_range_for(&self, txid: TxId) -> (u64, u64) { + let idx = txid.index(); + if idx >= self.txptr_base { + let i = (idx - self.txptr_base) as usize; + let end = self.batch.txptrs[i].tx_out_end(); + let start = if i > 0 { + self.batch.txptrs[i - 1].tx_out_end() + } else if self.txptr_base > 0 { + self.committed_txptr + .get(TxId::new(self.txptr_base - 1)) + .unwrap_or_else(|e| { + panic!("Corrupted data store: error reading txptr index: {:?}", e) + }) + .unwrap_or_else(|| { + panic!( + "Corrupted data store: txid out of range: {}", + self.txptr_base - 1 + ) + }) + .tx_out_end() + } else { + 0 + }; + (start, end) + } else { + tx_out_range_for(txid, self.committed_txptr) + } + } } impl Visitor for TxIdCollector<'_> { fn visit_tx_in(&mut self, _vin: usize, tx_in: &bsl::TxIn<'_>) -> ControlFlow<()> { - let in_id = *self.tx_in_total + self.current_in; + let in_id = self.tx_in_running + self.current_in; let prevout = tx_in.prevout(); let out_id = if is_null_prevout(prevout) { OUTID_NONE } else { let bytes = <&[u8; 32]>::try_from(prevout.txid()).expect("prevout txid is 32 bytes"); if let Some(prev_dense) = self.txids.get(bytes).copied() { - let (start, end) = tx_out_range_for(prev_dense, &self.indices.txptr); + let (start, end) = self.tx_out_range_for(prev_dense); let vout = prevout.vout() as u64; let out_id = start + vout; if out_id >= end { OUTID_NONE } else { out_id } @@ -242,27 +329,18 @@ impl Visitor for TxIdCollector<'_> { OUTID_NONE } }; - if let Err(err) = self.indices.in_prevout.append(out_id) { - self.error = Some(BlockFileError::Io(err)); - return ControlFlow::Break(()); - } - if out_id != OUTID_NONE - && let Err(err) = self.indices.out_spent.set(out_id, in_id) - { - self.error = Some(BlockFileError::Io(err)); - return ControlFlow::Break(()); + self.batch.in_prevouts.push(out_id); + if out_id != OUTID_NONE { + self.batch.out_spent_updates.push((out_id, in_id)); } self.current_in += 1; ControlFlow::Continue(()) } fn visit_tx_out(&mut self, _vout: usize, tx_out: &bsl::TxOut<'_>) -> ControlFlow<()> { - let out_id = *self.tx_out_total + self.current_out; - if let Err(err) = self.indices.out_spent.append(INID_NONE) { - self.error = Some(BlockFileError::Io(err)); - return ControlFlow::Break(()); - } - if out_id != self.indices.out_spent.len() - 1 { + let out_id = self.tx_out_running + self.current_out; + let expected_out_id = self.tx_out_base + self.batch.out_spents.len() as u64; + if out_id != expected_out_id { self.error = Some(BlockFileError::CorruptId()); return ControlFlow::Break(()); } @@ -271,6 +349,7 @@ impl Visitor for TxIdCollector<'_> { self.error = Some(BlockFileError::SpkDb(err)); return ControlFlow::Break(()); } + self.batch.out_spents.push(INID_NONE); self.current_out += 1; ControlFlow::Continue(()) } @@ -287,53 +366,24 @@ impl Visitor for TxIdCollector<'_> { } let offset_in_block = tx_slice.as_ptr() as usize - self.block_slice.as_ptr() as usize; let file_offset = self.block_start_in_file + offset_in_block as u64; - *self.tx_in_total += self.current_in; - *self.tx_out_total += self.current_out; + self.tx_in_running += self.current_in; + self.tx_out_running += self.current_out; let ptr = TxPtr::new( self.block_file.0, file_offset as u32, tx_len as u32, - *self.tx_in_total, - *self.tx_out_total, + self.tx_in_running, + self.tx_out_running, ); - match self.indices.txptr.append(ptr) { - Ok(txid) => { - self.txids.insert(tx.txid().to_byte_array(), txid); - } - Err(err) => { - self.error = Some(BlockFileError::Io(err)); - return ControlFlow::Break(()); - } - } - self.tx_count += 1; + let txid = TxId::new(self.txptr_base + self.batch.txptrs.len() as u32); + self.batch.txptrs.push(ptr); + self.txids.insert(tx.txid().to_byte_array(), txid); self.current_in = 0; self.current_out = 0; ControlFlow::Continue(()) } } -fn tx_io_totals(txptr_index: &ConfirmedTxPtrIndex) -> (u64, u64) { - let len = txptr_index.len(); - if txptr_index.is_empty() { - return (0, 0); - } - let last = TxId::new((len - 1) as u32); - let ptr = txptr_index - .get(last) - .unwrap_or_else(|e| panic!("Corrupted data store: error reading txptr: {:?}", e)) - .unwrap_or_else(|| { - panic!( - "Corrupted data store: transaction not found for txid: {:?}", - last - ) - }); - (ptr.tx_in_end(), ptr.tx_out_end()) -} - -fn is_null_prevout(prevout: &bsl::OutPoint<'_>) -> bool { - prevout.vout() == u32::MAX && prevout.txid().iter().all(|b| *b == 0) -} - fn tx_out_range_for(txid: TxId, txptr_index: &ConfirmedTxPtrIndex) -> (u64, u64) { let end = txptr_index .get(txid) @@ -357,6 +407,10 @@ fn tx_out_range_for(txid: TxId, txptr_index: &ConfirmedTxPtrIndex) -> (u64, u64) } } +fn is_null_prevout(prevout: &bsl::OutPoint<'_>) -> bool { + prevout.vout() == u32::MAX && prevout.txid().iter().all(|b| *b == 0) +} + fn script_pubkey_hash(script_pubkey: &[u8]) -> ScriptPubkeyHash { let hash = Hash160::hash(script_pubkey); hash.to_byte_array()