Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions arrow-json/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
arrow-cast = { workspace = true }
arrow-data = { workspace = true }
arrow-ord = { workspace = true }
arrow-schema = { workspace = true }
arrow-select = { workspace = true }
half = { version = "2.1", default-features = false }
indexmap = { version = "2.0", default-features = false, features = ["std"] }
num-traits = { version = "0.2.19", default-features = false, features = ["std"] }
Expand Down
80 changes: 29 additions & 51 deletions arrow-json/src/reader/run_end_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
// under the License.

use std::marker::PhantomData;
use std::ops::Range;
use std::slice::from_ref;
use std::sync::Arc;

use arrow_array::types::RunEndIndexType;
use arrow_array::{Array, ArrayRef, PrimitiveArray, make_array, new_empty_array};
use arrow_buffer::{ArrowNativeType, ScalarBuffer};
use arrow_data::transform::MutableArrayData;
use arrow_data::{ArrayData, ArrayDataBuilder};
use arrow_array::{ArrayRef, RunArray, UInt32Array, new_empty_array};
use arrow_buffer::{ArrowNativeType, RunEndBuffer, ScalarBuffer};
use arrow_ord::partition::partition;
use arrow_schema::{ArrowError, DataType};
use arrow_select::take::take;

use crate::reader::tape::Tape;
use crate::reader::{ArrayDecoder, DecoderContext};
Expand Down Expand Up @@ -63,58 +66,33 @@ impl<R: RunEndIndexType + Send> ArrayDecoder for RunEndEncodedArrayDecoder<R> {
return Ok(new_empty_array(&self.data_type));
}

let flat_data = self.decoder.decode(tape, pos)?.to_data();
let flat_array = self.decoder.decode(tape, pos)?;

let mut run_ends: Vec<R::Native> = Vec::new();
let mut mutable = MutableArrayData::new(vec![&flat_data], false, len);
let partitions = partition(from_ref(&flat_array))?;
let size = partitions.len();
let mut run_ends = Vec::with_capacity(size);
let mut indices = Vec::with_capacity(size);

let mut run_start = 0;
for i in 1..len {
if !same_run(&flat_data, run_start, i) {
let run_end = R::Native::from_usize(i).ok_or_else(|| {
ArrowError::JsonError(format!(
"Run end value {i} exceeds {:?} range",
R::DATA_TYPE
))
})?;
run_ends.push(run_end);
mutable.extend(0, run_start, run_start + 1);
run_start = i;
}
for Range { start, end } in partitions.ranges() {
let run_end = R::Native::from_usize(end).ok_or_else(|| {
ArrowError::JsonError(format!(
"Run end value {end} exceeds {:?} range",
R::DATA_TYPE
))
})?;
run_ends.push(run_end);
indices.push(start);
}
let run_end = R::Native::from_usize(len).ok_or_else(|| {
ArrowError::JsonError(format!(
"Run end value {len} exceeds {:?} range",
R::DATA_TYPE
))
})?;
run_ends.push(run_end);
mutable.extend(0, run_start, run_start + 1);

let values_data = mutable.freeze();
let run_ends_data =
PrimitiveArray::<R>::new(ScalarBuffer::from(run_ends), None).into_data();
let indices = UInt32Array::from_iter_values(indices.into_iter().map(|i| i as u32));
let values = take(flat_array.as_ref(), &indices, None)?;

let data = ArrayDataBuilder::new(self.data_type.clone())
.len(len)
.add_child_data(run_ends_data)
.add_child_data(values_data);
// SAFETY: run_ends are strictly increasing with the last value equal to len
let run_ends = unsafe { RunEndBuffer::new_unchecked(ScalarBuffer::from(run_ends), 0, len) };

// Safety:
// run_ends are strictly increasing with the last value equal to len,
// and values has the same length as run_ends
Ok(make_array(unsafe { data.build_unchecked() }))
// SAFETY: run_ends are valid and values has the same length as run_ends
let array =
unsafe { RunArray::<R>::new_unchecked(self.data_type.clone(), run_ends, values) };
Ok(Arc::new(array))
}
}

/// Reports whether the slots at positions `i` and `j` of `data` can share a run.
///
/// Validity is checked first: two null slots always match, and a null slot
/// never matches a non-null one. When both slots are valid, the decision is
/// made by comparing the one-element slices taken at each position.
fn same_run(data: &ArrayData, i: usize, j: usize) -> bool {
    match (data.is_null(i), data.is_null(j)) {
        // Both slots are null: treated as equal without looking at the values.
        (true, true) => true,
        // Both slots are valid: compare the single-element slices.
        (false, false) => data.slice(i, 1) == data.slice(j, 1),
        // Exactly one slot is null: they cannot belong to the same run.
        _ => false,
    }
}
Loading