Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 68 additions & 20 deletions src/crates/primitives/src/indecies.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::fs::{File, OpenOptions};
use std::io::{self, Seek, SeekFrom, Write};
use std::io::{self, BufWriter, Seek, SeekFrom, Write};
use std::os::unix::fs::FileExt;
use std::path::Path;

Expand All @@ -11,6 +11,10 @@ const TXPTR_LEN_BYTES: usize = 28;
const BLOCK_TX_END_LEN_BYTES: usize = 4;
const LINK_LEN_BYTES: usize = 8;

/// Capacity of the [`BufWriter`] wrapping each index file. Chosen to amortize
/// the per-write syscall cost. Each index instance holds at most one such buffer.
const APPEND_BUF_CAP_BYTES: usize = 64 * 1024;

pub const OUTID_NONE: u64 = u64::MAX;
pub const INID_NONE: u64 = u64::MAX;

Expand Down Expand Up @@ -88,9 +92,15 @@ impl TxPtr {
}
}

/// Fixed-width append-mostly log of `N`-byte records.
///
/// Appends are buffered via [`BufWriter`] and flushed to disk in batches.
/// `get_bytes` serves buffered tail records directly from the writer's buffer.
/// `set_bytes` flushes the buffer first, then uses a positional write so the
/// file cursor is undisturbed for subsequent appends.
#[derive(Debug)]
struct FixedWidthIndex<const N: usize> {
    /// Buffered append handle over the backing index file. The underlying
    /// `File` is reached via `get_ref()` for positional reads and writes.
    writer: BufWriter<File>,
    /// Number of `N`-byte records logically in the index, including records
    /// still sitting in the writer's buffer (not yet flushed to disk).
    len: u64,
    /// Read-only memory map of the file for zero-syscall reads; populated by
    /// `remap` after writing completes, `None` while empty or still writable.
    mmap: Option<Mmap>,
}
Expand All @@ -104,14 +114,14 @@ impl<const N: usize> FixedWidthIndex<N> {
.truncate(true)
.open(path)?;
Ok(Self {
file,
writer: BufWriter::with_capacity(APPEND_BUF_CAP_BYTES, file),
len: 0,
mmap: None,
})
}

fn open(path: impl AsRef<Path>, len_error: &'static str) -> io::Result<Self> {
let file = OpenOptions::new().read(true).write(true).open(path)?;
let mut file = OpenOptions::new().read(true).write(true).open(path)?;
let len_bytes = file.metadata()?.len();
if len_bytes % (N as u64) != 0 {
return Err(io::Error::new(io::ErrorKind::InvalidData, len_error));
Expand All @@ -123,12 +133,17 @@ impl<const N: usize> FixedWidthIndex<N> {
} else {
None
};
Ok(Self { file, len, mmap })
file.seek(SeekFrom::End(0))?;
Ok(Self {
writer: BufWriter::with_capacity(APPEND_BUF_CAP_BYTES, file),
len,
mmap,
})
}

/// Open an existing file or create a new one without truncating existing content.
fn open_or_create(path: impl AsRef<Path>, len_error: &'static str) -> io::Result<Self> {
let file = OpenOptions::new()
let mut file = OpenOptions::new()
.read(true)
.write(true)
.truncate(false)
Expand All @@ -138,20 +153,27 @@ impl<const N: usize> FixedWidthIndex<N> {
if len_bytes % (N as u64) != 0 {
return Err(io::Error::new(io::ErrorKind::InvalidData, len_error));
}
let len = len_bytes / (N as u64);
file.seek(SeekFrom::End(0))?;
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might break when we do incremental index building

Ok(Self {
file,
len: len_bytes / (N as u64),
writer: BufWriter::with_capacity(APPEND_BUF_CAP_BYTES, file),
len,
mmap: None,
})
}

/// Push any buffered appended records down to the underlying file.
fn flush(&mut self) -> io::Result<()> {
    self.writer.flush()?;
    Ok(())
}

/// Remap the file for read access after all writes are complete.
///
/// Must not be called while any concurrent writes to this file are in flight.
fn remap(&mut self) -> io::Result<()> {
self.writer.flush()?;
self.mmap = if self.len > 0 {
// Safety: no more writes will occur on this handle after remap.
Some(unsafe { Mmap::map(&self.file)? })
Some(unsafe { Mmap::map(self.writer.get_ref())? })
} else {
None
};
Expand All @@ -167,31 +189,46 @@ impl<const N: usize> FixedWidthIndex<N> {
}

fn append_bytes(&mut self, bytes: &[u8; N]) -> io::Result<u64> {
self.file.seek(SeekFrom::End(0))?;
self.file.write_all(bytes)?;
self.writer.write_all(bytes)?;
self.len += 1;
Ok(self.len - 1)
}

fn set_bytes(&mut self, index: u64, bytes: &[u8; N]) -> io::Result<()> {
if index >= self.len {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"set_bytes index out of range",
));
}
// Flush so the record is on disk before the positional write.
self.writer.flush()?;
let offset = index * (N as u64);
self.file.seek(SeekFrom::Start(offset))?;
self.file.write_all(bytes)
self.writer.get_ref().write_all_at(bytes, offset)
}

/// Read the record at `index`, returning `None` when `index >= len`.
///
/// Lookup order: records still sitting in the append buffer are copied
/// straight out of the [`BufWriter`]'s buffer; otherwise the mmap is used
/// when present; otherwise a positional `read_exact_at` hits the file.
fn get_bytes(&self, index: u64) -> io::Result<Option<[u8; N]>> {
    if index >= self.len {
        return Ok(None);
    }
    // Records in [flushed_count, len) are still in the writer's buffer.
    // Appends are whole N-byte writes, so the buffer length is always a
    // multiple of N and this division is exact.
    let buf = self.writer.buffer();
    let flushed_count = self.len - (buf.len() / N) as u64;
    if index >= flushed_count {
        let buf_offset = ((index - flushed_count) as usize) * N;
        let mut out = [0u8; N];
        out.copy_from_slice(&buf[buf_offset..buf_offset + N]);
        return Ok(Some(out));
    }
    let offset = index * (N as u64);
    if let Some(mmap) = &self.mmap {
        let offset = offset as usize;
        let mut out = [0u8; N];
        out.copy_from_slice(&mmap[offset..offset + N]);
        return Ok(Some(out));
    }
    let mut out = [0u8; N];
    self.writer.get_ref().read_exact_at(&mut out, offset)?;
    Ok(Some(out))
}
}

Expand Down Expand Up @@ -441,6 +478,17 @@ impl DenseIndexSet {
})
}

/// Flush all buffered appends to disk.
///
/// Call this before any `set` on recently appended records so those
/// records are already in the file when the positional write happens.
pub fn flush(&mut self) -> io::Result<()> {
    self.txptr.inner.flush()?;
    self.block_tx.inner.flush()?;
    self.in_prevout.inner.flush()?;
    self.out_spent.inner.flush()?;
    Ok(())
}

/// Map all four index files into memory for zero-syscall reads.
///
/// Call once after all writes are complete.
Expand Down
Loading
Loading