Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1663,6 +1663,7 @@ fn get_fsb_array_slice(
#[cfg(test)]
mod tests {
use super::*;
use std::cmp::Ordering;
use std::collections::HashMap;

use std::fs::File;
Expand Down Expand Up @@ -2918,8 +2919,29 @@ mod tests {
for column in row_group.columns() {
assert!(column.offset_index_offset().is_some());
assert!(column.offset_index_length().is_some());
assert!(column.column_index_offset().is_none());
assert!(column.column_index_length().is_none());
assert!(column.column_index_offset().is_some());
assert!(column.column_index_length().is_some());
}
}
assert!(file_meta_data.column_index().is_some());
if let Some(col_indexes) = file_meta_data.column_index() {
for rg_idx in col_indexes {
for idx in rg_idx {
let float_idx = match idx {
ColumnIndexMetaData::DOUBLE(idx) => idx,
_ => panic!("expected double statistics"),
};
for i in 0..idx.num_pages() as usize {
assert_eq!(
f64::NAN.total_cmp(float_idx.min_value(i).unwrap()),
Ordering::Equal
);
assert_eq!(
f64::NAN.total_cmp(float_idx.max_value(i).unwrap()),
Ordering::Equal
);
}
}
}
}
}
Expand Down
126 changes: 103 additions & 23 deletions parquet/src/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1159,6 +1159,8 @@ pub enum SortOrder {
UNSIGNED,
/// Comparison is undefined.
UNDEFINED,
/// Use IEEE 754 total order.
TOTAL_ORDER,
}

impl SortOrder {
Expand All @@ -1179,6 +1181,8 @@ pub enum ColumnOrder {
/// Column uses the order defined by its logical or physical type
/// (if there is no logical type), parquet-format 2.4.0+.
TYPE_DEFINED_ORDER(SortOrder),
/// Column ordering to use for floating point types.
IEEE_754_TOTAL_ORDER,
// The following are not defined in the Parquet spec and should always be last.
/// Undefined column order, means legacy behaviour before parquet-format 2.4.0.
/// Sort order is always SIGNED.
Expand All @@ -1199,14 +1203,36 @@ impl ColumnOrder {
converted_type: ConvertedType,
physical_type: Type,
) -> SortOrder {
Self::sort_order_for_type(logical_type.as_ref(), converted_type, physical_type)
Self::column_order_for_type(logical_type.as_ref(), converted_type, physical_type)
.sort_order()
}

/// Returns the `ColumnOrder` for a physical/logical type.
pub fn column_order_for_type(
logical_type: Option<&LogicalType>,
converted_type: ConvertedType,
physical_type: Type,
) -> ColumnOrder {
if Some(&LogicalType::Float16) == logical_type
|| matches!(physical_type, Type::FLOAT | Type::DOUBLE)
{
ColumnOrder::IEEE_754_TOTAL_ORDER
} else {
let sort_order =
Self::sort_order_for_type(logical_type, converted_type, physical_type, true);
ColumnOrder::TYPE_DEFINED_ORDER(sort_order)
}
}

/// Returns sort order for a physical/logical type.
///
/// `is_type_defined` indicates whether the column order for this type is
/// [`ColumnOrder::TYPE_DEFINED_ORDER`].
pub fn sort_order_for_type(
logical_type: Option<&LogicalType>,
converted_type: ConvertedType,
physical_type: Type,
is_type_defined: bool,
) -> SortOrder {
match logical_type {
Some(logical) => match logical {
Expand All @@ -1224,18 +1250,28 @@ impl ColumnOrder {
LogicalType::Timestamp { .. } => SortOrder::SIGNED,
LogicalType::Unknown => SortOrder::UNDEFINED,
LogicalType::Uuid => SortOrder::UNSIGNED,
LogicalType::Float16 => SortOrder::SIGNED,
LogicalType::Float16 => {
if is_type_defined {
SortOrder::SIGNED
} else {
SortOrder::TOTAL_ORDER
}
}
LogicalType::Variant { .. }
| LogicalType::Geometry { .. }
| LogicalType::Geography { .. }
| LogicalType::_Unknown { .. } => SortOrder::UNDEFINED,
},
// Fall back to converted type
None => Self::get_converted_sort_order(converted_type, physical_type),
None => Self::get_converted_sort_order(converted_type, physical_type, is_type_defined),
}
}

fn get_converted_sort_order(converted_type: ConvertedType, physical_type: Type) -> SortOrder {
fn get_converted_sort_order(
converted_type: ConvertedType,
physical_type: Type,
is_type_defined: bool,
) -> SortOrder {
match converted_type {
// Unsigned byte-wise comparison.
ConvertedType::UTF8
Expand Down Expand Up @@ -1270,24 +1306,35 @@ impl ColumnOrder {
}

// Fall back to physical type.
ConvertedType::NONE => Self::get_default_sort_order(physical_type),
ConvertedType::NONE => Self::get_default_sort_order(physical_type, is_type_defined),
}
}

/// Returns default sort order based on physical type.
fn get_default_sort_order(physical_type: Type) -> SortOrder {
fn get_default_sort_order(physical_type: Type, is_type_defined: bool) -> SortOrder {
match physical_type {
// Order: false, true
Type::BOOLEAN => SortOrder::UNSIGNED,
Type::INT32 | Type::INT64 => SortOrder::SIGNED,
Type::INT96 => SortOrder::UNDEFINED,
// Notes to remember when comparing float/double values:
// If the min is a NaN, it should be ignored.
// If the max is a NaN, it should be ignored.
// If the min is +0, the row group may contain -0 values as well.
// If the max is -0, the row group may contain +0 values as well.
// When looking for NaN values, min and max should be ignored.
Type::FLOAT | Type::DOUBLE => SortOrder::SIGNED,
// If legacy TYPE_DEFINED_ORDER is specified:
// If the min is a NaN, it should be ignored.
// If the max is a NaN, it should be ignored.
// If the min is +0, the row group may contain -0 values as well.
// If the max is -0, the row group may contain +0 values as well.
// When looking for NaN values, min and max should be ignored.
// If IEEE_754_TOTAL_ORDER:
// Examine nan_count to see if NaNs are present.
// If min/max are NaN, that means only NaNs are present.
// If min/max are not NaN, they are ordered according to total order.
Type::FLOAT | Type::DOUBLE => {
if is_type_defined {
SortOrder::SIGNED
} else {
SortOrder::TOTAL_ORDER
}
}
// Unsigned byte-wise comparison
Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => SortOrder::UNSIGNED,
}
Expand All @@ -1297,6 +1344,7 @@ impl ColumnOrder {
pub fn sort_order(&self) -> SortOrder {
match *self {
ColumnOrder::TYPE_DEFINED_ORDER(order) => order,
ColumnOrder::IEEE_754_TOTAL_ORDER => SortOrder::TOTAL_ORDER,
ColumnOrder::UNDEFINED => SortOrder::SIGNED,
ColumnOrder::UNKNOWN => SortOrder::UNDEFINED,
}
Expand All @@ -1315,6 +1363,10 @@ impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for ColumnOrder {
prot.skip_empty_struct()?;
Self::TYPE_DEFINED_ORDER(SortOrder::SIGNED)
}
2 => {
prot.skip_empty_struct()?;
Self::IEEE_754_TOTAL_ORDER
}
_ => {
prot.skip(field_ident.field_type)?;
Self::UNKNOWN
Expand All @@ -1339,6 +1391,10 @@ impl WriteThrift for ColumnOrder {
writer.write_field_begin(FieldType::Struct, 1, 0)?;
writer.write_struct_end()?;
}
Self::IEEE_754_TOTAL_ORDER => {
writer.write_field_begin(FieldType::Struct, 2, 0)?;
writer.write_struct_end()?;
}
_ => return Err(general_err!("Attempt to write undefined ColumnOrder")),
}
// write end of struct for this union
Expand Down Expand Up @@ -2181,6 +2237,7 @@ mod tests {
assert_eq!(SortOrder::SIGNED.to_string(), "SIGNED");
assert_eq!(SortOrder::UNSIGNED.to_string(), "UNSIGNED");
assert_eq!(SortOrder::UNDEFINED.to_string(), "UNDEFINED");
assert_eq!(SortOrder::TOTAL_ORDER.to_string(), "TOTAL_ORDER");
}

#[test]
Expand All @@ -2197,6 +2254,10 @@ mod tests {
ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNDEFINED).to_string(),
"TYPE_DEFINED_ORDER(UNDEFINED)"
);
assert_eq!(
ColumnOrder::IEEE_754_TOTAL_ORDER.to_string(),
"IEEE_754_TOTAL_ORDER"
);
assert_eq!(ColumnOrder::UNDEFINED.to_string(), "UNDEFINED");
}

Expand All @@ -2213,7 +2274,12 @@ mod tests {
fn check_sort_order(types: Vec<LogicalType>, expected_order: SortOrder) {
for tpe in types {
assert_eq!(
ColumnOrder::get_sort_order(Some(tpe), ConvertedType::NONE, Type::BYTE_ARRAY),
ColumnOrder::column_order_for_type(
Some(&tpe),
ConvertedType::NONE,
Type::BYTE_ARRAY
)
.sort_order(),
expected_order
);
}
Expand Down Expand Up @@ -2292,10 +2358,12 @@ mod tests {
is_adjusted_to_u_t_c: true,
unit: TimeUnit::NANOS,
},
LogicalType::Float16,
];
check_sort_order(signed, SortOrder::SIGNED);

let float = vec![LogicalType::Float16];
check_sort_order(float, SortOrder::TOTAL_ORDER);

// Undefined comparison
let undefined = vec![
LogicalType::List,
Expand All @@ -2316,7 +2384,7 @@ mod tests {
fn check_sort_order(types: Vec<ConvertedType>, expected_order: SortOrder) {
for tpe in types {
assert_eq!(
ColumnOrder::get_sort_order(None, tpe, Type::BYTE_ARRAY),
ColumnOrder::column_order_for_type(None, tpe, Type::BYTE_ARRAY).sort_order(),
expected_order
);
}
Expand Down Expand Up @@ -2368,35 +2436,43 @@ mod tests {
fn test_column_order_get_default_sort_order() {
// Comparison based on physical type
assert_eq!(
ColumnOrder::get_default_sort_order(Type::BOOLEAN),
ColumnOrder::get_default_sort_order(Type::BOOLEAN, true),
SortOrder::UNSIGNED
);
assert_eq!(
ColumnOrder::get_default_sort_order(Type::INT32),
ColumnOrder::get_default_sort_order(Type::INT32, true),
SortOrder::SIGNED
);
assert_eq!(
ColumnOrder::get_default_sort_order(Type::INT64),
ColumnOrder::get_default_sort_order(Type::INT64, true),
SortOrder::SIGNED
);
assert_eq!(
ColumnOrder::get_default_sort_order(Type::INT96),
ColumnOrder::get_default_sort_order(Type::INT96, true),
SortOrder::UNDEFINED
);
assert_eq!(
ColumnOrder::get_default_sort_order(Type::FLOAT),
ColumnOrder::get_default_sort_order(Type::FLOAT, false),
SortOrder::TOTAL_ORDER
);
assert_eq!(
ColumnOrder::get_default_sort_order(Type::DOUBLE, false),
SortOrder::TOTAL_ORDER
);
assert_eq!(
ColumnOrder::get_default_sort_order(Type::FLOAT, true),
SortOrder::SIGNED
);
assert_eq!(
ColumnOrder::get_default_sort_order(Type::DOUBLE),
ColumnOrder::get_default_sort_order(Type::DOUBLE, true),
SortOrder::SIGNED
);
assert_eq!(
ColumnOrder::get_default_sort_order(Type::BYTE_ARRAY),
ColumnOrder::get_default_sort_order(Type::BYTE_ARRAY, true),
SortOrder::UNSIGNED
);
assert_eq!(
ColumnOrder::get_default_sort_order(Type::FIXED_LEN_BYTE_ARRAY),
ColumnOrder::get_default_sort_order(Type::FIXED_LEN_BYTE_ARRAY, true),
SortOrder::UNSIGNED
);
}
Expand All @@ -2415,6 +2491,10 @@ mod tests {
ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNDEFINED).sort_order(),
SortOrder::UNDEFINED
);
assert_eq!(
ColumnOrder::IEEE_754_TOTAL_ORDER.sort_order(),
SortOrder::TOTAL_ORDER
);
assert_eq!(ColumnOrder::UNDEFINED.sort_order(), SortOrder::SIGNED);
}

Expand Down
47 changes: 4 additions & 43 deletions parquet/src/column/writer/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@
// under the License.

use bytes::Bytes;
use half::f16;

use crate::basic::{ConvertedType, Encoding, LogicalType, Type};
use crate::basic::{ConvertedType, Encoding};
use crate::bloom_filter::Sbbf;
use crate::column::writer::{
compare_greater, fallback_encoding, has_dictionary_support, is_nan, update_max, update_min,
compare_greater, fallback_encoding, has_dictionary_support, update_max, update_min,
};
use crate::data_type::DataType;
use crate::data_type::private::ParquetValueType;
Expand Down Expand Up @@ -330,19 +329,11 @@ where
T: ParquetValueType + 'a,
I: Iterator<Item = &'a T>,
{
let first = loop {
let next = iter.next()?;
if !is_nan(descr, next) {
break next;
}
};
let first = iter.next()?;

let mut min = first;
let mut max = first;
for val in iter {
if is_nan(descr, val) {
continue;
}
if compare_greater(descr, min, val) {
min = val;
}
Expand All @@ -351,37 +342,7 @@ where
}
}

// Float/Double statistics have special case for zero.
//
// If computed min is zero, whether negative or positive,
// the spec states that the min should be written as -0.0
// (negative zero)
//
// For max, it has similar logic but will be written as 0.0
// (positive zero)
let min = replace_zero(min, descr, -0.0);
let max = replace_zero(max, descr, 0.0);

Some((min, max))
}

#[inline]
fn replace_zero<T: ParquetValueType>(val: &T, descr: &ColumnDescriptor, replace: f32) -> T {
match T::PHYSICAL_TYPE {
Type::FLOAT if f32::from_le_bytes(val.as_bytes().try_into().unwrap()) == 0.0 => {
T::try_from_le_slice(&f32::to_le_bytes(replace)).unwrap()
}
Type::DOUBLE if f64::from_le_bytes(val.as_bytes().try_into().unwrap()) == 0.0 => {
T::try_from_le_slice(&f64::to_le_bytes(replace as f64)).unwrap()
}
Type::FIXED_LEN_BYTE_ARRAY
if descr.logical_type_ref() == Some(LogicalType::Float16).as_ref()
&& f16::from_le_bytes(val.as_bytes().try_into().unwrap()) == f16::NEG_ZERO =>
{
T::try_from_le_slice(&f16::to_le_bytes(f16::from_f32(replace))).unwrap()
}
_ => val.clone(),
}
Some((min.clone(), max.clone()))
}

fn update_geo_stats_accumulator<'a, T, I>(bounder: &mut dyn GeoStatsAccumulator, iter: I)
Expand Down
Loading
Loading