Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 7 additions & 21 deletions datafusion/core/tests/datasource/object_store_access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,23 +231,13 @@ async fn query_multi_csv_file() {
);
}

/// Test that a CSV file split into byte ranges via repartitioning exercises
/// range-based object store access.
/// Test that a CSV file split into byte ranges via repartitioning produces
/// exactly one GET request per byte range — no extra requests for boundary seeking.
///
/// With a single file and `target_partitions=3`, the repartitioner produces
/// exactly 3 ranges. For each range, `calculate_range` calls
/// `find_first_newline` via a GET for every non-file boundary it touches
/// (the start boundary if `start > 0`, the end boundary if `end < file_size`),
/// plus one GET for the actual data — so 2 GETs for the first range (end scan
/// + data), 3 for the middle range (start scan + end scan + data), and 2 for
/// the last range (start scan + data) = 7 data GETs total. Additionally,
/// adjacent ranges share a boundary position, so each shared boundary is scanned
/// twice — once as the left range's end and again as the right range's start —
/// producing the duplicate GETs visible in the snapshot. Add the 1 HEAD for
/// file-size metadata = **8 total**.
///
/// This differs from the JSON reader which uses [`AlignedBoundaryStream`] and
/// needs only 1 GET per range.
/// exactly 3 ranges. Each range is served by a single [`AlignedBoundaryStream`]
/// which issues exactly one bounded `get_opts` call, so there are 3 data GETs
/// plus 1 HEAD (to determine file size) = **4 total**.
///
/// This test documents the current request pattern to catch regressions.
#[tokio::test]
Expand Down Expand Up @@ -275,15 +265,11 @@ async fn query_csv_file_with_byte_range_partitions() {
+---------+-------+-------+
------- Object Store Request Summary -------
RequestCountingObjectStore()
Total Requests: 8
Total Requests: 4
- GET (opts) path=csv_range_table.csv head=true
- GET (opts) path=csv_range_table.csv range=0-129
- GET (opts) path=csv_range_table.csv range=42-129
- GET (opts) path=csv_range_table.csv range=0-49
- GET (opts) path=csv_range_table.csv range=42-129
- GET (opts) path=csv_range_table.csv range=85-129
- GET (opts) path=csv_range_table.csv range=49-89
- GET (opts) path=csv_range_table.csv range=85-129
- GET (opts) path=csv_range_table.csv range=89-129
"
);
}
Expand Down
85 changes: 47 additions & 38 deletions datafusion/datasource-csv/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,23 @@

//! Execution plan for reading CSV files

use datafusion_datasource::boundary_stream::AlignedBoundaryStream;
use datafusion_datasource::projection::{ProjectionOpener, SplitProjection};
use datafusion_physical_plan::projection::ProjectionExprs;
use std::fmt;
use std::io::{Read, Seek, SeekFrom};
use std::io::Read;
use std::sync::Arc;
use std::task::Poll;

use datafusion_datasource::decoder::{DecoderDeserializer, deserialize_stream};
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::{
FileRange, ListingTableUrl, PartitionedFile, RangeCalculation, TableSchema,
as_file_source, calculate_range,
FileRange, ListingTableUrl, PartitionedFile, TableSchema, as_file_source,
};

