
Add read support for nested data types in iceberg. #14270

Open

liurenjie1024 wants to merge 9 commits into NVIDIA:main from liurenjie1024:ray/14238

Conversation

@liurenjie1024
Collaborator

Fixes #14238 .

Description

Add read support for nested data types in Iceberg, including tests.

Checklists

  • This PR has added documentation for new or modified features or behaviors.
  • This PR has added new tests or modified existing tests to cover new code paths.
    (Please explain in the PR description how the new code paths are tested, such as names of the new/existing tests that cover them.)
  • Performance testing has been performed and its results are added in the PR description. Or, an issue has been filed with a link in the PR description.

@greptile-apps
Contributor

greptile-apps bot commented Feb 6, 2026

Greptile Overview

Greptile Summary

This PR adds GPU support for reading nested data types (arrays, maps, structs) in Iceberg tables, addressing issue #14238.

Key Changes:

  • Redesigned GpuParquetReaderPostProcessor with an action-based tree architecture that pre-computes schema transformations at construction time using Iceberg's SchemaWithPartnerVisitor pattern
  • Implemented ColumnAction hierarchy supporting: pass-through, type promotion (int→long, float→double, binary→string), null filling, constant fetching, and metadata columns (_file, _pos)
  • Added nested type processing: ProcessStruct handles field reordering/generation, ProcessList transforms array elements, ProcessMap transforms map key/value pairs
  • Replaced custom SparkSchemaConverter with standard ParquetToSparkSchemaConverter for better compatibility
  • Enhanced FuzzerUtils to generate random columnar batches for nested types, enabling comprehensive testing
  • Added performance metrics: ICEBERG_BUILD_ACTION_TIME and ICEBERG_POST_PROCESS_TIME

Testing:

  • New GpuPostProcessorSuite with 7 unit tests covering all operations including passthrough, promotion, deep nesting, and actual batch processing
  • Integration tests updated to remove GPU fallback markers for nested types
  • Added test for metadata columns with partition evolution

The implementation efficiently handles schema evolution by building the transformation plan once during construction rather than on every batch, with fast-path optimization when schemas match exactly.
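
For readers new to this pattern, a heavily simplified sketch of what such an action tree can look like is shown below. It is illustrative only and does not reproduce the PR's exact types or signatures; the cuDF calls (incRefCount, castTo, fromScalar) are real APIs, but the trait and case classes are assumptions modeled on the summary above.

import ai.rapids.cudf.{DType, Scalar, ColumnVector => CudfColumnVector}
import com.nvidia.spark.rapids.Arm.withResource

// Each action turns one column of the file schema into one column of the expected
// schema; nested actions (not shown) recurse into struct/list/map children.
sealed trait ColumnAction {
  def execute(input: Option[CudfColumnVector], numRows: Int): CudfColumnVector
}

case object PassThrough extends ColumnAction {
  override def execute(input: Option[CudfColumnVector], numRows: Int): CudfColumnVector =
    input.get.incRefCount() // reuse the file column as-is
}

case class UpCast(target: DType) extends ColumnAction {
  override def execute(input: Option[CudfColumnVector], numRows: Int): CudfColumnVector =
    input.get.castTo(target) // e.g. INT32 -> INT64 promotion
}

case class FillNull(target: DType) extends ColumnAction {
  override def execute(input: Option[CudfColumnVector], numRows: Int): CudfColumnVector =
    withResource(Scalar.fromNull(target)) { nullScalar =>
      CudfColumnVector.fromScalar(nullScalar, numRows) // a column of numRows nulls
    }
}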

Confidence Score: 4/5

  • Safe to merge with thorough testing - well-architected implementation with comprehensive test coverage
  • The implementation is well-designed with proper error handling and resource management. The action-based tree architecture is clean and maintainable. Comprehensive unit tests cover all code paths including edge cases. Integration tests verify end-to-end functionality. The one concern noted in previous review about validity mask propagation in struct creation appears to be handled correctly by cuDF's makeStruct API.
  • Pay close attention to GpuParquetReaderPostProcessor.scala - this file contains complex nested type transformation logic that is critical to correctness

Important Files Changed

Filename and overview:

  • iceberg/common/src/main/scala/com/nvidia/spark/rapids/iceberg/parquet/GpuParquetReaderPostProcessor.scala: major refactor adding nested data type support (structs, lists, maps) with an action-based processing tree; includes the logic for schema evolution, type promotion, and field generation
  • tests/src/test/spark350/scala/com/nvidia/spark/rapids/iceberg/GpuPostProcessorSuite.scala: new test suite with 7 test cases covering all nested type operations, including passthrough, promotion, and actual batch processing
  • tests/src/test/scala/com/nvidia/spark/rapids/FuzzerUtils.scala: enhanced to generate random columnar batches for nested types (arrays, maps, structs), which is crucial for testing nested data type support
  • iceberg/common/src/main/scala/com/nvidia/spark/rapids/iceberg/parquet/reader.scala: updated to use ParquetToSparkSchemaConverter instead of the custom converter; returns the shaded schema for the post-processor
  • sql-plugin/src/main/scala/com/nvidia/spark/rapids/parquet/GpuParquetScan.scala: added support for passing Iceberg metrics to readers and new metric tracking for Iceberg operations

Sequence Diagram

sequenceDiagram
    participant Reader as Parquet Reader
    participant Filter as filterParquetBlocks
    participant Processor as GpuParquetReaderPostProcessor
    participant Visitor as ActionBuildingVisitor
    participant Action as ColumnAction Tree
    
    Reader->>Filter: filterParquetBlocks(file, requiredSchema)
    Filter->>Filter: Convert shaded parquet schema to Iceberg schema
    Filter->>Filter: Build field ID to batch index mapping
    Filter-->>Reader: (ParquetFileInfo, ShadedMessageType)
    
    Reader->>Processor: new GpuParquetReaderPostProcessor(...)
    Processor->>Visitor: Build action tree via SchemaWithPartnerVisitor
    Visitor->>Visitor: Compare expected vs file schema recursively
    Visitor->>Visitor: Handle primitives, structs, lists, maps
    Visitor->>Action: Generate PassThrough/UpCast/FillNull/etc
    Action-->>Processor: rootAction tree
    Processor->>Processor: Check if canPassThroughBatch
    
    Reader->>Processor: process(originalBatch)
    alt canPassThroughBatch
        Processor-->>Reader: Return original batch
    else needs transformation
        Processor->>Action: Execute action tree on batch columns
        Action->>Action: Process nested types recursively
        Action->>Action: Apply type promotions, fill nulls, fetch constants
        Action-->>Processor: Transformed columns
        Processor-->>Reader: New ColumnarBatch with expected schema
    end

Contributor

@greptile-apps greptile-apps bot left a comment

5 files reviewed, no comments

Contributor

@greptile-apps greptile-apps bot left a comment

4 files reviewed, 1 comment

Comment on lines 282 to 319
override def execute(ctx: ColumnActionContext): CudfColumnVector = {
  // Handle case where entire struct is missing from file (ctx.column is None)
  // In this case, all inputIndices must be None and we generate all children
  ctx.column match {
    case None =>
      // Struct is missing from file - all fields must be generated
      require(inputIndices.forall(_.isEmpty),
        "When struct column is missing, all inputIndices must be None")
      val childCols = fieldActions.safeMap(action => action.execute(ctx))
      withResource(childCols) { cols =>
        CudfColumnVector.makeStruct(ctx.numRows, cols: _*)
      }

    case Some(col) =>
      val childCols = fieldActions.zip(inputIndices).safeMap { case (action, inputIdx) =>
        inputIdx match {
          case Some(idx) =>
            // Field exists in input - get child at the correct index
            val childCol = withResource(col.getChildColumnView(idx)) { childView =>
              childView.copyToColumnVector()
            }
            if (action == PassThrough) {
              childCol
            } else {
              withResource(childCol) { _ =>
                action.execute(ctx.withColumn(childCol))
              }
            }
          case None =>
            // Field doesn't exist in input - action must generate the column
            // (FillNull, FetchConstant, etc.)
            action.execute(ctx)
        }
      }
      withResource(childCols) { cols =>
        CudfColumnVector.makeStruct(ctx.numRows, cols: _*)
      }
  }
Contributor

Verify validity mask propagation when creating struct with makeStruct for nullable parent structs, especially in case None (line 286-293) where all children are generated.
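
One hedged way to check this in a unit test is to build a struct from a child that contains nulls and inspect the parent's null count. ColumnVector.makeStruct and getNullCount are real cuDF Java APIs (the PR itself calls makeStruct(ctx.numRows, ...)), but the expected result is deliberately left open here, since that is exactly what needs verifying.

import ai.rapids.cudf.ColumnVector
import com.nvidia.spark.rapids.Arm.withResource

// Sketch: does the parent struct get its own validity mask, or is it all-valid
// even though a child has nulls? Assert whichever behavior is expected.
withResource(ColumnVector.fromBoxedInts(Integer.valueOf(1), null, Integer.valueOf(3))) { child =>
  withResource(ColumnVector.makeStruct(3, child)) { struct =>
    println(s"parent nulls = ${struct.getNullCount}, child nulls = ${child.getNullCount}")
  }
}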

Contributor

@greptile-apps greptile-apps bot left a comment

5 files reviewed, no comments

@liurenjie1024
Collaborator Author

build

Contributor

@greptile-apps greptile-apps bot left a comment

5 files reviewed, no comments

@liurenjie1024
Collaborator Author

build

@sameerz added the feature request (New feature or request) label on Feb 8, 2026
@res-life
Collaborator

In general, LGTM.
I also used AI to help with the review.
Please run all the Iceberg ITs, since premerge does not cover them.

@sameerz requested a review from jihoonson on February 11, 2026 at 00:08
Contributor

@greptile-apps greptile-apps bot left a comment

17 files reviewed, no comments

Collaborator

@jihoonson jihoonson left a comment

This is a nice addition to the iceberg support. @liurenjie1024 thank you for the PR. I have not finished my review yet, but wanted to leave some comments first.

Comment on lines +960 to +961
// https://github.com/apache/parquet-format/blob/master/
// LogicalTypes.md#backward-compatibility-rules
Collaborator

I'd suggest leaving this as it is. First, it is easier to just copy the URL and paste it into a web browser, and some IDEs even let you open the link directly; a URL split across multiple lines is harder to actually open. Second, this style fix is nice and I appreciate it, but it's unrelated to the main change of this PR. This PR is already pretty big, and the style fix makes it harder to review.

Comment on lines +963 to +965
// https://github.com/apache/spark/blob/3a0e6bde2aaa11e1165f4fde040ff02e1743795e/
// sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/
// ParquetSchemaConverter.scala#L413
Collaborator

Same comment here.

}

/**
* CRITICAL BUG FIX for nested types:
Collaborator

Suggested change
* CRITICAL BUG FIX for nested types:
* Note:

return originalSchema
}

// Get all column paths from the first block (all blocks should have the same columns)
Collaborator

Is this being checked somewhere already? If not, should we add some sanity check here?
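
For illustration, a sanity check along these lines might look like the sketch below. It is written against the unshaded parquet-mr metadata types, whereas this code path works with the shaded equivalents, and assertUniformColumns is a hypothetical helper name.

import scala.collection.JavaConverters._
import org.apache.parquet.hadoop.metadata.BlockMetaData

// Hypothetical check: every row group should expose the same column paths as the
// first block before we rely on the first block's columns alone.
def assertUniformColumns(blocks: Seq[BlockMetaData]): Unit = {
  require(blocks.nonEmpty, "Parquet file has no row groups")
  val expected = blocks.head.getColumns.asScala.map(_.getPath.toDotString).toSet
  blocks.zipWithIndex.drop(1).foreach { case (block, idx) =>
    val actual = block.getColumns.asScala.map(_.getPath.toDotString).toSet
    require(actual == expected,
      s"Row group $idx column paths $actual differ from the first block's $expected")
  }
}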

Comment on lines +915 to +922
checkSchemaCompat(
  fileGroupType,
  array.elementType,
  errorCallback,
  isCaseSensitive,
  useFieldId,
  rootFileType,
  rootReadType)
Collaborator

I will stop repeating the same comment, but please do not add unrelated style fixes in a PR that is already big.



override def schema(schema: Schema, structResult: HandlerResult): HandlerResult = structResult
def numRows: Int = column.map(_.getRowCount.toInt).getOrElse(processor.currentNumRows)
Collaborator

We should check the overflow when doing getRowCount.toInt.
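
A minimal guard, reusing the names from the quoted line, could be:

// Math.toIntExact throws ArithmeticException instead of silently truncating
def numRows: Int =
  column.map(c => Math.toIntExact(c.getRowCount)).getOrElse(processor.currentNumRows)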

Collaborator

Why do we first check the row count in the column instead of just using processor.currentNumRows? Is it because the row count can be a different thing for nested type columns? If so, is using processor.currentNumRows() for nested type columns ever valid?

  col.incRefCount()
} else {
  val elementCol = withResource(col.getChildColumnView(0)) { childView =>
    childView.copyToColumnVector()
Collaborator

Do we need this copy because we are going to replace the list child?

// Struct is missing from file - all fields must be generated
require(inputIndices.forall(_.isEmpty),
"When struct column is missing, all inputIndices must be None")
val childCols = fieldActions.safeMap(action => action.execute(ctx))
Collaborator

So ctx.column will be None in this case when action.execute(ctx) is executed. But many ColumnActions above are missing the existence check for ctx.column. We should add the check.
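
A small helper like the following (a sketch, not the PR's code) would make those actions fail loudly instead of hitting a None.get later:

// Fail fast when an action that needs an input column runs without one
private def requireColumn(ctx: ColumnActionContext): CudfColumnVector =
  ctx.column.getOrElse(throw new IllegalStateException(
    s"${getClass.getSimpleName} requires an input column but ctx.column is None"))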

def buildAction(
fieldId: Int,
sparkType: DataType,
isOptional: Boolean,
Collaborator

Suggested change
isOptional: Boolean,
isFieldOptional: Boolean,

) extends SchemaWithPartnerVisitor[Type, ColumnAction] {

// Track current field using a list as stack (for nested struct handling)
private var fieldStack: List[Types.NestedField] = Nil
Collaborator

Why not use an actual Stack? It would be better than using an immutable List as a stack.
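
For example (the withField helper and its placement are illustrative, not the PR's code):

import scala.collection.mutable
import org.apache.iceberg.types.Types

// A mutable Stack makes the push/pop intent explicit compared to prepending to
// and dropping from an immutable List
private val fieldStack = mutable.Stack.empty[Types.NestedField]

private def withField[T](field: Types.NestedField)(body: => T): T = {
  fieldStack.push(field)
  try body finally fieldStack.pop()
}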

// For nested types (array, map, struct), we fall back to Row-based approach
// since GpuColumnarBatchBuilder doesn't support these directly
case _: ArrayType | _: MapType | _: StructType =>
  throw new MatchError(field.dataType)
Collaborator

Per https://www.scala-lang.org/api/2.13.3/scala/MatchError.html, MatchError is thrown when a value does not match any pattern, which is not the case here, so using it is confusing. We should use a better exception type. Since this is a test util, I don't mind a generic exception type such as RuntimeException, or you can use something else if you have a better idea.
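
For example, something along these lines would make the intent explicit (the exception choice and message are illustrative):

case _: ArrayType | _: MapType | _: StructType =>
  throw new IllegalArgumentException(
    s"Nested type ${field.dataType} is not supported by this builder path; " +
      "use the Row-based approach instead")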

/**
* Creates a HostColumnVector type descriptor for any data type
*/
private def getHostColumnType(
Collaborator

Suggested change
private def getHostColumnType(
private def createHostColumnType(

/**
* Creates a HostColumnVector type descriptor for lists
*/
private def getHostListType(
Collaborator

Suggested change
private def getHostListType(
private def createHostListType(

This is not a simple 'get' function that just returns something; it 'creates' a new list type.

  new org.apache.spark.sql.vectorized.ColumnarBatch(gpuCols.toArray, rowCount)
} catch {
  case e: Throwable =>
    columns.foreach(_.close())
Collaborator

Please use withResource or columns.safeClose(). The current code may not close all columns if one of them fails to close.
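
A sketch of the suggested shape, keeping the surrounding names and assuming the safeClose extension from RapidsPluginImplicits that the comment refers to:

import com.nvidia.spark.rapids.RapidsPluginImplicits._

try {
  new org.apache.spark.sql.vectorized.ColumnarBatch(gpuCols.toArray, rowCount)
} catch {
  case e: Throwable =>
    // safeClose keeps closing the remaining columns even if one close throws
    columns.safeClose()
    throw e
}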

Comment on lines +596 to +604
private val rootAction: ColumnAction = buildActionTimeMetric.ns {
  val visitor = new ActionBuildingVisitor(idToConstant)
  val accessors = new FileSchemaAccessors()
  SchemaWithPartnerVisitor.visit(
    expectedSchema.asStruct(),
    fileIcebergSchema.asStruct(),
    visitor,
    accessors)
}
Collaborator

This is pretty neat 👍


Labels

feature request (New feature or request)


Development

Successfully merging this pull request may close these issues.

[FEA] Support read nested data type for iceberg.

4 participants