Integrate window function optimization rules into IoTDB #16953
JackieTien97 merged 17 commits into master
Codecov Report
Coverage diff, master vs. #16953:
| Metric | master | #16953 | +/- |
|---|---|---|---|
| Coverage | 39.02% | 39.19% | +0.16% |
| Complexity | 207 | 282 | +75 |
| Files | 5021 | 5125 | +104 |
| Lines | 333377 | 342995 | +9618 |
| Branches | 42431 | 43747 | +1316 |
| Hits | 130110 | 134442 | +4332 |
| Misses | 203267 | 208553 | +5286 |
Pull request overview
This PR integrates window function optimization rules into IoTDB, adding support for optimizing window functions through specialized plan nodes (TopKRankingNode, RowNumberNode, ValuesNode) and corresponding operators, along with optimization rules to transform and optimize window operations.
Changes:
- Added new plan nodes: TopKRankingNode, RowNumberNode, and ValuesNode for specialized window operations
- Implemented optimization rules: PruneWindowColumns, RemoveRedundantWindow, GatherAndMergeWindows, ReplaceWindowWithRowNumber, PushDownLimitIntoWindow, PushDownFilterIntoWindow
- Added operators: TopKRankingOperator, RowNumberOperator, ValuesOperator with supporting data structures
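As a rough illustration of why a dedicated top-k ranking operator can be cheaper than a general window operator, the sketch below keeps at most k rows per partition in a bounded heap instead of ranking every row and filtering afterwards. It is not IoTDB code: `TopKPerPartitionSketch`, `Row`, and `topK` are hypothetical names, and ordering by a single ascending `long` value is an assumption made for brevity.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical illustration; none of these names exist in IoTDB.
final class TopKPerPartitionSketch {

  /** One input row: a partition key and the value the window orders by. */
  static final class Row {
    final String partition;
    final long value;

    Row(String partition, long value) {
      this.partition = partition;
      this.value = value;
    }
  }

  /**
   * Returns, per partition, the k smallest values in ascending order.
   * This matches "row_number() <= k" under ORDER BY value ASC.
   */
  static Map<String, List<Long>> topK(List<Row> rows, int k) {
    Map<String, PriorityQueue<Long>> heaps = new LinkedHashMap<>();
    for (Row row : rows) {
      // Max-heap, so the largest retained value is the first to be evicted.
      PriorityQueue<Long> heap =
          heaps.computeIfAbsent(
              row.partition, p -> new PriorityQueue<Long>(Collections.<Long>reverseOrder()));
      heap.add(row.value);
      if (heap.size() > k) {
        heap.poll(); // drop the current largest; memory stays bounded by k
      }
    }
    Map<String, List<Long>> result = new LinkedHashMap<>();
    for (Map.Entry<String, PriorityQueue<Long>> entry : heaps.entrySet()) {
      List<Long> sorted = new ArrayList<>(entry.getValue());
      Collections.sort(sorted);
      result.put(entry.getKey(), sorted);
    }
    return result;
  }
}
```

Memory per partition is bounded by k rather than by the partition size, which is the usual motivation for pushing a limit or a rank filter into the window.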
Reviewed changes
Copilot reviewed 35 out of 35 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| HeapTraversal.java | Utility for navigating binary heap structures |
| TopKRankingNode.java | Plan node for top-k ranking operations |
| RowNumberNode.java | Plan node for row numbering operations |
| ValuesNode.java | Plan node for constant value operations |
| RemoveRedundantWindow.java | Rule to remove empty window operations |
| ReplaceWindowWithRowNumber.java | Rule to replace window with row number (incomplete) |
| PushDownLimitIntoWindow.java | Rule to push limit into window operations |
| PushDownFilterIntoWindow.java | Rule to push filter into window operations |
| GatherAndMergeWindows.java | Rule to merge adjacent window operations |
| TopKRankingOperator.java | Operator for executing top-k ranking |
| RowNumberOperator.java | Operator for computing row numbers |
| ValuesOperator.java | Operator for constant values |
| Supporting data structures | NoChannelGroupByHash, FIFO queues, grouped TopN builders |
    public List<Symbol> getOutputSymbols() {
      return Collections.singletonList(rankingSymbol);
    }
The method getOutputSymbols returns only the ranking symbol, but it should return all output symbols including those from the child node. This inconsistency with other node implementations (like RowNumberNode which properly handles output symbols) will cause incorrect query planning.
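A minimal sketch of the shape this comment asks for, assuming the node should pass through its child's columns and append the ranking column last. Symbols are modelled as plain strings here, and `OutputSymbolsSketch` and `outputSymbols` are hypothetical names, not part of the PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a plan node's output-symbol logic.
final class OutputSymbolsSketch {

  /** Child outputs first, then the ranking symbol appended at the end. */
  static List<String> outputSymbols(List<String> childOutputs, String rankingSymbol) {
    List<String> out = new ArrayList<>(childOutputs);
    out.add(rankingSymbol);
    return Collections.unmodifiableList(out);
  }
}
```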
    @Override
    public List<Symbol> getOutputSymbols() {
      return Collections.singletonList(rowNumberSymbol);
The method getOutputSymbols returns only the row number symbol, but it should return all output symbols including those from the child node. This is inconsistent with how other operators handle output symbols and will cause query planning errors.
Suggested change:
    - return Collections.singletonList(rowNumberSymbol);
    + return ImmutableList.<Symbol>builder()
    +     .addAll(getChild().getOutputSymbols())
    +     .add(rowNumberSymbol)
    +     .build();
    Map<Symbol, Symbol> mapping = new HashMap<>(rewrittenSource.getMappings());
    SymbolMapper mapper = symbolMapper(mapping);

    TopKRankingNode rewrittenTopNRanking = mapper.map(node, rewrittenSource.getRoot());
The variable name 'rewrittenTopNRanking' (line 641) is inconsistent with the node type TopKRankingNode. The name should be 'rewrittenTopKRanking' to match the actual class name and maintain naming consistency.
    case 1036:
      return ExceptNode.deserialize(buffer);
    case 1037:
      return TopKNode.deserialize(buffer);
The deserialization case for TABLE_TOPK_RANKING_NODE (1037) is calling TopKNode.deserialize(buffer) instead of TopKRankingNode.deserialize(buffer). This will cause runtime errors when deserializing TopKRankingNode instances.
    @Override
    public Result apply(WindowNode node, Captures captures, Context context) {
      return null;
The apply method returns null unconditionally. This rule will never perform any transformation, making it ineffective. The method should implement the actual transformation logic to replace the WindowNode with a RowNumberNode.
Suggested change:
    - return null;
    + return Result.empty();
      return result;
    }

    private void processRow(TsBlock tsBlock, int position, long rowNumber) {
The method processRow accepts three parameters (TsBlock tsBlock, int position, long rowNumber) but is being called with (tsBlock, partitionId, rowCount + 1) at line 121. The second argument should be 'position', not 'partitionId'. This will cause incorrect column access and likely runtime errors.
    private void processRow(TsBlock tsBlock, int position, long rowNumber) {
      // Check max rows per partition limit
      if (maxRowsPerPartition.isPresent() && rowNumber >= maxRowsPerPartition.get()) {
The condition checks if rowNumber >= maxRowsPerPartition, but it should check rowNumber > maxRowsPerPartition. With the current logic, when rowNumber equals maxRowsPerPartition (which is the maximum allowed), the row is incorrectly skipped. For example, if maxRowsPerPartition is 5, row 5 will be skipped even though rows 1-5 should be included.
Suggested change:
    - if (maxRowsPerPartition.isPresent() && rowNumber >= maxRowsPerPartition.get()) {
    + if (maxRowsPerPartition.isPresent() && rowNumber > maxRowsPerPartition.get()) {
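The off-by-one can be checked in isolation. The sketch below counts how many rows of one partition survive each form of the check, assuming row numbers start at 1 as in the example above. `RowLimitSketch` and `emittedRows` are hypothetical names used only for this illustration.

```java
// Hypothetical illustration of the two comparison operators; not IoTDB code.
final class RowLimitSketch {

  /**
   * Counts rows that pass the per-partition limit.
   *
   * @param rowsInPartition number of input rows, numbered 1..rowsInPartition
   * @param maxRowsPerPartition the limit being enforced
   * @param strictGreaterThan true for "rowNumber > max", false for "rowNumber >= max"
   */
  static int emittedRows(int rowsInPartition, long maxRowsPerPartition, boolean strictGreaterThan) {
    int emitted = 0;
    for (long rowNumber = 1; rowNumber <= rowsInPartition; rowNumber++) {
      boolean skip =
          strictGreaterThan
              ? rowNumber > maxRowsPerPartition
              : rowNumber >= maxRowsPerPartition;
      if (!skip) {
        emitted++;
      }
    }
    return emitted;
  }
}
```

With ten input rows and a limit of 5, the `>` form keeps rows 1 through 5, while the `>=` form keeps only rows 1 through 4.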
        boolean generateRanking,
        Optional<Integer> hashChannel,
        int expectedPositions,
        Optional<Long> maxPartialMemory) {
      this.operatorContext = operatorContext;
      this.inputOperator = inputOperator;
      this.rankingType = rankingType;
      this.inputTypes = inputTypes;
      this.partitionChannels = partitionChannels;
      this.partitionTSDataTypes = partitionTSDataTypes;
      this.sortChannels = sortChannels;
      this.sortOrders = sortOrders;
      this.maxRowCountPerPartition = maxRowCountPerPartition;
      this.partial = !generateRanking;
      this.generateRanking = generateRanking;
The TopKRankingOperator constructor parameter 'generateRanking' (line 92) is used to set 'partial' with inverted logic (partial = !generateRanking at line 105), and is then also stored in its own field 'generateRanking' (declared at line 66). The two fields are always negations of each other, so this is redundant dual state that is easy to misread. Consider using a single boolean field with clear semantics.
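One way to follow this suggestion is to store a single flag and derive the other view from it. The sketch below is only an illustration of that shape. `RankingModeSketch` and its accessor names are hypothetical, and keeping `partial` rather than `generateRanking` as the stored field is an arbitrary choice.

```java
// Hypothetical illustration of single-flag state; not the operator from the PR.
final class RankingModeSketch {

  // Single source of truth: a partial operator does not emit the ranking column.
  private final boolean partial;

  RankingModeSketch(boolean generateRanking) {
    this.partial = !generateRanking;
  }

  boolean isPartial() {
    return partial;
  }

  /** Derived on demand, so it can never disagree with {@code partial}. */
  boolean generatesRanking() {
    return !partial;
  }
}
```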
      return new RowNumberNode(
          getPlanNodeId(), partitionBy, orderSensitive, rowNumberSymbol, maxRowCountPerPartition);
    }
This method overrides PlanNode.accept; it is advisable to add an Override annotation.
Suggested change:
    + @Override




Description
This PR introduces the following optimization rules:
And their corresponding nodes and operators.