Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions vortex-duckdb/cpp/include/duckdb_vx/vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ void duckdb_vx_vector_set_vector_data_buffer(duckdb_vector ffi_vector, duckdb_vx
// Set the data pointer for the vector. This is the start of the values array in the vector.
void duckdb_vx_vector_set_data_ptr(duckdb_vector ffi_vector, void *ptr);

// Set the validity pointer for the vector to external data, and store the buffer in the
// validity mask's validity_data to keep it alive. This enables zero-copy export of validity masks.
void duckdb_vx_vector_set_validity_data(duckdb_vector ffi_vector, void *validity_ptr, idx_t capacity,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If validity_ptr points to buffer, just pass the buffer

duckdb_vx_vector_buffer buffer);

// Converts a duckdb flat vector into a Sequence vector.
void duckdb_vx_sequence_vector(duckdb_vector c_vector, int64_t start, int64_t step, idx_t capacity);

Expand Down
35 changes: 35 additions & 0 deletions vortex-duckdb/cpp/vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,21 @@ class DataVector : public Vector {
inline void SetDataPtr(data_ptr_t ptr) {
data = ptr;
};

inline ValidityMask &GetValidity() {
return validity;
};
};

// Same hack for ValidityMask: access protected fields via inheritance.
class ExternalValidityMask : public ValidityMask {
public:
inline void SetExternal(validity_t *ptr, idx_t cap,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, pass just the buffer and derive ptr from it

buffer_ptr<ValidityBuffer> keeper) {
validity_mask = ptr;
capacity = cap;
validity_data = std::move(keeper);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is right, as the TemplatedValidityMask(V* ptr, idx_t capacity) constructor sets the pointer but doesn't change validity_data, so these two aren't derived from each other.

};
};

} // namespace vortex
Expand All @@ -81,6 +96,26 @@ extern "C" void duckdb_vx_vector_set_data_ptr(duckdb_vector ffi_vector, void *pt
dvector->SetDataPtr((data_ptr_t)ptr);
}

extern "C" void duckdb_vx_vector_set_validity_data(duckdb_vector ffi_vector,
void *validity_ptr,
idx_t capacity,
duckdb_vx_vector_buffer buffer) {
auto dvector = reinterpret_cast<vortex::DataVector *>(ffi_vector);
auto &validity = dvector->GetValidity();
auto ext_validity = reinterpret_cast<vortex::ExternalValidityMask *>(&validity);

// Use the shared_ptr aliasing constructor: the control block ref-counts the
// ExternalVectorBuffer (preventing the Rust buffer from being freed),
// while the stored pointer satisfies ValidityMask's buffer_ptr<ValidityBuffer> type.
auto ext_buf = reinterpret_cast<shared_ptr<vortex::ExternalVectorBuffer> *>(buffer);
auto keeper = shared_ptr<TemplatedValidityData<validity_t>>(
*ext_buf, reinterpret_cast<TemplatedValidityData<validity_t> *>(ext_buf->get()));

// Set validity_mask, capacity, and validity_data (which keeps the buffer alive).
ext_validity->SetExternal(reinterpret_cast<validity_t *>(validity_ptr), capacity,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically this will slice the class to the base's validity, but as the derived class doesn't have any members, it's fine. Worth adding a comment.

std::move(keeper));
}

