Skip to content

fix: recompute Union schema when field names or types change#22640

Open
Brijesh-Thakkar wants to merge 5 commits into
apache:mainfrom
Brijesh-Thakkar:fix/22447-recompute-schema-union
Open

fix: recompute Union schema when field names or types change#22640
Brijesh-Thakkar wants to merge 5 commits into
apache:mainfrom
Brijesh-Thakkar:fix/22447-recompute-schema-union

Conversation

@Brijesh-Thakkar
Copy link
Copy Markdown
Contributor

Which issue does this PR close?

Rationale for this change

LogicalPlan::recompute_schema() contains special handling for LogicalPlan::Union to avoid unnecessarily rebuilding the schema when inputs have not changed.

However, the current implementation only checks whether the cached schema and the first input schema have the same number of fields. This can leave the cached schema stale when optimizer rewrites modify field types, names, or qualifiers without changing the schema width.

For example, after a type coercion rewrite, the input schemas may change from Int32 to Int64 while preserving the same column count. In this case, recompute_schema() incorrectly considers the schema unchanged and returns the stale cached schema.

What changes are included in this PR?

This PR updates the LogicalPlan::Union branch in recompute_schema() to compare schema structure rather than only schema width.

The comparison now verifies:

  • Field count
  • Field data types
  • Field names
  • Field qualifiers

If any of these differ from the current input schema, the Union schema is recomputed using Union::try_new().

Additionally, two regression tests were added:

  • test_recompute_schema_union_type_mismatch
  • test_recompute_schema_union_name_mismatch

These tests verify that schema recomputation occurs when input field types or names change while the schema width remains unchanged.

Are these changes tested?

Yes.

Added regression tests covering:

  1. Input type changes (Int32Int64) with identical schema width.
  2. Input column name changes with identical schema width.

The new tests fail with the previous width-only validation logic and pass with this change.

The following test suites were also executed successfully:

  • cargo test -p datafusion-expr
  • cargo test -p datafusion-optimizer

Are there any user-facing changes?

No user-facing API changes.

This change fixes internal schema propagation for LogicalPlan::Union after optimizer rewrites and ensures cached schemas remain consistent with rewritten inputs.

Copilot AI review requested due to automatic review settings May 30, 2026 07:19
@github-actions github-actions Bot added the logical-expr Logical plan and expressions label May 30, 2026
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Note

Copilot was unable to run its full agentic suite in this review.

Updates LogicalPlan::recompute_schema for Union plans to detect stale cached schemas beyond just column count, and adds regression tests for type/name mismatches.

Changes:

  • Strengthen Union schema equivalence check to include qualifiers, field names, and data types.
  • Add tests that reproduce stale Union schema behavior when input types or names change after rewrites.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +710 to +722
