Add read support for nested data types in iceberg. #14270
liurenjie1024 wants to merge 9 commits into NVIDIA:main
Conversation
Signed-off-by: Ray Liu <liurenjie2008@gmail.com>
Greptile Overview

Greptile Summary

This PR adds GPU support for reading nested data types (arrays, maps, structs) in Iceberg tables, addressing issue #14238.

Key Changes:

Testing:

The implementation efficiently handles schema evolution by building the transformation plan once during construction rather than on every batch, with a fast-path optimization when schemas match exactly.

Confidence Score: 4/5

Important Files Changed

Sequence Diagram

sequenceDiagram
participant Reader as Parquet Reader
participant Filter as filterParquetBlocks
participant Processor as GpuParquetReaderPostProcessor
participant Visitor as ActionBuildingVisitor
participant Action as ColumnAction Tree
Reader->>Filter: filterParquetBlocks(file, requiredSchema)
Filter->>Filter: Convert shaded parquet schema to Iceberg schema
Filter->>Filter: Build field ID to batch index mapping
Filter-->>Reader: (ParquetFileInfo, ShadedMessageType)
Reader->>Processor: new GpuParquetReaderPostProcessor(...)
Processor->>Visitor: Build action tree via SchemaWithPartnerVisitor
Visitor->>Visitor: Compare expected vs file schema recursively
Visitor->>Visitor: Handle primitives, structs, lists, maps
Visitor->>Action: Generate PassThrough/UpCast/FillNull/etc
Action-->>Processor: rootAction tree
Processor->>Processor: Check if canPassThroughBatch
Reader->>Processor: process(originalBatch)
alt canPassThroughBatch
Processor-->>Reader: Return original batch
else needs transformation
Processor->>Action: Execute action tree on batch columns
Action->>Action: Process nested types recursively
Action->>Action: Apply type promotions, fill nulls, fetch constants
Action-->>Processor: Transformed columns
Processor-->>Reader: New ColumnarBatch with expected schema
end
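Roughly, the dispatch in the diagram corresponds to the shape below. This is a minimal sketch only; `ColumnAction`, `canPassThroughBatch`, and the method bodies here are illustrative assumptions, not the PR's actual code.

```scala
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical stand-in for the PR's per-column transformation tree.
trait ColumnAction

class PostProcessorSketch(rootAction: ColumnAction, canPassThroughBatch: Boolean) {
  // Fast path: the file schema already matches the expected schema, so the
  // batch is returned untouched. Otherwise the action tree built once at
  // construction time is executed on every batch.
  def process(originalBatch: ColumnarBatch): ColumnarBatch = {
    if (canPassThroughBatch) {
      originalBatch
    } else {
      executeActionTree(rootAction, originalBatch)
    }
  }

  private def executeActionTree(action: ColumnAction, batch: ColumnarBatch): ColumnarBatch = {
    // Placeholder: recursively apply per-column actions (type promotions,
    // null fills, constant columns) and rebuild a batch in the expected schema.
    ???
  }
}
```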
override def execute(ctx: ColumnActionContext): CudfColumnVector = {
  // Handle case where entire struct is missing from file (ctx.column is None)
  // In this case, all inputIndices must be None and we generate all children
  ctx.column match {
    case None =>
      // Struct is missing from file - all fields must be generated
      require(inputIndices.forall(_.isEmpty),
        "When struct column is missing, all inputIndices must be None")
      val childCols = fieldActions.safeMap(action => action.execute(ctx))
      withResource(childCols) { cols =>
        CudfColumnVector.makeStruct(ctx.numRows, cols: _*)
      }

    case Some(col) =>
      val childCols = fieldActions.zip(inputIndices).safeMap { case (action, inputIdx) =>
        inputIdx match {
          case Some(idx) =>
            // Field exists in input - get child at the correct index
            val childCol = withResource(col.getChildColumnView(idx)) { childView =>
              childView.copyToColumnVector()
            }
            if (action == PassThrough) {
              childCol
            } else {
              withResource(childCol) { _ =>
                action.execute(ctx.withColumn(childCol))
              }
            }
          case None =>
            // Field doesn't exist in input - action must generate the column
            // (FillNull, FetchConstant, etc.)
            action.execute(ctx)
        }
      }
      withResource(childCols) { cols =>
        CudfColumnVector.makeStruct(ctx.numRows, cols: _*)
      }
  }
Verify validity mask propagation when creating struct with makeStruct for nullable parent structs, especially in case None (line 286-293) where all children are generated.
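One way the concern could be addressed, shown here only as a sketch under the assumption that cudf's `ColumnView.mergeAndSetValidity` is used (this is not the PR's actual fix), is to copy the original struct's validity onto the rebuilt struct:

```scala
import ai.rapids.cudf.{BinaryOp, ColumnVector, ColumnView}

// Sketch: rebuild a struct from transformed children, then merge in the
// original struct's null mask so parent-level nulls are preserved.
// (When the struct is entirely missing from the file there is no parent
// mask to merge; the generated children decide nullability in that case.)
def rebuildStructKeepingValidity(
    original: ColumnView,
    children: Seq[ColumnView]): ColumnVector = {
  val rebuilt = ColumnVector.makeStruct(original.getRowCount, children: _*)
  try {
    // BITWISE_AND keeps a row null if it was null in the original struct.
    rebuilt.mergeAndSetValidity(BinaryOp.BITWISE_AND, original)
  } finally {
    rebuilt.close()
  }
}
```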
build

build
sql-plugin/src/main/scala/com/nvidia/spark/rapids/parquet/GpuParquetScan.scala (resolved review thread, outdated)

...n/src/main/scala/com/nvidia/spark/rapids/iceberg/parquet/GpuParquetReaderPostProcessor.scala (resolved review thread)
In general, LGTM.
jihoonson left a comment
This is a nice addition to the iceberg support. @liurenjie1024 thank you for the PR. I have not finished my review yet, but wanted to leave some comments first.
// https://github.com/apache/parquet-format/blob/master/
// LogicalTypes.md#backward-compatibility-rules
I'd suggest leaving this as it is. First, it's easier to just copy the URL and paste it into a web browser, and some IDEs even let you open the link directly; a URL spanning multiple lines is harder to actually open. Second, this style fix is nice and I appreciate it, but it's unrelated to the main change of this PR. The PR is already pretty big, and the style fix makes it harder to review.
// https://github.com/apache/spark/blob/3a0e6bde2aaa11e1165f4fde040ff02e1743795e/
// sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/
// ParquetSchemaConverter.scala#L413
  }

  /**
   * CRITICAL BUG FIX for nested types:

Suggested change:
-   * CRITICAL BUG FIX for nested types:
+   * Note:
  return originalSchema
}

// Get all column paths from the first block (all blocks should have the same columns)
Is this being checked somewhere already? If not, should we add some sanity check here?
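If such a check does not exist yet, a sketch of one might look like the following. The `blocks: Seq[BlockMetaData]` parameter and the (shaded, in the plugin) Parquet imports are assumptions about the surrounding code, not the PR's actual change.

```scala
import scala.collection.JavaConverters._
import org.apache.parquet.hadoop.metadata.BlockMetaData

// Sketch: verify every row group exposes the same column paths before
// relying on the first block's columns.
def assertUniformColumns(blocks: Seq[BlockMetaData]): Unit = {
  require(blocks.nonEmpty, "Expected at least one row group")
  val expected = blocks.head.getColumns.asScala.map(_.getPath.toDotString).toSet
  blocks.tail.foreach { block =>
    val actual = block.getColumns.asScala.map(_.getPath.toDotString).toSet
    require(actual == expected,
      s"Row groups disagree on column paths: expected $expected but found $actual")
  }
}
```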
checkSchemaCompat(
  fileGroupType,
  array.elementType,
  errorCallback,
  isCaseSensitive,
  useFieldId,
  rootFileType,
  rootReadType)
I will stop making the same comment, but please do not add unrelated style fixes in a PR that is already big.
sql-plugin/src/main/scala/com/nvidia/spark/rapids/parquet/GpuParquetScan.scala (resolved review thread, outdated)
override def schema(schema: Schema, structResult: HandlerResult): HandlerResult = structResult

def numRows: Int = column.map(_.getRowCount.toInt).getOrElse(processor.currentNumRows)
We should check for overflow when doing getRowCount.toInt.

Why do we check the row count in the column first instead of just using processor.currentNumRows? Is it because the row count can be a different thing for nested type columns? If so, is using processor.currentNumRows for nested type columns ever valid?
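For the overflow point, a sketch of the safer conversion (not the PR's code) could be:

```scala
// Sketch: convert cudf's Long row count to Int, throwing ArithmeticException
// on overflow instead of silently truncating with .toInt.
def toIntRowCount(rows: Long): Int = Math.toIntExact(rows)

// e.g. column.map(c => toIntRowCount(c.getRowCount)).getOrElse(processor.currentNumRows)
```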
  col.incRefCount()
} else {
  val elementCol = withResource(col.getChildColumnView(0)) { childView =>
    childView.copyToColumnVector()
Do we need this copy because we are going to replace the list child?
// Struct is missing from file - all fields must be generated
require(inputIndices.forall(_.isEmpty),
  "When struct column is missing, all inputIndices must be None")
val childCols = fieldActions.safeMap(action => action.execute(ctx))
So ctx.column will be None in this case when action.execute(ctx) is executed. But many ColumnActions above are missing the existence check for ctx.column. We should add the check.
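A sketch of making that check explicit follows; the `ColumnActionContext`/`CudfColumnVector` names are taken from the diff, but the helper itself is hypothetical.

```scala
// Sketch: a helper for actions that can only transform an existing column
// (e.g. up-casts), so a missing parent struct fails fast with a clear message.
def requireInputColumn(ctx: ColumnActionContext, actionName: String): CudfColumnVector = {
  ctx.column.getOrElse(throw new IllegalStateException(
    s"$actionName requires an input column, but none is present " +
      "(was the parent struct missing from the file?)"))
}
```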
def buildAction(
    fieldId: Int,
    sparkType: DataType,
    isOptional: Boolean,

Suggested change:
-     isOptional: Boolean,
+     isFieldOptional: Boolean,
) extends SchemaWithPartnerVisitor[Type, ColumnAction] {

  // Track current field using a list as stack (for nested struct handling)
  private var fieldStack: List[Types.NestedField] = Nil
Why not use an actual Stack? It should be better than using an immutable List as a stack.
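For reference, the mutable alternative would look roughly like this (a sketch only; whether it beats the idiomatic List-as-stack is mostly a style choice):

```scala
import scala.collection.mutable
import org.apache.iceberg.types.Types

// Sketch: track the enclosing struct fields with a mutable Stack instead of
// rebuilding an immutable List on every push/pop.
class FieldStackSketch {
  private val fieldStack = mutable.Stack.empty[Types.NestedField]

  def pushField(field: Types.NestedField): Unit = fieldStack.push(field)
  def popField(): Unit = { fieldStack.pop(); () }
  def currentField: Option[Types.NestedField] = fieldStack.headOption
}
```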
// For nested types (array, map, struct), we fall back to Row-based approach
// since GpuColumnarBatchBuilder doesn't support these directly
case _: ArrayType | _: MapType | _: StructType =>
  throw new MatchError(field.dataType)
Per https://www.scala-lang.org/api/2.13.3/scala/MatchError.html, MatchError is meant for cases where no pattern matched, which is not what happens here. We should use a better exception type since this is confusing. Because this is a test util, I don't mind using a generic exception type such as RuntimeException, or something else if you have a better idea.
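A sketch of what a clearer failure could look like (names are illustrative, not from the PR):

```scala
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}

// Sketch: signal an intentionally unsupported case with a descriptive
// exception instead of MatchError, which normally means "no pattern matched".
def rejectNestedType(dataType: DataType): Nothing = dataType match {
  case _: ArrayType | _: MapType | _: StructType =>
    throw new UnsupportedOperationException(
      s"$dataType is not supported by GpuColumnarBatchBuilder; use the Row-based path")
  case other =>
    throw new IllegalArgumentException(s"Expected a nested type, got $other")
}
```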
/**
 * Creates a HostColumnVector type descriptor for any data type
 */
private def getHostColumnType(

Suggested change:
- private def getHostColumnType(
+ private def createHostColumnType(
/**
 * Creates a HostColumnVector type descriptor for lists
 */
private def getHostListType(

Suggested change:
- private def getHostListType(
+ private def createHostListType(

This is not a simple 'get' function that just returns something; it 'creates' a new list type.
new org.apache.spark.sql.vectorized.ColumnarBatch(gpuCols.toArray, rowCount)
} catch {
  case e: Throwable =>
    columns.foreach(_.close())
Please use withResource or columns.safeClose(). The current code may not close all columns if one of them fails to close.
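A sketch of the suggested pattern, assuming the plugin's `RapidsPluginImplicits.safeClose` extension (this is illustrative, not the exact change):

```scala
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

// Sketch: safeClose keeps closing the remaining columns even if one close()
// throws, attaching any close failures to the original exception as suppressed.
def buildBatch(columns: Seq[ColumnVector], rowCount: Int): ColumnarBatch = {
  try {
    new ColumnarBatch(columns.toArray, rowCount)
  } catch {
    case e: Throwable =>
      columns.safeClose(e)
      throw e
  }
}
```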
private val rootAction: ColumnAction = buildActionTimeMetric.ns {
  val visitor = new ActionBuildingVisitor(idToConstant)
  val accessors = new FileSchemaAccessors()
  SchemaWithPartnerVisitor.visit(
    expectedSchema.asStruct(),
    fileIcebergSchema.asStruct(),
    visitor,
    accessors)
}
Fixes #14238.

Description

Add read support for nested data types in Iceberg, including tests.

Checklists

(Please explain in the PR description how the new code paths are tested, such as the names of the new/existing tests that cover them.)