extern "C" duckdb_value duckdb_vx_vector_get_value(duckdb_vector ffi_vector, idx_t index) {
auto vector = reinterpret_cast<Vector *>(ffi_vector);
auto value = duckdb::make_uniq<Value>(vector->GetValue(index));
Expand Down
23 changes: 23 additions & 0 deletions vortex-duckdb/src/duckdb/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,29 @@ impl VectorRef {
unsafe { cpp::duckdb_vx_vector_set_data_ptr(self.as_ptr(), ptr as *mut c_void) }
}

/// Points this vector's validity mask at externally owned memory instead of copying it.
///
/// The `buffer` handle is forwarded together with the pointer so that the C++ side can
/// retain it and keep the memory alive. This enables zero-copy export of validity masks.
///
/// # Safety
///
/// The `validity_ptr` must point to a valid `u64` array with at least
/// `capacity.div_ceil(64)` elements. The buffer must keep this memory alive.
pub unsafe fn set_validity_data(
    &self,
    validity_ptr: *mut u64,
    capacity: usize,
    buffer: &VectorBufferRef,
) {
    // The C API takes an untyped pointer and DuckDB's index type.
    let raw_validity = validity_ptr.cast::<c_void>();
    let raw_capacity = capacity as idx_t;

    // SAFETY: the caller guarantees the pointer/capacity/buffer contract documented above;
    // this call only forwards those values across the FFI boundary.
    unsafe {
        cpp::duckdb_vx_vector_set_validity_data(
            self.as_ptr(),
            raw_validity,
            raw_capacity,
            buffer.as_ptr(),
        )
    }
}

/// Assigns the element at the specified index with a string value.
/// FIXME(ngates): remove this.
pub fn assign_string_element(&self, idx: usize, value: &CStr) {
Expand Down
2 changes: 1 addition & 1 deletion vortex-duckdb/src/exporter/constant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl ColumnExporter for ConstantExporter {
match self.value.as_ref() {
None => {
// TODO(ngates): would be good if DuckDB supported constant null vectors.
unsafe { vector.set_validity(&Mask::AllFalse(len), 0, len) };
unsafe { vector.set_validity(&Mask::AllFalse(len), 0, len, None) };
}
Some(value) => {
vector.reference_value(value);
Expand Down
2 changes: 1 addition & 1 deletion vortex-duckdb/src/exporter/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl<O: IntegerPType> ColumnExporter for ListExporter<O> {
);

// Set validity if necessary.
if unsafe { vector.set_validity(&self.validity, offset, len) } {
if unsafe { vector.set_validity(&self.validity, offset, len, None) } {
// All values are null, so no point copying the data.
return Ok(());
}
Expand Down
2 changes: 1 addition & 1 deletion vortex-duckdb/src/exporter/list_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl<O: IntegerPType, S: IntegerPType> ColumnExporter for ListViewExporter<O, S>
);

// Set validity if necessary.
if unsafe { vector.set_validity(&self.validity, offset, len) } {
if unsafe { vector.set_validity(&self.validity, offset, len, None) } {
// All values are null, so no point copying the data.
return Ok(());
}
Expand Down
16 changes: 8 additions & 8 deletions vortex-duckdb/src/exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ mod tests {
let mut vector = Vector::with_capacity(&logical_type, 100);

let mask = Mask::AllTrue(10);
let all_null = unsafe { vector.set_validity(&mask, 0, 10) };
let all_null = unsafe { vector.set_validity(&mask, 0, 10, None) };

assert!(!all_null);
}
Expand All @@ -228,7 +228,7 @@ mod tests {
let len = 10;

let mask = Mask::AllFalse(len);
let all_null = unsafe { vector.set_validity(&mask, 0, len) };
let all_null = unsafe { vector.set_validity(&mask, 0, len, None) };

assert!(all_null);

Expand All @@ -246,7 +246,7 @@ mod tests {

let mask = Mask::from(BitBuffer::from(vec![true; 10]));

let all_null = unsafe { vector.set_validity(&mask, 0, 10) };
let all_null = unsafe { vector.set_validity(&mask, 0, 10, None) };

assert!(!all_null);

Expand All @@ -266,7 +266,7 @@ mod tests {
let bits = vec![false; LEN];
let mask = Mask::from(BitBuffer::from(bits));

let all_null = unsafe { vector.set_validity(&mask, 0, LEN) };
let all_null = unsafe { vector.set_validity(&mask, 0, LEN, None) };

assert!(all_null);

Expand All @@ -286,7 +286,7 @@ mod tests {
];
let mask = Mask::from(BitBuffer::from(bits.as_slice()));

let all_null = unsafe { vector.set_validity(&mask, 0, 10) };
let all_null = unsafe { vector.set_validity(&mask, 0, 10, None) };

assert!(!all_null);

Expand All @@ -306,7 +306,7 @@ mod tests {
];
let mask = Mask::from(BitBuffer::from(bits.as_slice()));

let all_null = unsafe { vector.set_validity(&mask, 2, 8) };
let all_null = unsafe { vector.set_validity(&mask, 2, 8, None) };

assert!(!all_null);

Expand All @@ -327,7 +327,7 @@ mod tests {
];
let mask = Mask::from(BitBuffer::from(bits.as_slice()));

let all_null = unsafe { vector.set_validity(&mask, 3, 5) };
let all_null = unsafe { vector.set_validity(&mask, 3, 5, None) };

assert!(!all_null);

Expand All @@ -345,7 +345,7 @@ mod tests {
let bits = (0..70).map(|i| i % 3 == 0).collect::<Vec<_>>();
let mask = Mask::from(BitBuffer::from(bits.as_slice()));

let all_null = unsafe { vector.set_validity(&mask, 5, 60) };
let all_null = unsafe { vector.set_validity(&mask, 5, 60, None) };

assert!(!all_null);

Expand Down
72 changes: 72 additions & 0 deletions vortex-duckdb/src/exporter/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,78 @@ mod tests {
);
}

/// Exports a five-element nullable `i32` array into a single chunk and checks the
/// rendered chunk, including the `NULL` entries.
#[test]
fn test_primitive_exporter_with_nulls() {
    let rows = 5;
    let arr = PrimitiveArray::from_option_iter([Some(10i32), None, Some(30), None, Some(50)]);

    let mut chunk = DataChunk::new([LogicalType::new(cpp::duckdb_type::DUCKDB_TYPE_INTEGER)]);
    let mut ctx = SESSION.create_execution_ctx();

    let exporter = new_exporter(arr, &mut ctx).unwrap();
    exporter
        .export(0, rows, chunk.get_vector_mut(0), &mut ctx)
        .unwrap();
    chunk.set_len(rows);

    let rendered = String::try_from(&*chunk).unwrap();
    assert_eq!(
        rendered,
        r#"Chunk - [1 Columns]
- FLAT INTEGER: 5 = [ 10, NULL, 30, NULL, 50]
"#
    );
}

/// Exports a large nullable primitive array as `NUM_CHUNKS` full-size chunks and checks
/// the null flag of every exported row. It is intended to exercise the zero-copy
/// validity path.
///
/// NOTE(review): this relies on the copy fallback in `set_validity` panicking whenever
/// zero-copy was expected; confirm that it does, otherwise this test only verifies the
/// exported null pattern and not which branch produced it.
#[test]
fn test_primitive_exporter_with_nulls_zero_copy() {
    const NUM_CHUNKS: usize = 8;
    let vector_size = duckdb_vector_size();
    let len = vector_size * NUM_CHUNKS;

    // Rows whose index is 1 modulo 3 are null, so every chunk mixes valid and null rows.
    #[expect(clippy::cast_possible_truncation, reason = "test data fits in i32")]
    let arr = PrimitiveArray::from_option_iter((0..len).map(|i| (i % 3 != 1).then(|| i as i32)));

    let mut ctx = SESSION.create_execution_ctx();
    let exporter = new_exporter(arr, &mut ctx).unwrap();

    for chunk_idx in 0..NUM_CHUNKS {
        let base = chunk_idx * vector_size;
        let mut chunk =
            DataChunk::new([LogicalType::new(cpp::duckdb_type::DUCKDB_TYPE_INTEGER)]);

        exporter
            .export(base, vector_size, chunk.get_vector_mut(0), &mut ctx)
            .unwrap();
        chunk.set_len(vector_size);

        let vec = chunk.get_vector(0);
        for i in 0..vector_size {
            let global_idx = base + i;
            let is_null = vec.row_is_null(i as u64);
            if global_idx % 3 == 1 {
                assert!(is_null, "expected null at global index {global_idx}");
            } else {
                assert!(!is_null, "expected non-null at global index {global_idx}");
            }
        }
    }
}

#[test]
fn test_long_primitive_exporter() {
let vector_size = duckdb_vector_size();
Expand Down
48 changes: 46 additions & 2 deletions vortex-duckdb/src/exporter/validity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,69 @@
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use vortex::array::ExecutionCtx;
use vortex::buffer::ByteBuffer;
use vortex::error::VortexResult;
use vortex::mask::Mask;

use crate::duckdb::VectorBuffer;
use crate::duckdb::VectorRef;
use crate::exporter::ColumnExporter;

/// A `ColumnExporter` that pairs an inner exporter with a validity `Mask`.
///
/// Its `export` writes the mask into the target vector first and returns early when
/// every value in the requested range is null.
struct ValidityExporter {
/// Validity mask for the whole column; `export` checks that the requested range lies
/// within `mask.len()`.
mask: Mask,
/// If the mask's bit buffer is u64-aligned with no sub-byte offset,
/// we can zero-copy it into DuckDB. We hold the VectorBuffer to keep
/// the underlying memory alive via DuckDB's ref-counting.
/// `None` means validity is always copied.
zero_copy: Option<ZeroCopyValidity>,
/// The wrapped exporter (presumably exports the column values once validity is set;
/// that part of `export` is not visible here — confirm).
exporter: Box<dyn ColumnExporter>,
}

/// Backing storage for a validity mask that is handed to DuckDB without copying.
pub(super) struct ZeroCopyValidity {
/// The underlying byte buffer backing the validity bits.
pub(super) buffer: ByteBuffer,
/// `VectorBuffer` built from a clone of `buffer`; it is passed along with the validity
/// pointer so the memory stays alive while DuckDB references it.
pub(super) shared_buffer: VectorBuffer,
}

/// Returns true if the bit buffer can be zero-copied as a DuckDB validity mask.
///
/// Requirements:
/// - The mask is a `Mask::Values` (only that variant carries a bit buffer)
/// - No bit offset into the underlying bytes (offset == 0)
/// - The underlying byte buffer is u64-aligned
/// - The underlying byte buffer spans every whole `u64` word needed to cover
///   `mask.len()` bits. The pointer is handed to DuckDB as a `u64` array, so a buffer
///   that ends part-way through the last word would be read out of bounds.
///
/// Masks that fail any requirement are exported through the copying path instead.
fn can_zero_copy_validity(mask: &Mask) -> bool {
    let Mask::Values(values) = mask else {
        return false;
    };
    let bit_buf = values.bit_buffer();
    if bit_buf.offset() != 0 {
        return false;
    }
    let bytes = bit_buf.inner().as_slice();
    let word_size = size_of::<u64>();

    // Check u64 alignment of the underlying data pointer.
    if !(bytes.as_ptr() as usize).is_multiple_of(word_size) {
        return false;
    }

    // Every u64 word touched by any 64-aligned chunk of the mask must be fully backed
    // by the buffer, including the trailing word.
    let required_bytes = mask.len().div_ceil(64) * word_size;
    bytes.len() >= required_bytes
}

pub(crate) fn new_exporter(
mask: Mask,
exporter: Box<dyn ColumnExporter>,
) -> Box<dyn ColumnExporter> {
if mask.all_true() {
exporter
} else {
Box::new(ValidityExporter { mask, exporter })
let zero_copy = can_zero_copy_validity(&mask).then(|| {
let Mask::Values(values) = &mask else {
unreachable!()
};
let buffer = values.bit_buffer().inner().clone();
ZeroCopyValidity {
shared_buffer: VectorBuffer::new(buffer.clone()),
buffer,
}
});
Box::new(ValidityExporter {
mask,
zero_copy,
exporter,
})
}
}

Expand All @@ -36,7 +80,7 @@ impl ColumnExporter for ValidityExporter {
offset + len <= self.mask.len(),
"cannot access outside of array"
);
if unsafe { vector.set_validity(&self.mask, offset, len) } {
if unsafe { vector.set_validity(&self.mask, offset, len, self.zero_copy.as_ref()) } {
// All values are null, so no point copying the data.
return Ok(());
}
Expand Down
26 changes: 25 additions & 1 deletion vortex-duckdb/src/exporter/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,17 @@ use vortex::mask::Mask;
use crate::duckdb::Value;
use crate::duckdb::VectorRef;
use crate::exporter::copy_from_slice;
use crate::exporter::validity::ZeroCopyValidity;

impl VectorRef {
pub(super) unsafe fn set_validity(&mut self, mask: &Mask, offset: usize, len: usize) -> bool {
/// Returns true if all values are null (caller can skip data export).
pub(super) unsafe fn set_validity(
&mut self,
mask: &Mask,
offset: usize,
len: usize,
zero_copy: Option<&ZeroCopyValidity>,
) -> bool {
match mask {
Mask::AllTrue(_) => {
// We only need to blank out validity if there is already a slice allocated.
Expand All @@ -27,7 +35,23 @@ impl VectorRef {
unsafe { self.set_all_true_validity(len) }
} else if true_count == 0 {
self.set_all_false_validity()
} else if let Some(zc) = zero_copy.filter(|_| offset.is_multiple_of(64)) {
let u64_offset = offset / 64;
// SAFETY: the underlying buffer is u64-aligned (checked in
// can_zero_copy_validity) and we only read through this pointer.
// The cast to *mut is an artifact of the DuckDB C API.
let ptr = zc.buffer.as_slice().as_ptr().cast_mut().cast::<u64>();
// SAFETY: we verified alignment in can_zero_copy_validity
// and the VectorBuffer keeps the data alive.
unsafe { self.set_validity_data(ptr.add(u64_offset), len, &zc.shared_buffer) };
} else {
// If zero_copy is available and offset is aligned, we should
// have taken the branch above. Assert this invariant.
assert!(
zero_copy.is_none() || !offset.is_multiple_of(64),
"zero-copy validity available and offset {offset} is aligned \
but copy path was taken"
);
let source = arr.bit_buffer().inner().as_slice();
copy_from_slice(
unsafe { self.ensure_validity_slice(len) },
Expand Down
Loading