fix: recompute Union schema when field names or types change#22640
fix: recompute Union schema when field names or types change#22640Brijesh-Thakkar wants to merge 5 commits into
Conversation
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Updates LogicalPlan::recompute_schema for Union plans to detect stale cached schemas beyond just column count, and adds regression tests for type/name mismatches.
Changes:
- Strengthen
Unionschema equivalence check to include qualifiers, field names, and data types. - Add tests that reproduce stale
Unionschema behavior when input types or names change after rewrites.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| LogicalPlan::Union(Union { inputs, schema }) => { | ||
| let first_input_schema = inputs[0].schema(); | ||
| if schema.fields().len() == first_input_schema.fields().len() { | ||
| // If inputs are not pruned do not change schema | ||
| // Check field count AND field types AND field names/qualifiers. | ||
| // A width-only check misses cases where inputs were rewritten with | ||
| // different types or aliases (e.g. after type-coercion rewrites). | ||
| let schemas_match = schema.fields().len() == first_input_schema.fields().len() | ||
| && (0..schema.fields().len()).all(|i| { | ||
| let (q1, f1) = schema.qualified_field(i); | ||
| let (q2, f2) = first_input_schema.qualified_field(i); | ||
| q1 == q2 | ||
| && f1.data_type() == f2.data_type() | ||
| && f1.name() == f2.name() | ||
| }); |
| q1 == q2 | ||
| && f1.data_type() == f2.data_type() | ||
| && f1.name() == f2.name() |
| let schemas_match = schema.fields().len() == first_input_schema.fields().len() | ||
| && (0..schema.fields().len()).all(|i| { |
|
@comphead Heyy could you please review this PR, the issue is addressed properly and all ci tests are also passing |
| // `Union` was created `BY NAME`, and can safely rely on the | ||
| // `try_new` initializer to derive the new schema based on | ||
| // column positions. | ||
| let recomputed = Union::try_new(inputs)?; |
There was a problem hiding this comment.
Here we always pay the allocation cost it. I don't think we need to do that. Instead we can do structural comparison first which can short circuit sooner. Like:
let schemas_match = inputs.iter().all(|input| {
let input_schema = input.schema();
schema.fields().len() == input_schema.fields().len()
&& schema
.iter()
.zip(input_schema.iter())
.all(|((q1, f1), (q2, f2))| {
q1 == q2
&& f1.name() == f2.name()
&& f1.data_type() == f2.data_type()
&& f1.is_nullable() == f2.is_nullable()
})
});This way we avoid allocation if no type-coercion.
| // `Union` was created `BY NAME`, and can safely rely on the | ||
| // `try_new` initializer to derive the new schema based on | ||
| // column positions. | ||
| let recomputed = Union::try_new(inputs)?; |
There was a problem hiding this comment.
Also, after looking at Union::try_new more closely, I see that it always loses metadata.
Metadata may be useful since I see that it is preserved in coerce_union_schema_with_schema (https://github.com/apache/datafusion/blob/main/datafusion/optimizer/src/analyzer/type_coercion.rs)
/// **Schema-level metadata merging**: Later schemas take precedence for duplicate keys.
///
/// **Field-level metadata merging**: Later fields take precedence for duplicate metadata keys.
So we might need an API alternative to try_new or a change to it so that we can preserve schema-level and field-level metadata in the case of type coercion
|
@nathanb9 can u please review it and check if its perfect for merge |
…nion Previously the Union arm only validated field count (width), causing stale cached schemas when inputs were rewritten with different types or column names (e.g. after type-coercion). Fix uses qualified_field() to deep-compare qualifier + data_type + name. Fixes apache#22447
Previously the Union arm only checked field count (width), causing stale cached schemas when inputs were rewritten with different types, names, qualifiers, or nullability but the same column count. Fix: call Union::try_new(inputs) to get the authoritative recomputed schema, then compare against the cached schema via DFSchema PartialEq. This handles all field properties in one place and is future-proof. Added three unit tests covering type, name, and nullability mismatches. Fixes apache#22447
Previously the Union arm only checked field count (width), causing stale cached schemas when inputs were rewritten with different types, names, qualifiers, or nullability but the same column count. Fix: - Fast path: structural comparison across ALL inputs (field count, type, name, qualifier, nullability) with zero allocation on the common no-change case, as suggested by the reviewer. - Slow path: Union::try_new() for structure, then HashMap::extend() semantics for schema-level and field-level metadata preservation, matching the behavior of coerce_union_schema_with_schema in type_coercion.rs. Added four unit tests covering type mismatch, name mismatch, nullability mismatch, and metadata preservation. Fixes apache#22447
b819ec5 to
a5216a7
Compare
If we are checking all of these, can’t we just compare the schemas themselves? What are we saving? And if we need to compare not just the first item in the union but all… is there even anything to cache? |
|
@adriangb There are two reasons why comparing the structure of something is better than checking if two schemasre exactly the same.
Since it checks every part of the schema including information about the schema itself and the fields it will say they are different if anything is different even if it is not important. If we just checked if the schema was the same as the input schema and the input had any differences in the information we would incorrectly go down a slow path. This slow path would do a lot of work like trying to create a new Union and merging all the extra information for every part of the optimization process. The structural comparison is done on purpose so that if the structure is the same but the extra information is different it stays on the path.
The function to calculate the schema again is called for every part of the plan tree like when we change the type of something or remove parts. Most of the time the inputs to the Union nodes will not have changed. The structural comparison is simple. Does not use any extra memory. However trying to create an Union always uses extra memory even if nothing has changed. The extra work of using memory for every part of the plan tree is what the cache is trying to prevent. So to answer your question directly: what we are saving is the work of using memory for every time we calculate the schema again for any Union node that has not changed which is the common case, for every optimization pass that does not touch that specific node. |
|
@nathanb9 Please review it and if everything seems good , we can merge it |
|
@Brijesh-Thakkar please be less aggressive about pinging reviewers. Time split between writing code (especially with LLMs) and reviewing is massive. As a rule of thumb I'd say please do not ping more than once per day or so, it won't help your PR get merged faster. |
Okk , sorry for this behavior |
Which issue does this PR close?
recompute_schema()is broken withLogicalPlan::Union#22447Rationale for this change
LogicalPlan::recompute_schema()contains special handling forLogicalPlan::Unionto avoid unnecessarily rebuilding the schema when inputs have not changed.However, the current implementation only checks whether the cached schema and the first input schema have the same number of fields. This can leave the cached schema stale when optimizer rewrites modify field types, names, or qualifiers without changing the schema width.
For example, after a type coercion rewrite, the input schemas may change from
Int32toInt64while preserving the same column count. In this case,recompute_schema()incorrectly considers the schema unchanged and returns the stale cached schema.What changes are included in this PR?
This PR updates the
LogicalPlan::Unionbranch inrecompute_schema()to compare schema structure rather than only schema width.The comparison now verifies:
If any of these differ from the current input schema, the
Unionschema is recomputed usingUnion::try_new().Additionally, two regression tests were added:
test_recompute_schema_union_type_mismatchtest_recompute_schema_union_name_mismatchThese tests verify that schema recomputation occurs when input field types or names change while the schema width remains unchanged.
Are these changes tested?
Yes.
Added regression tests covering:
Int32→Int64) with identical schema width.The new tests fail with the previous width-only validation logic and pass with this change.
The following test suites were also executed successfully:
cargo test -p datafusion-exprcargo test -p datafusion-optimizerAre there any user-facing changes?
No user-facing API changes.
This change fixes internal schema propagation for
LogicalPlan::Unionafter optimizer rewrites and ensures cached schemas remain consistent with rewritten inputs.