-
Notifications
You must be signed in to change notification settings - Fork 196
#755 - Support column stats for paimon #767
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
678ceac
7139e6b
63adfe0
ed8ac27
b08ce6f
e82dbb9
be48db4
60df096
3b884f8
8400413
203b651
6e927e2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,18 +18,32 @@ | |
|
|
||
| package org.apache.xtable.paimon; | ||
|
|
||
| import java.time.Instant; | ||
| import java.util.*; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| import lombok.extern.log4j.Log4j2; | ||
|
|
||
| import org.apache.paimon.Snapshot; | ||
| import org.apache.paimon.data.BinaryArray; | ||
| import org.apache.paimon.data.BinaryRow; | ||
| import org.apache.paimon.data.Timestamp; | ||
| import org.apache.paimon.io.DataFileMeta; | ||
| import org.apache.paimon.manifest.ManifestEntry; | ||
| import org.apache.paimon.stats.SimpleStats; | ||
| import org.apache.paimon.table.FileStoreTable; | ||
| import org.apache.paimon.table.source.snapshot.SnapshotReader; | ||
| import org.apache.paimon.types.TimestampType; | ||
|
|
||
| import org.apache.xtable.exception.ReadException; | ||
| import org.apache.xtable.model.schema.InternalField; | ||
| import org.apache.xtable.model.schema.InternalSchema; | ||
| import org.apache.xtable.model.schema.InternalType; | ||
| import org.apache.xtable.model.stat.ColumnStat; | ||
| import org.apache.xtable.model.stat.Range; | ||
| import org.apache.xtable.model.storage.InternalDataFile; | ||
|
|
||
| @Log4j2 | ||
| public class PaimonDataFileExtractor { | ||
|
|
||
| private final PaimonPartitionExtractor partitionExtractor = | ||
|
|
@@ -61,7 +75,7 @@ private InternalDataFile toInternalDataFile( | |
| .recordCount(entry.file().rowCount()) | ||
| .partitionValues( | ||
| partitionExtractor.toPartitionValues(table, entry.partition(), internalSchema)) | ||
| .columnStats(toColumnStats(entry.file())) | ||
| .columnStats(toColumnStats(entry.file(), internalSchema)) | ||
| .build(); | ||
| } | ||
|
|
||
|
|
@@ -78,10 +92,137 @@ private String toFullPhysicalPath(FileStoreTable table, ManifestEntry entry) { | |
| } | ||
| } | ||
|
|
||
| private List<ColumnStat> toColumnStats(DataFileMeta file) { | ||
| // TODO: Implement logic to extract column stats from the file meta | ||
| // https://github.com/apache/incubator-xtable/issues/755 | ||
| return Collections.emptyList(); | ||
| private List<ColumnStat> toColumnStats(DataFileMeta file, InternalSchema internalSchema) { | ||
| List<ColumnStat> columnStats = new ArrayList<>(); | ||
| Map<String, InternalField> fieldMap = | ||
| internalSchema.getAllFields().stream() | ||
| .collect(Collectors.toMap(InternalField::getPath, f -> f)); | ||
|
|
||
| // stats for all columns are present in valueStats, we can safely ignore file.keyStats() - TODO: validate this assumption | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Q to Paimon experts: Is this assumption valid? |
||
| SimpleStats valueStats = file.valueStats(); | ||
| if (valueStats != null) { | ||
| List<String> colNames = file.valueStatsCols(); | ||
| if (colNames == null || colNames.isEmpty()) { | ||
| // if column names are not present, we assume all columns in the schema are present in the same order as the schema - TODO: validate this assumption | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Q to Paimon experts: Is this assumption valid?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mao-liu are you in contact with anyone on the Paimon side to get these questions answered? |
||
| colNames = | ||
| internalSchema.getAllFields().stream() | ||
| .map(InternalField::getPath) | ||
| .collect(Collectors.toList()); | ||
| } | ||
|
|
||
| if (colNames.size() != valueStats.minValues().getFieldCount()) { | ||
| // paranoia check - this should never happen, but if the code reaches here, then there is a bug! Please file a bug report | ||
| throw new ReadException( | ||
| String.format( | ||
| "Mismatch between column stats names and values arity: names=%d, values=%d", | ||
| colNames.size(), valueStats.minValues().getFieldCount())); | ||
| } | ||
|
|
||
| extractStats(columnStats, valueStats, colNames, fieldMap, file.rowCount()); | ||
| } | ||
|
|
||
| return columnStats; | ||
| } | ||
|
|
||
| private void extractStats( | ||
| List<ColumnStat> columnStats, | ||
| SimpleStats stats, | ||
| List<String> colNames, | ||
| Map<String, InternalField> fieldMap, | ||
| long rowCount) { | ||
| BinaryRow minValues = stats.minValues(); | ||
| BinaryRow maxValues = stats.maxValues(); | ||
| BinaryArray nullCounts = stats.nullCounts(); | ||
|
|
||
| for (int i = 0; i < colNames.size(); i++) { | ||
| String colName = colNames.get(i); | ||
| InternalField field = fieldMap.get(colName); | ||
| if (field == null) { | ||
| continue; | ||
| } | ||
|
|
||
| // Check if we already have stats for this field | ||
| boolean alreadyExists = | ||
| columnStats.stream().anyMatch(cs -> cs.getField().getPath().equals(colName)); | ||
| if (alreadyExists) { | ||
| continue; | ||
| } | ||
|
|
||
| InternalType type = field.getSchema().getDataType(); | ||
| Object min = getValue(minValues, i, type, field.getSchema()); | ||
| Object max = getValue(maxValues, i, type, field.getSchema()); | ||
| Long nullCount = (nullCounts != null && i < nullCounts.size()) ? nullCounts.getLong(i) : 0L; | ||
|
|
||
| columnStats.add( | ||
| ColumnStat.builder() | ||
| .field(field) | ||
| .range(Range.vector(min, max)) | ||
| .numNulls(nullCount) | ||
| .numValues(rowCount) | ||
| .build()); | ||
| } | ||
| } | ||
|
|
||
| private Object getValue(BinaryRow row, int index, InternalType type, InternalSchema fieldSchema) { | ||
| if (row.isNullAt(index)) { | ||
| return null; | ||
| } | ||
| switch (type) { | ||
| case BOOLEAN: | ||
| return row.getBoolean(index); | ||
| case INT: | ||
| case DATE: | ||
| return row.getInt(index); | ||
| case LONG: | ||
| return row.getLong(index); | ||
| case TIMESTAMP: | ||
| case TIMESTAMP_NTZ: | ||
| int tsPrecision; | ||
| InternalSchema.MetadataValue tsPrecisionEnum = | ||
| (InternalSchema.MetadataValue) | ||
| fieldSchema.getMetadata().get(InternalSchema.MetadataKey.TIMESTAMP_PRECISION); | ||
| if (tsPrecisionEnum == InternalSchema.MetadataValue.MILLIS) { | ||
| tsPrecision = 3; | ||
| } else if (tsPrecisionEnum == InternalSchema.MetadataValue.MICROS) { | ||
| tsPrecision = 6; | ||
| } else if (tsPrecisionEnum == InternalSchema.MetadataValue.NANOS) { | ||
| tsPrecision = 9; | ||
| } else { | ||
| log.warn( | ||
| "Field idx={}, name={} does not have MetadataKey.TIMESTAMP_PRECISION set, defaulting to default precision", | ||
| index, | ||
| fieldSchema.getName()); | ||
| tsPrecision = TimestampType.DEFAULT_PRECISION; | ||
| } | ||
| Timestamp ts = row.getTimestamp(index, tsPrecision); | ||
|
|
||
| // according to docs for org.apache.xtable.model.stat.Range, timestamp is stored as millis | ||
| // or micros - even if precision is higher than micros, return micros | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Q: to xtable maintainers - is this assumption valid?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes this assumption is correct. Right now there is no support for other precisions so I would expect that an error is thrown if some other precision is found. |
||
| if (tsPrecisionEnum == InternalSchema.MetadataValue.MILLIS) { | ||
| return ts.getMillisecond(); | ||
| } else { | ||
| return ts.toMicros(); | ||
| } | ||
| case FLOAT: | ||
| return row.getFloat(index); | ||
| case DOUBLE: | ||
| return row.getDouble(index); | ||
| case STRING: | ||
| case ENUM: | ||
| return row.getString(index).toString(); | ||
| case DECIMAL: | ||
| int precision = | ||
| (int) fieldSchema.getMetadata().get(InternalSchema.MetadataKey.DECIMAL_PRECISION); | ||
| int scale = (int) fieldSchema.getMetadata().get(InternalSchema.MetadataKey.DECIMAL_SCALE); | ||
| return row.getDecimal(index, precision, scale).toBigDecimal(); | ||
| default: | ||
| log.warn( | ||
| "Handling of {}-type stats for column idx={}, name={} is not yet implemented, skipping stats for this column", | ||
| type, | ||
| index, | ||
| fieldSchema.getName()); | ||
| return null; | ||
| } | ||
| } | ||
|
|
||
| private SnapshotReader newSnapshotReader(FileStoreTable table, Snapshot snapshot) { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we move the column stats conversion to its own class? In the future if we add Paimon as a target then we will also need to convert to the Paimon representation and it would be nice to have all this stats logic in its own class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will do, thanks for the early review @the-other-tim-brown !
I do wonder though, if it is even possible to have Paimon as a target... Paimon has a pretty unique file layout, and might not be as easily "tricked" as other formats.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know enough about Paimon to say. Hudi also has a unique native layout structure to allow for update heavy workloads though and we were able to make this work.
Mainly, though, we do this separation to keep the logic isolated. It may not be relevant to Paimon specifically, but if a table format changes how it represents stats in a new version, we can plug in the appropriate converter based on the version.