LogicalPlan::Union(Union { inputs, schema }) => {
let first_input_schema = inputs[0].schema();
if schema.fields().len() == first_input_schema.fields().len() {
// If inputs are not pruned do not change schema
// Check field count AND field types AND field names/qualifiers.
// A width-only check misses cases where inputs were rewritten with
// different types or aliases (e.g. after type-coercion rewrites).
let schemas_match = schema.fields().len() == first_input_schema.fields().len()
&& (0..schema.fields().len()).all(|i| {
let (q1, f1) = schema.qualified_field(i);
let (q2, f2) = first_input_schema.qualified_field(i);
q1 == q2
&& f1.data_type() == f2.data_type()
&& f1.name() == f2.name()
});
Comment on lines +719 to +721
q1 == q2
&& f1.data_type() == f2.data_type()
&& f1.name() == f2.name()
Comment on lines +715 to +716
let schemas_match = schema.fields().len() == first_input_schema.fields().len()
&& (0..schema.fields().len()).all(|i| {
@Brijesh-Thakkar
Copy link
Copy Markdown
Contributor Author

@cetra3 @nathanb9 heyy all ci tests are passing and the issue is adressed properly
Please review it and if everything seems good then please merge it

@Brijesh-Thakkar
Copy link
Copy Markdown
Contributor Author

@comphead Heyy could you please review this PR, the issue is addressed properly and all ci tests are also passing
Thank you

// `Union` was created `BY NAME`, and can safely rely on the
// `try_new` initializer to derive the new schema based on
// column positions.
let recomputed = Union::try_new(inputs)?;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we always pay the allocation cost it. I don't think we need to do that. Instead we can do structural comparison first which can short circuit sooner. Like:

let schemas_match = inputs.iter().all(|input| {
    let input_schema = input.schema();
    schema.fields().len() == input_schema.fields().len()
        && schema
            .iter()
            .zip(input_schema.iter())
            .all(|((q1, f1), (q2, f2))| {
                q1 == q2
                    && f1.name() == f2.name()
                    && f1.data_type() == f2.data_type()
                    && f1.is_nullable() == f2.is_nullable()
            })
});

This way we avoid allocation if no type-coercion.

// `Union` was created `BY NAME`, and can safely rely on the
// `try_new` initializer to derive the new schema based on
// column positions.
let recomputed = Union::try_new(inputs)?;
Copy link
Copy Markdown
Contributor

@nathanb9 nathanb9 May 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, after looking at Union::try_new more closely, I see that it always loses metadata.
Metadata may be useful since I see that it is preserved in coerce_union_schema_with_schema (https://github.com/apache/datafusion/blob/main/datafusion/optimizer/src/analyzer/type_coercion.rs)

/// **Schema-level metadata merging**: Later schemas take precedence for duplicate keys.
///
/// **Field-level metadata merging**: Later fields take precedence for duplicate metadata keys.

So we might need an API alternative to try_new or a change to it so that we can preserve schema-level and field-level metadata in the case of type coercion

@Brijesh-Thakkar
Copy link
Copy Markdown
Contributor Author

@nathanb9 can u please review it and check if its perfect for merge

…nion

Previously the Union arm only validated field count (width), causing
stale cached schemas when inputs were rewritten with different types
or column names (e.g. after type-coercion). Fix uses qualified_field()
to deep-compare qualifier + data_type + name.

Fixes apache#22447
Previously the Union arm only checked field count (width), causing
stale cached schemas when inputs were rewritten with different types,
names, qualifiers, or nullability but the same column count.

Fix: call Union::try_new(inputs) to get the authoritative recomputed
schema, then compare against the cached schema via DFSchema PartialEq.
This handles all field properties in one place and is future-proof.

Added three unit tests covering type, name, and nullability mismatches.

Fixes apache#22447
Previously the Union arm only checked field count (width), causing
stale cached schemas when inputs were rewritten with different types,
names, qualifiers, or nullability but the same column count.

Fix:
- Fast path: structural comparison across ALL inputs (field count,
  type, name, qualifier, nullability) with zero allocation on the
  common no-change case, as suggested by the reviewer.
- Slow path: Union::try_new() for structure, then HashMap::extend()
  semantics for schema-level and field-level metadata preservation,
  matching the behavior of coerce_union_schema_with_schema in
  type_coercion.rs.

Added four unit tests covering type mismatch, name mismatch,
nullability mismatch, and metadata preservation.

Fixes apache#22447
@Brijesh-Thakkar Brijesh-Thakkar force-pushed the fix/22447-recompute-schema-union branch from b819ec5 to a5216a7 Compare June 1, 2026 18:28
@adriangb
Copy link
Copy Markdown
Contributor

adriangb commented Jun 1, 2026

The comparison now verifies: Field count Field data types Field names Field qualifiers

If we are checking all of these, can’t we just compare the schemas themselves? What are we saving?

And if we need to compare not just the first item in the union but all… is there even anything to cache?

@Brijesh-Thakkar
Copy link
Copy Markdown
Contributor Author

@adriangb There are two reasons why comparing the structure of something is better than checking if two schemasre exactly the same.

  1. The way DFSchema checks for equality includes information.

Since it checks every part of the schema including information about the schema itself and the fields it will say they are different if anything is different even if it is not important.

If we just checked if the schema was the same as the input schema and the input had any differences in the information we would incorrectly go down a slow path.

This slow path would do a lot of work like trying to create a new Union and merging all the extra information for every part of the optimization process.

The structural comparison is done on purpose so that if the structure is the same but the extra information is different it stays on the path.

  1. The fast path is there to avoid doing work when it is not needed.

The function to calculate the schema again is called for every part of the plan tree like when we change the type of something or remove parts.

Most of the time the inputs to the Union nodes will not have changed.

The structural comparison is simple. Does not use any extra memory.

However trying to create an Union always uses extra memory even if nothing has changed.

The extra work of using memory for every part of the plan tree is what the cache is trying to prevent.

So to answer your question directly: what we are saving is the work of using memory for every time we calculate the schema again for any Union node that has not changed which is the common case, for every optimization pass that does not touch that specific node.

@Brijesh-Thakkar
Copy link
Copy Markdown
Contributor Author

@nathanb9 Please review it and if everything seems good , we can merge it
Thank you

@adriangb
Copy link
Copy Markdown
Contributor

adriangb commented Jun 2, 2026

@Brijesh-Thakkar please be less aggressive about pinging reviewers. Time split between writing code (especially with LLMs) and reviewing is massive. As a rule of thumb I'd say please do not ping more than once per day or so, it won't help your PR get merged faster.

@Brijesh-Thakkar
Copy link
Copy Markdown
Contributor Author

@Brijesh-Thakkar please be less aggressive about pinging reviewers. Time split between writing code (especially with LLMs) and reviewing is massive. As a rule of thumb I'd say please do not ping more than once per day or so, it won't help your PR get merged faster.

Okk , sorry for this behavior
I will keep it in mind
I wont repeat this type of things again
Thank you for pointing this

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

logical-expr Logical plan and expressions

Projects

None yet

Development

Successfully merging this pull request may close these issues.

recompute_schema() is broken with LogicalPlan::Union

4 participants