use arrow::csv;
use datafusion_common::config::CsvOptions;
use datafusion_common::{DataFusionError, Result};
use datafusion_common::{DataFusionError, Result, exec_datafusion_err};
use datafusion_common_runtime::JoinSet;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
Expand Down Expand Up @@ -367,43 +366,53 @@ impl FileOpener for CsvOpener {

Ok(Box::pin(async move {
// Current partition contains bytes [start_byte, end_byte) (might contain incomplete lines at boundaries)
let file_size = partitioned_file.object_meta.size;
let location = partitioned_file.object_meta.location;

if let Some(file_range) = partitioned_file.range.as_ref() {
let raw_start: u64 = file_range.start.try_into().map_err(|_| {
exec_datafusion_err!(
"Expected start range to fit in u64, got {}",
file_range.start
)
})?;
let raw_end: u64 = file_range.end.try_into().map_err(|_| {
exec_datafusion_err!(
"Expected end range to fit in u64, got {}",
file_range.end
)
})?;

let aligned_stream = AlignedBoundaryStream::new(
Arc::clone(&store),
location.clone(),
raw_start,
raw_end,
file_size,
terminator.unwrap_or(b'\n'),
)
.await?
.map_err(DataFusionError::from);

let decoder = config.builder().build_decoder();
let input = file_compression_type
.convert_stream(aligned_stream.boxed())?
.fuse();
let stream = deserialize_stream(
input,
DecoderDeserializer::new(CsvDecoder::new(decoder)),
);
return Ok(stream.map_err(Into::into).boxed());
}

let calculated_range =
calculate_range(&partitioned_file, &store, terminator).await?;

let range = match calculated_range {
RangeCalculation::Range(None) => None,
RangeCalculation::Range(Some(range)) => Some(range.into()),
RangeCalculation::TerminateEarly => {
return Ok(
futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed()
);
}
};

let options = GetOptions {
range,
..Default::default()
};

let result = store
.get_opts(&partitioned_file.object_meta.location, options)
.await?;
// No range specified — read the entire file
let options = GetOptions::default();
let result = store.get_opts(&location, options).await?;

match result.payload {
#[cfg(not(target_arch = "wasm32"))]
GetResultPayload::File(mut file, _) => {
let is_whole_file_scanned = partitioned_file.range.is_none();
let decoder = if is_whole_file_scanned {
// Don't seek if no range as breaks FIFO files
file_compression_type.convert_read(file)?
} else {
file.seek(SeekFrom::Start(result.range.start as _))?;
file_compression_type.convert_read(
file.take((result.range.end - result.range.start) as u64),
)?
};

GetResultPayload::File(file, _) => {
let decoder = file_compression_type.convert_read(file)?;
let mut reader = config.open(decoder)?;

// Use std::iter::from_fn to wrap execution of iterator's next() method.
Expand Down
1 change: 0 additions & 1 deletion datafusion/datasource-json/src/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
// https://github.com/apache/datafusion/issues/11143
#![cfg_attr(not(test), deny(clippy::clone_on_ref_ptr))]

pub mod boundary_stream;
pub mod file_format;
pub mod source;
pub mod utils;
Expand Down
3 changes: 1 addition & 2 deletions datafusion/datasource-json/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@ use std::task::{Context, Poll};
use crate::file_format::JsonDecoder;
use crate::utils::{ChannelReader, JsonArrayToNdjsonReader};

use crate::boundary_stream::AlignedBoundaryStream;

use datafusion_common::error::{DataFusionError, Result};
use datafusion_common::exec_datafusion_err;
use datafusion_common_runtime::{JoinSet, SpawnedTask};
use datafusion_datasource::boundary_stream::AlignedBoundaryStream;
use datafusion_datasource::decoder::{DecoderDeserializer, deserialize_stream};
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! Streaming boundary-aligned wrapper for newline-delimited JSON range reads.
//! Streaming boundary-aligned wrapper for newline-delimited JSON and CSV range reads.
//!
//! [`AlignedBoundaryStream`] wraps a raw byte stream and lazily aligns to
//! record (newline) boundaries, avoiding the need for separate `get_opts`
Expand Down Expand Up @@ -398,7 +398,7 @@ impl Stream for AlignedBoundaryStream {
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::{CHUNK_SIZES, make_chunked_store};
use crate::test_util::{CHUNK_SIZES, make_chunked_store};
use futures::TryStreamExt;

async fn collect_stream(stream: AlignedBoundaryStream) -> Vec<u8> {
Expand Down
130 changes: 4 additions & 126 deletions datafusion/datasource/src/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
//! A table that uses the `ObjectStore` listing capability
//! to get the list of files to process.

pub mod boundary_stream;
pub mod decoder;
pub mod display;
pub mod file;
Expand Down Expand Up @@ -56,11 +57,10 @@ pub use self::url::ListingTableUrl;
use crate::file_groups::FileGroup;
use chrono::TimeZone;
use datafusion_common::stats::Precision;
use datafusion_common::{ColumnStatistics, Result, TableReference, exec_datafusion_err};
use datafusion_common::{ColumnStatistics, Result, TableReference};
use datafusion_common::{ScalarValue, Statistics};
use datafusion_physical_expr::LexOrdering;
use futures::{Stream, StreamExt};
use object_store::{GetOptions, GetRange, ObjectStore};
use futures::Stream;
use object_store::{ObjectMeta, path::Path};
pub use table_schema::{TableSchema, TableSchemaBuilder};
// Remove when add_row_stats is removed
Expand Down Expand Up @@ -429,101 +429,6 @@ pub enum RangeCalculation {
TerminateEarly,
}

/// Calculates the newline-aligned byte range to read for a [`PartitionedFile`].
///
/// When the file carries no [`FileRange`], the whole object is to be read and
/// `RangeCalculation::Range(None)` is returned without touching the store.
///
/// Otherwise the requested `[start, end)` range is shifted forward so that it
/// begins and ends on record boundaries:
///
/// * if `start != 0`, the store is scanned from `start - 1` for the first
///   `terminator` byte and `start` is advanced by the returned offset;
/// * if `end != file_size`, the same scan is performed from `end - 1` and
///   `end` is advanced by the returned offset.
///
/// `terminator` defaults to `b'\n'` when `None`.
///
/// # Returns
///
/// * `RangeCalculation::Range(Some(range))` with the adjusted range, or
/// * `RangeCalculation::TerminateEarly` when the requested range is empty or
///   inverted, or when the adjusted range contains no bytes.
///
/// # Errors
///
/// Returns an error if `start` or `end` is negative (does not fit in `u64`),
/// or if reading from the object store fails.
pub async fn calculate_range(
    file: &PartitionedFile,
    store: &Arc<dyn ObjectStore>,
    terminator: Option<u8>,
) -> Result<RangeCalculation> {
    let location = &file.object_meta.location;
    let file_size = file.object_meta.size;
    let newline = terminator.unwrap_or(b'\n');

    match file.range {
        None => Ok(RangeCalculation::Range(None)),
        Some(FileRange { start, end }) => {
            let start: u64 = start.try_into().map_err(|_| {
                exec_datafusion_err!("Expect start range to fit in u64, got {start}")
            })?;
            let end: u64 = end.try_into().map_err(|_| {
                exec_datafusion_err!("Expect end range to fit in u64, got {end}")
            })?;

            // An empty or inverted request can never yield data: the boundary
            // adjustments below only move positions forward, and for
            // `start == end` both scans start at the same byte. Bailing out
            // here also keeps `end - 1` from underflowing when `end == 0` and
            // avoids issuing object-store requests for nothing.
            if start >= end {
                return Ok(RangeCalculation::TerminateEarly);
            }

            // Scan from the byte *before* `start` so that a range beginning
            // exactly on a record boundary keeps its first record.
            let start_delta = if start != 0 {
                find_first_newline(store, location, start - 1, file_size, newline).await?
            } else {
                0
            };

            // The first record boundary lies beyond this range: nothing to read.
            if start + start_delta > end {
                return Ok(RangeCalculation::TerminateEarly);
            }

            // `end >= 1` is guaranteed by the guard above, so `end - 1` is safe.
            let end_delta = if end != file_size {
                find_first_newline(store, location, end - 1, file_size, newline).await?
            } else {
                0
            };

            let range = start + start_delta..end + end_delta;

            if range.start >= range.end {
                return Ok(RangeCalculation::TerminateEarly);
            }

            Ok(RangeCalculation::Range(Some(range)))
        }
    }
}

/// Locates the first `newline` byte inside the `[start, end)` byte range of
/// the object at `location`.
///
/// A single bounded `get_opts` request is issued for the range and the
/// response is consumed chunk by chunk, stopping at the first chunk that
/// contains `newline`.
///
/// # Returns
///
/// The offset of the first `newline` byte, relative to `start`. When the range
/// holds no such byte, the total number of bytes scanned is returned instead.
///
/// # Errors
///
/// Propagates any error raised by the object store while issuing the request
/// or while streaming the response.
async fn find_first_newline(
    object_store: &Arc<dyn ObjectStore>,
    location: &Path,
    start: u64,
    end: u64,
    newline: u8,
) -> Result<u64> {
    let bounded_get = GetOptions {
        range: Some(GetRange::Bounded(start..end)),
        ..Default::default()
    };
    let mut chunks = object_store
        .get_opts(location, bounded_get)
        .await?
        .into_stream();

    // Bytes consumed from chunks that did not contain the terminator.
    let mut scanned: u64 = 0;

    while let Some(bytes) = chunks.next().await.transpose()? {
        match bytes.iter().position(|&candidate| candidate == newline) {
            Some(offset) => return Ok(scanned + offset as u64),
            None => scanned += bytes.len() as u64,
        }
    }

    Ok(scanned)
}

/// Generates test files with min-max statistics in different overlap patterns.
///
/// Used by tests and benchmarks.
Expand Down Expand Up @@ -647,7 +552,7 @@ mod tests {
use datafusion_execution::object_store::{
DefaultObjectStoreRegistry, ObjectStoreRegistry,
};
use object_store::{ObjectStoreExt, local::LocalFileSystem, path::Path};
use object_store::{local::LocalFileSystem, path::Path};
use std::{collections::HashMap, ops::Not, sync::Arc};
use url::Url;

Expand Down Expand Up @@ -844,31 +749,4 @@ mod tests {
// testing an empty path with `ignore_subdirectory` set to false
assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), false));
}

/// Regression test for <https://github.com/apache/datafusion/issues/19605>
///
/// Requests the second half of a file that holds a single record with no
/// newline and expects `calculate_range` to report `TerminateEarly`.
#[tokio::test]
async fn test_calculate_range_single_line_file() {
    use super::{PartitionedFile, RangeCalculation, calculate_range};
    use object_store::ObjectStore;
    use object_store::memory::InMemory;

    // One JSON record, deliberately without a trailing newline.
    let payload = r#"{"id":1,"data":"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}"#;
    let total_len = payload.len() as u64;
    let object_path = Path::from("test.json");

    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    store.put(&object_path, payload.into()).await.unwrap();

    // Partition covering the second half of the file: [len / 2, len).
    let second_half = PartitionedFile::new_with_range(
        object_path.to_string(),
        total_len,
        (total_len / 2) as i64,
        total_len as i64,
    );

    let outcome = calculate_range(&second_half, &store, None).await;

    assert!(matches!(outcome, Ok(RangeCalculation::TerminateEarly)));
}
}
29 changes: 29 additions & 0 deletions datafusion/datasource/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,32 @@ impl FileSource for MockSource {
pub(crate) fn col(name: &str, schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
Ok(Arc::new(Column::new_with_schema(name, schema)?))
}

/// Chunk sizes exercised by every parameterised test.
///
/// The list mixes very small sizes with a few larger ones.
///
/// `usize::MAX` is intentionally included: `ChunkedStore` treats it as
/// "one chunk containing everything", giving the single-chunk fast path.
pub(crate) const CHUNK_SIZES: &[usize] = &[1, 2, 3, 4, 5, 7, 8, 11, 13, 16, usize::MAX];

/// Builds a test object store holding a single object whose GET responses are
/// delivered in `chunk_size`-byte pieces.
///
/// A new `InMemory` store is populated with a copy of `data` under the path
/// `"test"` and then wrapped in a [`ChunkedStore`] configured with
/// `chunk_size`.
///
/// Returns the wrapped store together with the path of the seeded object.
///
/// # Panics
///
/// Panics if writing `data` into the in-memory store fails.
pub(crate) async fn make_chunked_store(
    data: &[u8],
    chunk_size: usize,
) -> (Arc<dyn ObjectStore>, object_store::path::Path) {
    use bytes::Bytes;
    use object_store::ObjectStoreExt;
    use object_store::PutPayload;
    use object_store::chunked::ChunkedStore;
    use object_store::memory::InMemory;
    use object_store::path::Path;

    let object_path = Path::from("test");
    let backing = Arc::new(InMemory::new());

    // Seed the backing store before wrapping it, so the write itself is not chunked.
    let payload = PutPayload::from(Bytes::copy_from_slice(data));
    backing.put(&object_path, payload).await.unwrap();

    let chunked: Arc<dyn ObjectStore> = Arc::new(ChunkedStore::new(backing, chunk_size));
    (chunked, object_path)
}