feat: accept iterables in `RecordBatchReader.from_batches` #495
Conversation
Changes `from_batches` to accept any Python iterable (list, generator, etc.) instead of requiring a `Sequence`. Generators are consumed lazily, enabling streaming writes without materializing all batches in memory. Closes kylebarron#493 Signed-off-by: Kevin Jacobs <kevin.jacobs@delfidiagnostics.com>
```rust
/// A RecordBatchReader that lazily pulls batches from a Python iterator.
///
/// The `iter` field holds a Python object that has already been converted to
/// an iterator (via `__iter__`). Each call to `next()` acquires the GIL and
/// calls `__next__` on it.
struct PyIteratorRecordBatchReader {
    schema: SchemaRef,
    iter: Py<PyAny>,
}
```
This is a performance regression for any existing case of a sequence of batches, because now it needs to acquire the GIL for every iteration
In a micro-benchmark, absolutely. In real-world cases where record batches contain a meaningful amount of data, probably not. If you feel strongly, then I can probably special-case `Sequence` from `Iterable` to win every microsecond back.
I think acquiring the GIL can have real-world performance overhead and I'd rather not introduce it for the existing approach where it doesn't need it.
You can define an enum like

```rust
enum PyRecordBatchReaderInput {
    Sequence(Vec<PyRecordBatch>),
    Iterator(PyRecordBatchIterator),
}
```

and then implement `FromPyObject` on that, where it first checks if the input is a sequence and extracts to the `Sequence` variant, falling back to `Iterator`.
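As a pure-Python sketch of the dispatch this enum would perform (names here are hypothetical, not the actual Rust implementation): try the eager sequence path first, and only fall back to a lazy iterator path when the input is not a sequence.

```python
from collections.abc import Sequence

def classify_input(ob):
    """Model of the Sequence-first / Iterator-fallback dispatch.

    Returns ("sequence", list) for sequences (eager path, no per-item
    overhead later) or ("iterator", iterator) for everything else
    (lazy path, items pulled on demand).
    """
    if isinstance(ob, Sequence):
        # Eager path: materialize once up front.
        return ("sequence", list(ob))
    # Lazy path: keep an iterator and pull items one at a time.
    return ("iterator", iter(ob))
```

A list takes the eager branch while a generator takes the lazy one, mirroring the proposed `Sequence`/`Iterator` variants.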
```rust
// Safety: Py<PyAny> is Send and only accessed while holding the GIL.
unsafe impl Send for PyIteratorRecordBatchReader {}
```
Is it? If Py<PyAny> were Send then I don't think you'd need this unsafe impl.
`PyAny` is not `Send`, so `Py<PyAny>` is not `Send`
You're right, sorry. My Rust isn't very advanced. I'll fix.
Use eager extraction for sequences (no per-batch GIL overhead), fall back to lazy iteration for generators and other iterables. Signed-off-by: Kevin Jacobs <kevin.jacobs@delfidiagnostics.com>
Py<PyAny> is already Send in PyO3 0.28, so PyIteratorRecordBatchReader auto-derives Send. The Send bound is enforced at the call site by Box<dyn RecordBatchReader + Send>. Signed-off-by: Kevin Jacobs <kevin.jacobs@delfidiagnostics.com>
Per review feedback, use a RecordBatchInput enum with FromPyObject to dispatch between sequence (eager) and iterator (lazy) paths. Signed-off-by: Kevin Jacobs <kevin.jacobs@delfidiagnostics.com>
Please let me know what else you need before merging this PR.
```rust
Sequence(Vec<PyRecordBatch>),
Iterator(Py<PyAny>),
```
It would be good to double-check here what pyo3 auto-converts to a sequence. Will it convert anything with an `__iter__` and `__len__` to a `Vec<T>`?
Perhaps it makes sense to write a one-off test function just to validate how it handles inputs. We don't want it automatically materializing iterators into a sequence.
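On the Python side at least, the worry can be checked with the stdlib alone: a generator is neither a `Sequence` nor sized, so a sequence-first check has no handle by which to materialize it. (Whether pyo3's `Vec<T>` extraction behaves the same way is exactly what the one-off test should confirm; this sketch only covers the Python type protocol.)

```python
from collections.abc import Sequence

gen = (x for x in range(3))
# A generator is not a Sequence and has no __len__, so a "try the
# sequence path first" check cannot accidentally materialize it.
assert not isinstance(gen, Sequence)
assert not hasattr(gen, "__len__")
# A list satisfies both, so it would take the eager path.
assert isinstance([1, 2, 3], Sequence)
# The generator is still unconsumed until we pull from it ourselves.
assert list(gen) == [0, 1, 2]
```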
```rust
if let Ok(vec) = ob.extract::<Vec<PyRecordBatch>>() {
    Ok(Self::Sequence(vec))
} else {
    let iter = ob.call_method0(intern!(ob.py(), "__iter__"))?;
```
Here we should have a better error message. If `__iter__` doesn't exist, we should print something like "RecordBatchInput requires a sequence or iterator of record batches".
By default the `?` will give a somewhat opaque error message, just "`__iter__` does not exist" or something like that.
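A minimal Python sketch of the suggested fallback-with-friendly-error behavior (the function name here is hypothetical, chosen only for illustration):

```python
def coerce_to_iterator(ob):
    """Mirror of the Rust fallback path, but raising a clearer error
    than the default "object is not iterable" message."""
    try:
        return iter(ob)
    except TypeError:
        raise TypeError(
            "RecordBatchInput requires a sequence or iterator of "
            f"record batches, got {type(ob).__name__}"
        ) from None
```

Mapping the opaque `TypeError` to a message that names the expected input shape makes the failure self-explanatory at the API boundary.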
```
batches: The existing batches. Can be a list, generator, or any
    iterable. Generators are consumed lazily.
```
We should clarify; iterators are probably also consumed lazily.
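The lazy-consumption claim is easy to demonstrate with the stdlib alone; nothing is pulled from a generator (or any other iterator) until the consumer asks for the next item:

```python
produced = []

def batch_source():
    # Stand-in for a generator of record batches; logs when each
    # "batch" is produced so the laziness is observable.
    for i in range(3):
        produced.append(i)
        yield i

it = iter(batch_source())  # wrapping in iter() consumes nothing yet
assert produced == []
first = next(it)           # only now is the first batch produced
assert first == 0
assert produced == [0]
```

The same holds for hand-written iterator objects, which is why the docstring should say "iterables" rather than singling out generators.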
```rust
RecordBatchInput::Sequence(vec) => {
    let batches = vec
        .into_iter()
        .map(|batch| batch.into_inner())
        .collect::<Vec<_>>();
    Self::new(Box::new(RecordBatchIterator::new(
        batches.into_iter().map(Ok),
        schema,
    )))
}
RecordBatchInput::Iterator(iter) => {
    Self::new(Box::new(PyIteratorRecordBatchReader { schema, iter }))
}
```
Nit: I'd move this into a private method on `RecordBatchInput` named `into_record_batch_iterator` which returns a `Box<dyn RecordBatchReader + Send>`.
Then this call site can stay clean.