Skip to content

feat: accept iterables in RecordBatchReader.from_batches#495

Open
kevinjacobs-delfi wants to merge 6 commits intokylebarron:mainfrom
kevinjacobs-delfi:from-batches-iterable
Open

feat: accept iterables in RecordBatchReader.from_batches#495
kevinjacobs-delfi wants to merge 6 commits intokylebarron:mainfrom
kevinjacobs-delfi:from-batches-iterable

Conversation

@kevinjacobs-delfi
Copy link
Copy Markdown

Changes from_batches to accept any Python iterable (list, generator, etc.) instead of requiring a Sequence. Generators are consumed lazily, enabling streaming writes without materializing all batches in memory.

def batch_gen():
    for chunk in chunks:
        yield RecordBatch(...)

reader = RecordBatchReader.from_batches(schema, batch_gen())
write_parquet(reader, path, ...)

Closes #493

Changes from_batches to accept any Python iterable (list, generator,
etc.) instead of requiring a Sequence. Generators are consumed lazily,
enabling streaming writes without materializing all batches in memory.

Closes kylebarron#493

Signed-off-by: Kevin Jacobs <kevin.jacobs@delfidiagnostics.com>
Signed-off-by: Kevin Jacobs <kevin.jacobs@delfidiagnostics.com>
@kevinjacobs-delfi kevinjacobs-delfi changed the title Accept iterables in RecordBatchReader.from_batches feat: accept iterables in RecordBatchReader.from_batches Mar 27, 2026
@github-actions github-actions Bot added the feat label Mar 27, 2026
Comment on lines +22 to +30
/// A RecordBatchReader that lazily pulls batches from a Python iterator.
///
/// The `iter` field holds a Python object that has already been converted to
/// an iterator (via `__iter__`). Each call to `next()` acquires the GIL and
/// calls `__next__` on it.
struct PyIteratorRecordBatchReader {
schema: SchemaRef,
iter: Py<PyAny>,
}
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a performance regression for any existing case of a sequence of batches, because now it needs to acquire the GIL for every iteration

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a micro-benchmark, absolutely. In real world cases where record batches contain a meaningful amount of data, probably not. If you feel strongly, then I can probably special case Sequence from Iterable to win very microsecond back.

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think acquiring the GIL can have real-world performance overhead and I'd rather not introduce it for the existing approach where it doesn't need it.

You can define an enum like

enum PyRecordBatchReaderInput {
    Sequence(Vec<PyRecordBatch>)
    Iterator(PyRecordBatchIterator)
}

and then implement FromPyObject on that where first it checks if it's a sequence, and then extracts to the Sequence variant. And falls back to Iterator

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No worries. Done.

Comment thread pyo3-arrow/src/record_batch_reader.rs Outdated
Comment on lines +32 to +33
// Safety: Py<PyAny> is Send and only accessed while holding the GIL.
unsafe impl Send for PyIteratorRecordBatchReader {}
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it? If Py<PyAny> were Send then I don't think you'd need this unsafe impl.

PyAny is not Send, so Py<PyAny> is not Send

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right-- sorry. My Rust isn't very advanced. I'll fix.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

Use eager extraction for sequences (no per-batch GIL overhead),
fall back to lazy iteration for generators and other iterables.

Signed-off-by: Kevin Jacobs <kevin.jacobs@delfidiagnostics.com>
Py<PyAny> is already Send in PyO3 0.28, so PyIteratorRecordBatchReader
auto-derives Send. The Send bound is enforced at the call site by
Box<dyn RecordBatchReader + Send>.

Signed-off-by: Kevin Jacobs <kevin.jacobs@delfidiagnostics.com>
Per review feedback, use a RecordBatchInput enum with FromPyObject
to dispatch between sequence (eager) and iterator (lazy) paths.

Signed-off-by: Kevin Jacobs <kevin.jacobs@delfidiagnostics.com>
Signed-off-by: Kevin Jacobs <kevin.jacobs@delfidiagnostics.com>
@kevinjacobs-delfi
Copy link
Copy Markdown
Author

Please let me know what else you need before merging this PR. I'm finding arro3 extremely useful and have more small fixes and improvements in the works. Thanks!

Comment on lines +26 to +27
Sequence(Vec<PyRecordBatch>),
Iterator(Py<PyAny>),
Copy link
Copy Markdown
Owner

@kylebarron kylebarron Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to double-check here what pyo3 auto-converts to a sequence.

It'll convert anything with an __iter__ and __len__ to a Vec<T>?

Perhaps it makes sense to make a one-off test function just to validate how it handles inputs. We don't want it automatically materializing iterators to a sequence.

if let Ok(vec) = ob.extract::<Vec<PyRecordBatch>>() {
Ok(Self::Sequence(vec))
} else {
let iter = ob.call_method0(intern!(ob.py(), "__iter__"))?;
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we should have a better error message. If __iter__ doesn't exist, we should print something like "RecordBatchInput requires a sequence or iterator of record batches"

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By default the ? will give a somewhat opaque error message of just "__iter__ does not exist" or something like that.

Comment on lines +62 to +63
batches: The existing batches. Can be a list, generator, or any
iterable. Generators are consumed lazily.
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should clarify; iterators are probably also consumed lazily.

Comment on lines +277 to +290
RecordBatchInput::Sequence(vec) => {
let batches = vec
.into_iter()
.map(|batch| batch.into_inner())
.collect::<Vec<_>>();
Self::new(Box::new(RecordBatchIterator::new(
batches.into_iter().map(Ok),
schema,
)))
}
RecordBatchInput::Iterator(iter) => {
Self::new(Box::new(PyIteratorRecordBatchReader { schema, iter }))
}
}
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I'd move this onto a private method on RecordBatchInput named into_record_batch_iterator which returns a Box<dyn RecordBatchIterator>.

Then this call site can stay clean

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

RecordBatchReader.from_batches: accept Iterator in addition to Sequence

2 participants