Skip to content

Commit cca57ab

Browse files
committed
feat(core): add prefetching to ArchiveBlockProvider to download next archive early
1 parent 1222dd6 commit cca57ab

1 file changed

Lines changed: 66 additions & 3 deletions

File tree

core/src/block_strider/provider/archive_provider.rs

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,14 @@ use crate::overlay_client::{Neighbour, PunishReason};
2323
#[serde(default)]
2424
pub struct ArchiveBlockProviderConfig {
2525
pub max_archive_to_memory_size: ByteSize,
26+
pub enable_prefetching: bool,
2627
}
2728

2829
impl Default for ArchiveBlockProviderConfig {
2930
fn default() -> Self {
3031
Self {
3132
max_archive_to_memory_size: ByteSize::mb(100),
33+
enable_prefetching: true,
3234
}
3335
}
3436
}
@@ -61,7 +63,7 @@ impl ArchiveBlockProvider {
6163
}
6264

6365
async fn get_next_block_impl(&self, block_id: &BlockId) -> OptionalBlockStuff {
64-
let this = self.inner.as_ref();
66+
let this = &self.inner;
6567

6668
let next_mc_seqno = block_id.seqno + 1;
6769

@@ -95,7 +97,7 @@ impl ArchiveBlockProvider {
9597
}
9698

9799
async fn get_block_impl(&self, block_id_relation: &BlockIdRelation) -> OptionalBlockStuff {
98-
let this = self.inner.as_ref();
100+
let this = &self.inner;
99101

100102
let block_id = block_id_relation.block_id;
101103
let mc_block_id = block_id_relation.mc_block_id;
@@ -161,7 +163,7 @@ struct Inner {
161163
}
162164

163165
impl Inner {
164-
async fn get_archive(&self, mc_seqno: u32) -> Option<(u32, ArchiveInfo)> {
166+
async fn get_archive(self: &Arc<Self>, mc_seqno: u32) -> Option<(u32, ArchiveInfo)> {
165167
loop {
166168
let mut pending = 'pending: {
167169
let mut guard = self.known_archives.lock();
@@ -171,6 +173,19 @@ impl Inner {
171173
match value {
172174
ArchiveSlot::Downloaded(info) => {
173175
if info.archive.mc_block_ids.contains_key(&mc_seqno) {
176+
// Prefetch next archive if enabled
177+
if self.config.enable_prefetching {
178+
if let Some((last_seqno, _)) =
179+
info.archive.mc_block_ids.last_key_value()
180+
{
181+
let next_archive_start_seqno = last_seqno.saturating_add(1);
182+
let this = self.clone();
183+
tokio::spawn(async move {
184+
this.try_prefetch_archive(next_archive_start_seqno)
185+
.await;
186+
});
187+
}
188+
}
174189
return Some((*archive_key, info.clone()));
175190
}
176191
}
@@ -221,6 +236,21 @@ impl Inner {
221236
}
222237

223238
if finished {
239+
// Prefetch next archive if enabled
240+
if self.config.enable_prefetching {
241+
if let Some(info) = res
242+
.as_ref()
243+
.and_then(|i| i.archive.mc_block_ids.last_key_value())
244+
{
245+
let (last_seqno, _) = info;
246+
let next_archive_start_seqno = last_seqno.saturating_add(1);
247+
let this = self.clone();
248+
// Fire and forget prefetch
249+
tokio::spawn(async move {
250+
this.try_prefetch_archive(next_archive_start_seqno).await;
251+
});
252+
}
253+
}
224254
return res.map(|info| (pending.archive_key, info));
225255
}
226256

@@ -248,6 +278,39 @@ impl Inner {
248278
}
249279
}
250280

281+
/// Try to prefetch the archive containing `prefetch_seqno` if not already present or pending.
282+
async fn try_prefetch_archive(&self, prefetch_seqno: u32) {
283+
let mut guard = self.known_archives.lock();
284+
285+
// Check if archive already present or pending
286+
for value in guard.values() {
287+
match value {
288+
ArchiveSlot::Downloaded(info) => {
289+
if info.archive.mc_block_ids.contains_key(&prefetch_seqno) {
290+
tracing::trace!(
291+
prefetch_seqno,
292+
"archive already downloaded, skipping prefetch"
293+
);
294+
return;
295+
}
296+
}
297+
ArchiveSlot::Pending(task) => {
298+
if task.archive_key == prefetch_seqno {
299+
tracing::trace!(
300+
prefetch_seqno,
301+
"archive download already pending, skipping prefetch"
302+
);
303+
return;
304+
}
305+
}
306+
}
307+
}
308+
309+
let task = self.make_downloader().spawn(prefetch_seqno);
310+
guard.insert(prefetch_seqno, ArchiveSlot::Pending(task));
311+
tracing::debug!(prefetch_seqno, "starting archive prefetch");
312+
}
313+
251314
fn make_downloader(&self) -> ArchiveDownloader {
252315
ArchiveDownloader {
253316
client: self.client.clone(),

0 commit comments

Comments
 (0)