Skip to content

Commit 6f9a59a

Browse files
authored
refactor: Error cleanup (#202)
## Description <!-- A summary of what this pull request achieves and a rough list of changes. --> ## Breaking Changes <!-- Optional, if there are any breaking changes document them, including how to migrate older code. --> ## Notes & open questions Note: we might replace the probably overused io::Error and replace it with an enum done via n0_error. But that is a can of worms because it would require serializing n0_error error chains... ## Change checklist - [ ] Self-review. - [ ] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [ ] Tests if relevant. - [ ] All breaking changes documented.
1 parent 6ef70e3 commit 6f9a59a

File tree

7 files changed

+67
-75
lines changed

7 files changed

+67
-75
lines changed

src/api.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ impl From<EncodeError> for Error {
199199
fn from(value: EncodeError) -> Self {
200200
match value {
201201
EncodeError::Io(cause) => Self::Io(cause),
202-
_ => Self::other(value),
202+
_ => Self::Io(io::Error::other(value)),
203203
}
204204
}
205205
}

src/api/blobs.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -437,16 +437,13 @@ impl Blobs {
437437
mut reader: R,
438438
) -> RequestResult<R> {
439439
let mut size = [0; 8];
440-
reader
441-
.recv_exact(&mut size)
442-
.await
443-
.map_err(super::Error::other)?;
440+
reader.recv_exact(&mut size).await?;
444441
let size = u64::from_le_bytes(size);
445442
let Some(size) = NonZeroU64::new(size) else {
446443
return if hash == Hash::EMPTY {
447444
Ok(reader)
448445
} else {
449-
Err(super::Error::other("invalid size for hash").into())
446+
Err(io::Error::other("invalid size for hash").into())
450447
};
451448
};
452449
let tree = BaoTree::new(size.get(), IROH_BLOCK_SIZE);
@@ -651,7 +648,7 @@ impl<'a> AddProgress<'a> {
651648
_ => {}
652649
}
653650
}
654-
Err(super::Error::other("unexpected end of stream").into())
651+
Err(io::Error::other("unexpected end of stream").into())
655652
}
656653

657654
pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
@@ -704,7 +701,7 @@ impl IntoFuture for ObserveProgress {
704701
let mut rx = self.inner.await?;
705702
match rx.recv().await? {
706703
Some(bitfield) => Ok(bitfield),
707-
None => Err(super::Error::other("unexpected end of stream").into()),
704+
None => Err(io::Error::other("unexpected end of stream").into()),
708705
}
709706
})
710707
}
@@ -726,7 +723,7 @@ impl ObserveProgress {
726723
return Ok(item);
727724
}
728725
}
729-
Err(super::Error::other("unexpected end of stream").into())
726+
Err(io::Error::other("unexpected end of stream").into())
730727
}
731728

732729
/// Returns an infinite stream of bitfields. The first bitfield is the
@@ -805,7 +802,7 @@ impl ExportProgress {
805802
if let Some(size) = size {
806803
Ok(size)
807804
} else {
808-
Err(super::Error::other("unexpected end of stream").into())
805+
Err(io::Error::other("unexpected end of stream").into())
809806
}
810807
}
811808
}

