diff --git a/datafusion/core/tests/datasource/object_store_access.rs b/datafusion/core/tests/datasource/object_store_access.rs index 25150ae284cc0..2503de862e06a 100644 --- a/datafusion/core/tests/datasource/object_store_access.rs +++ b/datafusion/core/tests/datasource/object_store_access.rs @@ -231,23 +231,13 @@ async fn query_multi_csv_file() { ); } -/// Test that a CSV file split into byte ranges via repartitioning exercises -/// range-based object store access. +/// Test that a CSV file split into byte ranges via repartitioning produces +/// exactly one GET request per byte range — no extra requests for boundary seeking. /// /// With a single file and `target_partitions=3`, the repartitioner produces -/// exactly 3 ranges. For each range, `calculate_range` calls -/// `find_first_newline` via a GET for every non-file boundary it touches -/// (the start boundary if `start > 0`, the end boundary if `end < file_size`), -/// plus one GET for the actual data — so 2 GETs for the first range (end scan -/// + data), 3 for the middle range (start scan + end scan + data), and 2 for -/// the last range (start scan + data) = 7 data GETs total. Additionally, -/// adjacent ranges share a boundary position, so each shared boundary is scanned -/// twice — once as the left range's end and again as the right range's start — -/// producing the duplicate GETs visible in the snapshot. Add the 1 HEAD for -/// file-size metadata = **8 total**. -/// -/// This differs from the JSON reader which uses [`AlignedBoundaryStream`] and -/// needs only 1 GET per range. +/// exactly 3 ranges. Each range is served by a single [`AlignedBoundaryStream`] +/// which issues exactly one bounded `get_opts` call, so there are 3 data GETs +/// plus 1 HEAD (to determine file size) = **4 total**. /// /// This test documents the current request pattern to catch regressions. #[tokio::test] @@ -275,15 +265,11 @@ async fn query_csv_file_with_byte_range_partitions() { +---------+-------+-------+ ------- Object Store Request Summary ------- RequestCountingObjectStore() - Total Requests: 8 + Total Requests: 4 - GET (opts) path=csv_range_table.csv head=true + - GET (opts) path=csv_range_table.csv range=0-129 - GET (opts) path=csv_range_table.csv range=42-129 - - GET (opts) path=csv_range_table.csv range=0-49 - - GET (opts) path=csv_range_table.csv range=42-129 - - GET (opts) path=csv_range_table.csv range=85-129 - - GET (opts) path=csv_range_table.csv range=49-89 - GET (opts) path=csv_range_table.csv range=85-129 - - GET (opts) path=csv_range_table.csv range=89-129 " ); } diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs index 638279f827344..25ec311880405 100644 --- a/datafusion/datasource-csv/src/source.rs +++ b/datafusion/datasource-csv/src/source.rs @@ -17,24 +17,23 @@ //! Execution plan for reading CSV files +use datafusion_datasource::boundary_stream::AlignedBoundaryStream; use datafusion_datasource::projection::{ProjectionOpener, SplitProjection}; use datafusion_physical_plan::projection::ProjectionExprs; use std::fmt; -use std::io::{Read, Seek, SeekFrom}; +use std::io::Read; use std::sync::Arc; -use std::task::Poll; use datafusion_datasource::decoder::{DecoderDeserializer, deserialize_stream}; use datafusion_datasource::file_compression_type::FileCompressionType; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; use datafusion_datasource::{ - FileRange, ListingTableUrl, PartitionedFile, RangeCalculation, TableSchema, - as_file_source, calculate_range, + FileRange, ListingTableUrl, PartitionedFile, TableSchema, as_file_source, }; use arrow::csv; use datafusion_common::config::CsvOptions; -use datafusion_common::{DataFusionError, Result}; +use datafusion_common::{DataFusionError, Result, exec_datafusion_err}; use datafusion_common_runtime::JoinSet; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfig; @@ -367,43 +366,53 @@ impl FileOpener for CsvOpener { Ok(Box::pin(async move { // Current partition contains bytes [start_byte, end_byte) (might contain incomplete lines at boundaries) + let file_size = partitioned_file.object_meta.size; + let location = partitioned_file.object_meta.location; + + if let Some(file_range) = partitioned_file.range.as_ref() { + let raw_start: u64 = file_range.start.try_into().map_err(|_| { + exec_datafusion_err!( + "Expected start range to fit in u64, got {}", + file_range.start + ) + })?; + let raw_end: u64 = file_range.end.try_into().map_err(|_| { + exec_datafusion_err!( + "Expected end range to fit in u64, got {}", + file_range.end + ) + })?; + + let aligned_stream = AlignedBoundaryStream::new( + Arc::clone(&store), + location.clone(), + raw_start, + raw_end, + file_size, + terminator.unwrap_or(b'\n'), + ) + .await? + .map_err(DataFusionError::from); + + let decoder = config.builder().build_decoder(); + let input = file_compression_type + .convert_stream(aligned_stream.boxed())? + .fuse(); + let stream = deserialize_stream( + input, + DecoderDeserializer::new(CsvDecoder::new(decoder)), + ); + return Ok(stream.map_err(Into::into).boxed()); + } - let calculated_range = - calculate_range(&partitioned_file, &store, terminator).await?; - - let range = match calculated_range { - RangeCalculation::Range(None) => None, - RangeCalculation::Range(Some(range)) => Some(range.into()), - RangeCalculation::TerminateEarly => { - return Ok( - futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed() - ); - } - }; - - let options = GetOptions { - range, - ..Default::default() - }; - - let result = store - .get_opts(&partitioned_file.object_meta.location, options) - .await?; + // No range specified — read the entire file + let options = GetOptions::default(); + let result = store.get_opts(&location, options).await?; match result.payload { #[cfg(not(target_arch = "wasm32"))] - GetResultPayload::File(mut file, _) => { - let is_whole_file_scanned = partitioned_file.range.is_none(); - let decoder = if is_whole_file_scanned { - // Don't seek if no range as breaks FIFO files - file_compression_type.convert_read(file)? - } else { - file.seek(SeekFrom::Start(result.range.start as _))?; - file_compression_type.convert_read( - file.take((result.range.end - result.range.start) as u64), - )? - }; - + GetResultPayload::File(file, _) => { + let decoder = file_compression_type.convert_read(file)?; let mut reader = config.open(decoder)?; // Use std::iter::from_fn to wrap execution of iterator's next() method. diff --git a/datafusion/datasource-json/src/mod.rs b/datafusion/datasource-json/src/mod.rs index f7932c8a21d95..ec93fc9c387e2 100644 --- a/datafusion/datasource-json/src/mod.rs +++ b/datafusion/datasource-json/src/mod.rs @@ -20,7 +20,6 @@ // https://github.com/apache/datafusion/issues/11143 #![cfg_attr(not(test), deny(clippy::clone_on_ref_ptr))] -pub mod boundary_stream; pub mod file_format; pub mod source; pub mod utils; diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs index 179870673d426..8632d6b942bc1 100644 --- a/datafusion/datasource-json/src/source.rs +++ b/datafusion/datasource-json/src/source.rs @@ -25,11 +25,10 @@ use std::task::{Context, Poll}; use crate::file_format::JsonDecoder; use crate::utils::{ChannelReader, JsonArrayToNdjsonReader}; -use crate::boundary_stream::AlignedBoundaryStream; - use datafusion_common::error::{DataFusionError, Result}; use datafusion_common::exec_datafusion_err; use datafusion_common_runtime::{JoinSet, SpawnedTask}; +use datafusion_datasource::boundary_stream::AlignedBoundaryStream; use datafusion_datasource::decoder::{DecoderDeserializer, deserialize_stream}; use datafusion_datasource::file_compression_type::FileCompressionType; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; diff --git a/datafusion/datasource-json/src/boundary_stream.rs b/datafusion/datasource/src/boundary_stream.rs similarity index 99% rename from datafusion/datasource-json/src/boundary_stream.rs rename to datafusion/datasource/src/boundary_stream.rs index 847c80279a53e..7b1cfb814df31 100644 --- a/datafusion/datasource-json/src/boundary_stream.rs +++ b/datafusion/datasource/src/boundary_stream.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Streaming boundary-aligned wrapper for newline-delimited JSON range reads. +//! Streaming boundary-aligned wrapper for newline-delimited JSON and CSV range reads. //! //! [`AlignedBoundaryStream`] wraps a raw byte stream and lazily aligns to //! record (newline) boundaries, avoiding the need for separate `get_opts` @@ -398,7 +398,7 @@ impl Stream for AlignedBoundaryStream { #[cfg(test)] mod tests { use super::*; - use crate::test_utils::{CHUNK_SIZES, make_chunked_store}; + use crate::test_util::{CHUNK_SIZES, make_chunked_store}; use futures::TryStreamExt; async fn collect_stream(stream: AlignedBoundaryStream) -> Vec { diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index 82030e545a42e..98204adbed9f1 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -28,6 +28,7 @@ //! A table that uses the `ObjectStore` listing capability //! to get the list of files to process. +pub mod boundary_stream; pub mod decoder; pub mod display; pub mod file; @@ -56,11 +57,10 @@ pub use self::url::ListingTableUrl; use crate::file_groups::FileGroup; use chrono::TimeZone; use datafusion_common::stats::Precision; -use datafusion_common::{ColumnStatistics, Result, TableReference, exec_datafusion_err}; +use datafusion_common::{ColumnStatistics, Result, TableReference}; use datafusion_common::{ScalarValue, Statistics}; use datafusion_physical_expr::LexOrdering; -use futures::{Stream, StreamExt}; -use object_store::{GetOptions, GetRange, ObjectStore}; +use futures::Stream; use object_store::{ObjectMeta, path::Path}; pub use table_schema::{TableSchema, TableSchemaBuilder}; // Remove when add_row_stats is remove @@ -429,101 +429,6 @@ pub enum RangeCalculation { TerminateEarly, } -/// Calculates an appropriate byte range for reading from an object based on the -/// provided metadata. -/// -/// This asynchronous function examines the [`PartitionedFile`] of an object in an object store -/// and determines the range of bytes to be read. The range calculation may adjust -/// the start and end points to align with meaningful data boundaries (like newlines). -/// -/// Returns a `Result` wrapping a [`RangeCalculation`], which is either a calculated byte range or an indication to terminate early. -/// -/// Returns an `Error` if any part of the range calculation fails, such as issues in reading from the object store or invalid range boundaries. -pub async fn calculate_range( - file: &PartitionedFile, - store: &Arc, - terminator: Option, -) -> Result { - let location = &file.object_meta.location; - let file_size = file.object_meta.size; - let newline = terminator.unwrap_or(b'\n'); - - match file.range { - None => Ok(RangeCalculation::Range(None)), - Some(FileRange { start, end }) => { - let start: u64 = start.try_into().map_err(|_| { - exec_datafusion_err!("Expect start range to fit in u64, got {start}") - })?; - let end: u64 = end.try_into().map_err(|_| { - exec_datafusion_err!("Expect end range to fit in u64, got {end}") - })?; - - let start_delta = if start != 0 { - find_first_newline(store, location, start - 1, file_size, newline).await? - } else { - 0 - }; - - if start + start_delta > end { - return Ok(RangeCalculation::TerminateEarly); - } - - let end_delta = if end != file_size { - find_first_newline(store, location, end - 1, file_size, newline).await? - } else { - 0 - }; - - let range = start + start_delta..end + end_delta; - - if range.start >= range.end { - return Ok(RangeCalculation::TerminateEarly); - } - - Ok(RangeCalculation::Range(Some(range))) - } - } -} - -/// Asynchronously finds the position of the first newline character in a specified byte range -/// within an object, such as a file, in an object store. -/// -/// This function scans the contents of the object starting from the specified `start` position -/// up to the `end` position, looking for the first occurrence of a newline character. -/// It returns the position of the first newline relative to the start of the range. -/// -/// Returns a `Result` wrapping a `usize` that represents the position of the first newline character found within the specified range. If no newline is found, it returns the length of the scanned data, effectively indicating the end of the range. -/// -/// The function returns an `Error` if any issues arise while reading from the object store or processing the data stream. -async fn find_first_newline( - object_store: &Arc, - location: &Path, - start: u64, - end: u64, - newline: u8, -) -> Result { - let options = GetOptions { - range: Some(GetRange::Bounded(start..end)), - ..Default::default() - }; - - let result = object_store.get_opts(location, options).await?; - let mut result_stream = result.into_stream(); - - let mut index = 0; - - while let Some(chunk) = result_stream.next().await.transpose()? { - if let Some(position) = chunk.iter().position(|&byte| byte == newline) { - let position = position as u64; - return Ok(index + position); - } - - index += chunk.len() as u64; - } - - Ok(index) -} - /// Generates test files with min-max statistics in different overlap patterns. /// /// Used by tests and benchmarks. @@ -647,7 +552,7 @@ mod tests { use datafusion_execution::object_store::{ DefaultObjectStoreRegistry, ObjectStoreRegistry, }; - use object_store::{ObjectStoreExt, local::LocalFileSystem, path::Path}; + use object_store::{local::LocalFileSystem, path::Path}; use std::{collections::HashMap, ops::Not, sync::Arc}; use url::Url; @@ -844,31 +749,4 @@ mod tests { // testing an empty path with `ignore_subdirectory` set to false assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), false)); } - - /// Regression test for - #[tokio::test] - async fn test_calculate_range_single_line_file() { - use super::{PartitionedFile, RangeCalculation, calculate_range}; - use object_store::ObjectStore; - use object_store::memory::InMemory; - - let content = r#"{"id":1,"data":"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}"#; - let file_size = content.len() as u64; - - let store: Arc = Arc::new(InMemory::new()); - let path = Path::from("test.json"); - store.put(&path, content.into()).await.unwrap(); - - let mid = file_size / 2; - let partitioned_file = PartitionedFile::new_with_range( - path.to_string(), - file_size, - mid as i64, - file_size as i64, - ); - - let result = calculate_range(&partitioned_file, &store, None).await; - - assert!(matches!(result, Ok(RangeCalculation::TerminateEarly))); - } } diff --git a/datafusion/datasource/src/test_util.rs b/datafusion/datasource/src/test_util.rs index d35ed5feb51de..5bef6d44e1408 100644 --- a/datafusion/datasource/src/test_util.rs +++ b/datafusion/datasource/src/test_util.rs @@ -131,3 +131,32 @@ impl FileSource for MockSource { pub(crate) fn col(name: &str, schema: &Schema) -> Result> { Ok(Arc::new(Column::new_with_schema(name, schema)?)) } + +/// Chunk sizes exercised by every parameterised test. +/// +/// `usize::MAX` is intentionally included: `ChunkedStore` treats it as +/// "one chunk containing everything", giving the single-chunk fast path. +pub(crate) const CHUNK_SIZES: &[usize] = &[1, 2, 3, 4, 5, 7, 8, 11, 13, 16, usize::MAX]; + +/// Seed a fresh `InMemory` store with `data` and wrap it in a +/// [`ChunkedStore`] that splits every GET response into `chunk_size`-byte +/// pieces. +pub(crate) async fn make_chunked_store( + data: &[u8], + chunk_size: usize, +) -> (Arc, object_store::path::Path) { + use bytes::Bytes; + use object_store::ObjectStoreExt; + use object_store::PutPayload; + use object_store::chunked::ChunkedStore; + use object_store::memory::InMemory; + use object_store::path::Path; + + let inner = Arc::new(InMemory::new()); + let path = Path::from("test"); + inner + .put(&path, PutPayload::from(Bytes::copy_from_slice(data))) + .await + .unwrap(); + (Arc::new(ChunkedStore::new(inner, chunk_size)), path) +}