[flink] Fix COUNT(column) aggregate pushdown to reject nullable columns #3271
beryllw wants to merge 2 commits into
@luoyuxia cc
Pull request overview
This PR fixes Flink aggregate pushdown in FlinkTableSource so that COUNT(column) is only pushed down to Fluss’ row-count API when the argument cannot be NULL (avoiding incorrect results for nullable columns, per #3270).
Changes:
- Update aggregate pushdown logic to reject `COUNT(expr)` when the COUNT argument's type is nullable.
- Extend batch IT coverage to exercise `COUNT(id)` pushdown and to ensure `COUNT(address)` on a nullable column is not pushed down.
- Modify log-table test data to include NULLs in the `address` column.
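
The semantic gap behind this fix can be illustrated with a small sketch. It uses plain Java collections only, with no Flink or Fluss classes, and the class and method names (`CountSemanticsSketch`, `countStar`, `countColumn`) are hypothetical, chosen purely for illustration. It shows why a row count cannot stand in for `COUNT(address)` once the column contains NULLs:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Illustrative only: plain Java, no Flink or Fluss classes involved.
final class CountSemanticsSketch {

    // COUNT(*): every row counts, regardless of NULLs.
    static long countStar(List<String> addressColumn) {
        return addressColumn.size();
    }

    // COUNT(address): rows whose address is NULL are skipped.
    static long countColumn(List<String> addressColumn) {
        return addressColumn.stream().filter(Objects::nonNull).count();
    }

    public static void main(String[] args) {
        // Assumed sample data: four rows, two with a NULL address.
        List<String> addresses = Arrays.asList("Beijing", null, "Hangzhou", null);
        System.out.println(countStar(addresses));   // prints 4
        System.out.println(countColumn(addresses)); // prints 2
    }
}
```

A row-count API corresponds to `countStar` here, so answering `COUNT(address)` with it would over-count whenever NULLs are present.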
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| `fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java` | Tightens COUNT aggregate pushdown to avoid pushing down nullable-argument COUNT(expr) as a row-count. |
| `fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java` | Adds/extends IT assertions around COUNT pushdown behavior and adjusts log-table test data to include NULL addresses. |

    // test COUNT(column) with NULL values - should NOT push down for nullable columns
    // This will fail because log table doesn't support full scan in batch mode
    assertThatThrownBy(
                    () ->
                            tEnv.explainSql(
                                    String.format("SELECT COUNT(address) FROM %s", tableName)))
            .hasMessageContaining(
                    "Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode.");
    assertThatThrownBy(
                    () ->
                            tEnv.explainSql(
                                    String.format(
                                            "SELECT COUNT(DISTINCT address) FROM %s",
                                            tableName)))
            .hasMessageContaining(
                    "Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode.");

    // For COUNT(column), reject if column is nullable (cannot handle NULL filtering)
    if (isCountAgg) {
        List<org.apache.flink.table.expressions.Expression> args = aggExpr.getChildren();
        if (!args.isEmpty() && args.get(0) instanceof ResolvedExpression) {
            ResolvedExpression arg = (ResolvedExpression) args.get(0);
            if (arg.getOutputDataType().getLogicalType().isNullable()) {
                return false;
            }
        }
    }

@beryllw test fails
fixed
Is this a bug in Flink SQL? Maybe we can file a Flink JIRA for it?
It's a bug in the Fluss SQL source for Flink, not in Flink SQL itself.


Purpose
Linked issue: close #3270
Fixes the aggregate pushdown logic to correctly handle COUNT(column) on nullable columns.
Brief change log
When the aggregate is CountAggFunction with a nullable column argument, the pushdown is rejected so that Flink handles the NULL-excluding count correctly.
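
As a rough model of the rule described above, the decision can be reduced to a single predicate. This is a simplified sketch under stated assumptions, not the code in `FlinkTableSource`: `CountPushdownSketch`, `CountArg`, and `canPushDownAsRowCount` are hypothetical names, and a `null` argument is used here to stand for an argument-less count such as `COUNT(*)`.

```java
// Hypothetical model of the pushdown decision; names are illustrative,
// not Fluss or Flink APIs.
final class CountPushdownSketch {

    /** Minimal stand-in for the argument of a COUNT call. */
    static final class CountArg {
        final String column;
        final boolean nullable;

        CountArg(String column, boolean nullable) {
            this.column = column;
            this.nullable = nullable;
        }
    }

    // A null arg models COUNT(*) / COUNT(1): no column argument to inspect.
    static boolean canPushDownAsRowCount(CountArg arg) {
        if (arg == null) {
            return true; // a plain row count is always safe to push down
        }
        // A nullable argument may contain NULLs, which COUNT(column) must skip,
        // so a row count would be wrong; leave the aggregate to the engine.
        return !arg.nullable;
    }

    public static void main(String[] args) {
        System.out.println(canPushDownAsRowCount(null));                          // true
        System.out.println(canPushDownAsRowCount(new CountArg("id", false)));     // true
        System.out.println(canPushDownAsRowCount(new CountArg("address", true))); // false
    }
}
```

Rejecting on declared nullability rather than on observed data is conservative: a nullable column that happens to hold no NULLs is still not pushed down, but the result is never wrong.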
Tests
API and Format
Documentation