src/store/fs.rs

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -731,7 +731,7 @@ impl HashSpecificCommand for ExportPathMsg {
731731
async fn on_error(self, arg: SpawnArg<EmParams>) {
732732
let err = match arg {
733733
SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
734-
SpawnArg::Dead => io::Error::other("entity is dead"),
734+
SpawnArg::Dead => err_entity_dead(),
735735
_ => unreachable!(),
736736
};
737737
self.tx
@@ -747,7 +747,7 @@ impl HashSpecificCommand for ExportBaoMsg {
747747
async fn on_error(self, arg: SpawnArg<EmParams>) {
748748
let err = match arg {
749749
SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
750-
SpawnArg::Dead => io::Error::other("entity is dead"),
750+
SpawnArg::Dead => err_entity_dead(),
751751
_ => unreachable!(),
752752
};
753753
self.tx
@@ -763,7 +763,7 @@ impl HashSpecificCommand for ExportRangesMsg {
763763
async fn on_error(self, arg: SpawnArg<EmParams>) {
764764
let err = match arg {
765765
SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
766-
SpawnArg::Dead => io::Error::other("entity is dead"),
766+
SpawnArg::Dead => err_entity_dead(),
767767
_ => unreachable!(),
768768
};
769769
self.tx
@@ -779,7 +779,7 @@ impl HashSpecificCommand for ImportBaoMsg {
779779
async fn on_error(self, arg: SpawnArg<EmParams>) {
780780
let err = match arg {
781781
SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
782-
SpawnArg::Dead => io::Error::other("entity is dead"),
782+
SpawnArg::Dead => err_entity_dead(),
783783
_ => unreachable!(),
784784
};
785785
self.tx.send(Err(api::Error::from(err))).await.ok();
@@ -798,13 +798,17 @@ impl HashSpecificCommand for (TempTag, ImportEntryMsg) {
798798
async fn on_error(self, arg: SpawnArg<EmParams>) {
799799
let err = match arg {
800800
SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
801-
SpawnArg::Dead => io::Error::other("entity is dead"),
801+
SpawnArg::Dead => err_entity_dead(),
802802
_ => unreachable!(),
803803
};
804804
self.1.tx.send(AddProgressItem::Error(err)).await.ok();
805805
}
806806
}
807807

808+
fn err_entity_dead() -> io::Error {
809+
io::Error::other("entity is dead")
810+
}
811+
808812
struct RtWrapper(Option<tokio::runtime::Runtime>);
809813

810814
impl From<tokio::runtime::Runtime> for RtWrapper {
@@ -849,7 +853,7 @@ async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc<TempTagScope>, ctx: A
849853
async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TempTagScope>) -> api::Result<()> {
850854
let BatchMsg { tx, mut rx, .. } = cmd;
851855
trace!("created scope {}", id);
852-
tx.send(id).await.map_err(api::Error::other)?;
856+
tx.send(id).await?;
853857
while let Some(msg) = rx.recv().await? {
854858
match msg {
855859
BatchResponse::Drop(msg) => scope.on_drop(&msg),
@@ -1262,9 +1266,7 @@ async fn export_path_impl(
12621266
MemOrFile::Mem(data) => data.len() as u64,
12631267
MemOrFile::File((_, size)) => *size,
12641268
};
1265-
tx.send(ExportProgressItem::Size(size))
1266-
.await
1267-
.map_err(api::Error::other)?;
1269+
tx.send(ExportProgressItem::Size(size)).await?;
12681270
match data {
12691271
MemOrFile::Mem(data) => {
12701272
let mut target = fs::File::create(&target)?;
@@ -1320,9 +1322,7 @@ async fn export_path_impl(
13201322
}
13211323
},
13221324
}
1323-
tx.send(ExportProgressItem::Done)
1324-
.await
1325-
.map_err(api::Error::other)?;
1325+
tx.send(ExportProgressItem::Done).await?;
13261326
Ok(())
13271327
}
13281328

@@ -1378,9 +1378,7 @@ async fn copy_with_progress<T: CopyProgress>(
13781378
let buf: &mut [u8] = &mut buf[..remaining];
13791379
file.read_exact_at(offset, buf)?;
13801380
target.write_all(buf)?;
1381-
tx.try_send(T::from_offset(offset))
1382-
.await
1383-
.map_err(|_e| io::Error::other(""))?;
1381+
tx.try_send(T::from_offset(offset)).await?;
13841382
yield_now().await;
13851383
offset += buf.len() as u64;
13861384
}

src/store/fs/import.rs

Lines changed: 9 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -196,12 +196,8 @@ async fn import_bytes_tiny_impl(
196196
let size = cmd.data.len() as u64;
197197
// send the required progress events
198198
// AddProgressItem::Done will be sent when finishing the import!
199-
tx.send(AddProgressItem::Size(size))
200-
.await
201-
.map_err(|_e| io::Error::other("error"))?;
202-
tx.send(AddProgressItem::CopyDone)
203-
.await
204-
.map_err(|_e| io::Error::other("error"))?;
199+
tx.send(AddProgressItem::Size(size)).await?;
200+
tx.send(AddProgressItem::CopyDone).await?;
205201
Ok(if raw_outboard_size(size) == 0 {
206202
// the thing is so small that it does not even need an outboard
207203
ImportEntry {
@@ -286,12 +282,8 @@ async fn import_byte_stream_impl(
286282
) -> io::Result<ImportEntry> {
287283
let ImportByteStreamRequest { format, scope } = cmd;
288284
let import_source = get_import_source(stream, tx, &options).await?;
289-
tx.send(AddProgressItem::Size(import_source.size()))
290-
.await
291-
.map_err(|_e| io::Error::other("error"))?;
292-
tx.send(AddProgressItem::CopyDone)
293-
.await
294-
.map_err(|_e| io::Error::other("error"))?;
285+
tx.send(AddProgressItem::Size(import_source.size())).await?;
286+
tx.send(AddProgressItem::CopyDone).await?;
295287
compute_outboard(import_source, format, scope, options, tx).await
296288
}
297289

@@ -344,18 +336,14 @@ async fn get_import_source(
344336
data.extend_from_slice(&chunk);
345337
}
346338
// todo: don't send progress for every chunk if the chunks are small?
347-
tx.try_send(AddProgressItem::CopyProgress(size))
348-
.await
349-
.map_err(|_e| io::Error::other("error"))?;
339+
tx.try_send(AddProgressItem::CopyProgress(size)).await?;
350340
}
351341
Ok(if let Some((mut file, temp_path)) = disk {
352342
while let Some(chunk) = stream.next().await {
353343
let chunk = chunk?;
354344
file.write_all(&chunk)?;
355345
size += chunk.len() as u64;
356-
tx.send(AddProgressItem::CopyProgress(size))
357-
.await
358-
.map_err(|_e| io::Error::other("error"))?;
346+
tx.send(AddProgressItem::CopyProgress(size)).await?;
359347
}
360348
ImportSource::TempFile(temp_path, file, size)
361349
} else {
@@ -473,14 +461,10 @@ async fn import_path_impl(
473461
}
474462

475463
let size = path.metadata()?.len();
476-
tx.send(AddProgressItem::Size(size))
477-
.await
478-
.map_err(|_e| io::Error::other("error"))?;
464+
tx.send(AddProgressItem::Size(size)).await?;
479465
let import_source = if size <= options.inline.max_data_inlined {
480466
let data = std::fs::read(path)?;
481-
tx.send(AddProgressItem::CopyDone)
482-
.await
483-
.map_err(|_e| io::Error::other("error"))?;
467+
tx.send(AddProgressItem::CopyDone).await?;
484468
ImportSource::Memory(data.into())
485469
} else if mode == ImportMode::TryReference {
486470
// reference where it is. We are going to need the file handle to
@@ -500,9 +484,7 @@ async fn import_path_impl(
500484
);
501485
// copy from path to temp_path
502486
let file = OpenOptions::new().read(true).open(&temp_path)?;
503-
tx.send(AddProgressItem::CopyDone)
504-
.await
505-
.map_err(|_| io::Error::other("error"))?;
487+
tx.send(AddProgressItem::CopyDone).await?;
506488
ImportSource::TempFile(temp_path, file, size)
507489
};
508490
compute_outboard(import_source, format, batch, options, tx).await

src/store/fs/meta.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ async fn handle_list_tags(msg: ListTagsMsg, tables: &impl ReadableTables) -> Act
363363
res.push(crate::api::Result::Ok(info));
364364
}
365365
}
366-
Err(e) => res.push(Err(crate::api::Error::other(e))),
366+
Err(e) => res.push(Err(api_error_from_storage_error(e))),
367367
}
368368
}
369369
tx.send(res).await.ok();
@@ -629,8 +629,12 @@ impl Actor {
629629
tx,
630630
..
631631
} = cmd;
632-
let res = tables.tags.insert(tag, value).map(|_| ());
633-
tx.send(res.map_err(crate::api::Error::other)).await.ok();
632+
let res = tables
633+
.tags
634+
.insert(tag, value)
635+
.map_err(api_error_from_storage_error)
636+
.map(|_| ());
637+
tx.send(res).await.ok();
634638
Ok(())
635639
}
636640

@@ -852,6 +856,13 @@ impl Actor {
852856
}
853857
}
854858

859+
/// Convert a redb StorageError into an api::Error
860+
///
861+
/// This can't be a From instance because that would require exposing redb::StorageError in the public API.
862+
fn api_error_from_storage_error(e: redb::StorageError) -> api::Error {
863+
api::Error::Io(io::Error::other(e))
864+
}
865+
855866
#[derive(Debug)]
856867
struct DbWrapper(Option<Database>);
857868

@@ -953,8 +964,12 @@ async fn list_blobs_impl(
953964
_cmd: ListRequest,
954965
tx: &mut mpsc::Sender<api::Result<Hash>>,
955966
) -> api::Result<()> {
956-
for item in snapshot.blobs.iter().map_err(api::Error::other)? {
957-
let (k, _) = item.map_err(api::Error::other)?;
967+
for item in snapshot
968+
.blobs
969+
.iter()
970+
.map_err(api_error_from_storage_error)?
971+
{
972+
let (k, _) = item.map_err(api_error_from_storage_error)?;
958973
let k = k.value();
959974
tx.send(Ok(k)).await.ok();
960975
}

src/store/mem.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -542,7 +542,7 @@ async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc<TempTagScope>) -> Sco
542542
async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TempTagScope>) -> api::Result<()> {
543543
let BatchMsg { tx, mut rx, .. } = cmd;
544544
trace!("created scope {}", id);
545-
tx.send(id).await.map_err(api::Error::other)?;
545+
tx.send(id).await?;
546546
while let Some(msg) = rx.recv().await? {
547547
match msg {
548548
BatchResponse::Drop(msg) => scope.on_drop(&msg),
@@ -837,8 +837,7 @@ async fn export_path_impl(
837837
entry.0.state.borrow().data().read_exact_at(offset, buf)?;
838838
file.write_all(buf)?;
839839
tx.try_send(ExportProgressItem::CopyProgress(offset))
840-
.await
841-
.map_err(|_e| io::Error::other(""))?;
840+
.await?;
842841
yield_now().await;
843842
}
844843
Ok(())

0 commit comments

Comments
 (0)