2 changes: 1 addition & 1 deletion pom.xml
@@ -89,7 +89,7 @@
    <spark.version.prefix>3.4</spark.version.prefix>
    <iceberg.version>1.4.2</iceberg.version>
    <delta.version>2.4.0</delta.version>
    <paimon.version>1.2.0</paimon.version>
    <paimon.version>1.3.1</paimon.version>
    <jackson.version>2.18.2</jackson.version>
    <spotless.version>2.43.0</spotless.version>
    <apache.rat.version>0.16.1</apache.rat.version>
xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java
@@ -18,18 +18,32 @@

package org.apache.xtable.paimon;

import java.time.Instant;
import java.util.*;
import java.util.stream.Collectors;

import lombok.extern.log4j.Log4j2;

import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryArray;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.types.TimestampType;

import org.apache.xtable.exception.ReadException;
import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.schema.InternalType;
import org.apache.xtable.model.stat.ColumnStat;
import org.apache.xtable.model.stat.Range;
import org.apache.xtable.model.storage.InternalDataFile;

@Log4j2
public class PaimonDataFileExtractor {

  private final PaimonPartitionExtractor partitionExtractor =

@@ -61,7 +61,7 @@ private InternalDataFile toInternalDataFile(
        .recordCount(entry.file().rowCount())
        .partitionValues(
            partitionExtractor.toPartitionValues(table, entry.partition(), internalSchema))
        .columnStats(toColumnStats(entry.file()))
        .columnStats(toColumnStats(entry.file(), internalSchema))
        .build();
  }

@@ -78,10 +92,137 @@ private String toFullPhysicalPath(FileStoreTable table, ManifestEntry entry) {
    }
  }

  private List<ColumnStat> toColumnStats(DataFileMeta file) {
    // TODO: Implement logic to extract column stats from the file meta
    // https://github.com/apache/incubator-xtable/issues/755
    return Collections.emptyList();
  private List<ColumnStat> toColumnStats(DataFileMeta file, InternalSchema internalSchema) {
@the-other-tim-brown (Contributor), Dec 10, 2025:
Can we move the column stats conversion to its own class? In the future, if we add Paimon as a target, we will also need to convert to the Paimon representation, and it would be nice to have all this stats logic in its own class.

@mao-liu (Author):
Will do, thanks for the early review @the-other-tim-brown!

I do wonder, though, if it is even possible to have Paimon as a target... Paimon has a pretty unique file layout, and might not be as easily "tricked" as other formats.

@the-other-tim-brown (Contributor):
I don't know enough about Paimon to say. Hudi also has a unique native layout structure to allow for update-heavy workloads, though, and we were able to make that work.

Mainly we do this separation to keep the logic isolated. It's not necessarily relevant to Paimon, but if a table format changes how it represents stats in a new version, we can plug in the appropriate converter based on the version.
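To make the suggested split concrete, here is a minimal sketch of what the extracted class could look like. The class name PaimonColumnStatsConverter and both method shapes are illustrative assumptions, not part of this PR:

```java
import java.util.Collections;
import java.util.List;

import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.stats.SimpleStats;

import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.stat.ColumnStat;

// Hypothetical home for the stats logic in this PR; names are illustrative.
public class PaimonColumnStatsConverter {

  // Source direction: Paimon file stats -> XTable ColumnStat.
  // The toColumnStats/extractStats/getValue bodies below would move here.
  public List<ColumnStat> fromPaimon(DataFileMeta file, InternalSchema schema) {
    return Collections.emptyList(); // placeholder for the logic in this PR
  }

  // Target direction, only if Paimon ever becomes a target:
  // XTable ColumnStat -> Paimon SimpleStats.
  public SimpleStats toPaimon(List<ColumnStat> stats, InternalSchema schema) {
    throw new UnsupportedOperationException("Paimon as a target is not implemented");
  }
}
```

This split would also give a natural seam for the version-based plug-in the reviewer mentions: a converter could be selected per stats version without touching the file extractor.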

    List<ColumnStat> columnStats = new ArrayList<>();
    Map<String, InternalField> fieldMap =
        internalSchema.getAllFields().stream()
            .collect(Collectors.toMap(InternalField::getPath, f -> f));

    // stats for all columns are present in valueStats, so we can safely ignore file.keyStats()
    // TODO: validate this assumption
@mao-liu (Author):
Q to Paimon experts: Is this assumption valid?

    SimpleStats valueStats = file.valueStats();
    if (valueStats != null) {
      List<String> colNames = file.valueStatsCols();
      if (colNames == null || colNames.isEmpty()) {
        // if column names are not present, we assume all columns in the schema are present,
        // in the same order as the schema - TODO: validate this assumption
@mao-liu (Author):
Q to Paimon experts: Is this assumption valid?

@the-other-tim-brown (Contributor):
@mao-liu are you in contact with anyone on the Paimon side to get these questions answered?

        colNames =
            internalSchema.getAllFields().stream()
                .map(InternalField::getPath)
                .collect(Collectors.toList());
      }

      if (colNames.size() != valueStats.minValues().getFieldCount()) {
        // paranoia check - this should never happen, but if the code reaches here, then there
        // is a bug! Please file a bug report
        throw new ReadException(
            String.format(
                "Mismatch between column stats names and values arity: names=%d, values=%d",
                colNames.size(), valueStats.minValues().getFieldCount()));
      }

      extractStats(columnStats, valueStats, colNames, fieldMap, file.rowCount());
    }

    return columnStats;
  }

  private void extractStats(
      List<ColumnStat> columnStats,
      SimpleStats stats,
      List<String> colNames,
      Map<String, InternalField> fieldMap,
      long rowCount) {
    BinaryRow minValues = stats.minValues();
    BinaryRow maxValues = stats.maxValues();
    BinaryArray nullCounts = stats.nullCounts();

    for (int i = 0; i < colNames.size(); i++) {
      String colName = colNames.get(i);
      InternalField field = fieldMap.get(colName);
      if (field == null) {
        continue;
      }

      // Check if we already have stats for this field
      boolean alreadyExists =
          columnStats.stream().anyMatch(cs -> cs.getField().getPath().equals(colName));
      if (alreadyExists) {
        continue;
      }

      InternalType type = field.getSchema().getDataType();
      Object min = getValue(minValues, i, type, field.getSchema());
      Object max = getValue(maxValues, i, type, field.getSchema());
      Long nullCount = (nullCounts != null && i < nullCounts.size()) ? nullCounts.getLong(i) : 0L;

      columnStats.add(
          ColumnStat.builder()
              .field(field)
              .range(Range.vector(min, max))
              .numNulls(nullCount)
              .numValues(rowCount)
              .build());
    }
  }

  private Object getValue(BinaryRow row, int index, InternalType type, InternalSchema fieldSchema) {
    if (row.isNullAt(index)) {
      return null;
    }
    switch (type) {
      case BOOLEAN:
        return row.getBoolean(index);
      case INT:
      case DATE:
        return row.getInt(index);
      case LONG:
        return row.getLong(index);
      case TIMESTAMP:
      case TIMESTAMP_NTZ:
        int tsPrecision;
        InternalSchema.MetadataValue tsPrecisionEnum =
            (InternalSchema.MetadataValue)
                fieldSchema.getMetadata().get(InternalSchema.MetadataKey.TIMESTAMP_PRECISION);
        if (tsPrecisionEnum == InternalSchema.MetadataValue.MILLIS) {
          tsPrecision = 3;
        } else if (tsPrecisionEnum == InternalSchema.MetadataValue.MICROS) {
          tsPrecision = 6;
        } else if (tsPrecisionEnum == InternalSchema.MetadataValue.NANOS) {
          tsPrecision = 9;
        } else {
          log.warn(
              "Field idx={}, name={} does not have MetadataKey.TIMESTAMP_PRECISION set, falling back to the default precision",
              index,
              fieldSchema.getName());
          tsPrecision = TimestampType.DEFAULT_PRECISION;
        }
        Timestamp ts = row.getTimestamp(index, tsPrecision);

        // according to the docs for org.apache.xtable.model.stat.Range, timestamps are stored as
        // millis or micros - even if the precision is higher than micros, return micros
@mao-liu (Author):
Q to xtable maintainers: is this assumption valid?

@the-other-tim-brown (Contributor):
Yes, this assumption is correct. Right now there is no support for other precisions, so I would expect that an error is thrown if some other precision is found.
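Given that answer, the NANOS branch above could fail fast instead of silently truncating to micros. A minimal sketch of the stricter mapping, reusing the ReadException already used in this file (the choice of exception type is an assumption):

```java
// Stricter variant of the precision handling, per the review answer above:
// Range only supports millis and micros, so any other precision is an error.
if (tsPrecisionEnum == InternalSchema.MetadataValue.MILLIS) {
  return row.getTimestamp(index, 3).getMillisecond();
} else if (tsPrecisionEnum == InternalSchema.MetadataValue.MICROS) {
  return row.getTimestamp(index, 6).toMicros();
} else {
  throw new ReadException(
      String.format(
          "Unsupported timestamp precision %s for column %s; only MILLIS and MICROS are supported",
          tsPrecisionEnum, fieldSchema.getName()));
}
```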

        if (tsPrecisionEnum == InternalSchema.MetadataValue.MILLIS) {
          return ts.getMillisecond();
        } else {
          return ts.toMicros();
        }
      case FLOAT:
        return row.getFloat(index);
      case DOUBLE:
        return row.getDouble(index);
      case STRING:
      case ENUM:
        return row.getString(index).toString();
      case DECIMAL:
        int precision =
            (int) fieldSchema.getMetadata().get(InternalSchema.MetadataKey.DECIMAL_PRECISION);
        int scale = (int) fieldSchema.getMetadata().get(InternalSchema.MetadataKey.DECIMAL_SCALE);
        return row.getDecimal(index, precision, scale).toBigDecimal();
      default:
        log.warn(
            "Handling of {}-type stats for column idx={}, name={} is not yet implemented, skipping stats for this column",
            type,
            index,
            fieldSchema.getName());
        return null;
    }
  }

  private SnapshotReader newSnapshotReader(FileStoreTable table, Snapshot snapshot) {
67 changes: 46 additions & 21 deletions xtable-core/src/test/java/org/apache/xtable/TestPaimonTable.java
@@ -70,9 +70,23 @@ public static GenericTable<GenericRow, String> createTable(
      Path tempDir,
      Configuration hadoopConf,
      boolean additionalColumns) {

    Schema schema = buildGenericSchema(partitionField, additionalColumns);
    return createTable(tableName, partitionField, tempDir, hadoopConf, additionalColumns, schema);
  }

  public static GenericTable<GenericRow, String> createTable(
      String tableName,
      String partitionField,
      Path tempDir,
      Configuration hadoopConf,
      boolean additionalColumns,
      Schema schema) {
    String basePath = initBasePath(tempDir, tableName);
    Catalog catalog = createFilesystemCatalog(basePath, hadoopConf);
    FileStoreTable paimonTable = createTable(catalog, partitionField, additionalColumns);
    FileStoreTable paimonTable = createTable(catalog, tableName, schema);

    System.out.println(
        "Initialized Paimon test table at base path: "

@@ -91,19 +105,20 @@ public static Catalog createFilesystemCatalog(String basePath, Configuration had
  }

  public static FileStoreTable createTable(
      Catalog catalog, String partitionField, boolean additionalColumns) {
      Catalog catalog, String tableName, Schema schema) {
    try {
      catalog.createDatabase("test_db", true);
      Identifier identifier = Identifier.create("test_db", "test_table");
      Schema schema = buildSchema(partitionField, additionalColumns);
      Identifier identifier = Identifier.create("test_db", tableName);
      catalog.createTable(identifier, schema, true);
      return (FileStoreTable) catalog.getTable(identifier);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  private static Schema buildSchema(String partitionField, boolean additionalColumns) {
  private static Schema buildGenericSchema(String partitionField, boolean additionalColumns) {
    Schema.Builder builder =
        Schema.newBuilder()
            .primaryKey("id")
@@ -116,7 +131,8 @@ private static Schema buildSchema(String partitionField, boolean additionalColum
.column("description", DataTypes.VARCHAR(255))
.option("bucket", "1")
.option("bucket-key", "id")
.option("full-compaction.delta-commits", "1");
.option("full-compaction.delta-commits", "1")
.option("metadata.stats-mode", "full");

if (partitionField != null) {
builder
@@ -178,20 +194,12 @@ public List<GenericRow> insertRecordsForSpecialPartition(int numRows) {
  }

  private List<GenericRow> insertRecordsToPartition(int numRows, String partitionValue) {
    BatchWriteBuilder batchWriteBuilder = paimonTable.newBatchWriteBuilder();
    try (BatchTableWrite writer = batchWriteBuilder.newWrite()) {
      List<GenericRow> rows = new ArrayList<>(numRows);
      for (int i = 0; i < numRows; i++) {
        GenericRow row = buildGenericRow(i, paimonTable.schema(), partitionValue);
        writer.write(row);
        rows.add(row);
      }
      commitWrites(batchWriteBuilder, writer);
      compactTable();
      return rows;
    } catch (Exception e) {
      throw new RuntimeException("Failed to insert rows into Paimon table", e);
    List<GenericRow> rows = new ArrayList<>(numRows);
    for (int i = 0; i < numRows; i++) {
      rows.add(buildGenericRow(i, paimonTable.schema(), partitionValue));
    }
    writeRows(paimonTable, rows);
    return rows;
  }

  @Override

@@ -224,8 +232,12 @@ public void deleteRows(List<GenericRow> rows) {
  }

  private void compactTable() {
    BatchWriteBuilder batchWriteBuilder = paimonTable.newBatchWriteBuilder();
    SnapshotReader snapshotReader = paimonTable.newSnapshotReader();
    compactTable(paimonTable);
  }

  public static void compactTable(FileStoreTable table) {
    BatchWriteBuilder batchWriteBuilder = table.newBatchWriteBuilder();
    SnapshotReader snapshotReader = table.newSnapshotReader();
    try (BatchTableWrite writer = batchWriteBuilder.newWrite()) {
      for (BucketEntry bucketEntry : snapshotReader.bucketEntries()) {
        writer.compact(bucketEntry.partition(), bucketEntry.bucket(), true);

@@ -236,6 +248,19 @@ private void compactTable() {
      }
    }

  public static void writeRows(FileStoreTable table, List<GenericRow> rows) {
    BatchWriteBuilder batchWriteBuilder = table.newBatchWriteBuilder();
    try (BatchTableWrite writer = batchWriteBuilder.newWrite()) {
      for (GenericRow row : rows) {
        writer.write(row);
      }
      commitWrites(batchWriteBuilder, writer);
      compactTable(table);
    } catch (Exception e) {
      throw new RuntimeException("Failed to write rows into Paimon table", e);
    }
  }
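For reference, a hypothetical caller of the new static helpers from another test; the catalog setup, table name, schema, and row values are illustrative, not part of this PR:

```java
// Illustrative only: build a custom-schema table and write rows through the new helpers.
// basePath and hadoopConf are assumed to come from the test fixture.
Catalog catalog = TestPaimonTable.createFilesystemCatalog(basePath, hadoopConf);
Schema schema =
    Schema.newBuilder()
        .primaryKey("id")
        .column("id", DataTypes.INT())
        .column("name", DataTypes.VARCHAR(64))
        .option("bucket", "1")
        .option("bucket-key", "id")
        .option("metadata.stats-mode", "full") // collect full column stats, as buildGenericSchema now does
        .build();
FileStoreTable table = TestPaimonTable.createTable(catalog, "stats_table", schema);
TestPaimonTable.writeRows(
    table, Collections.singletonList(GenericRow.of(1, BinaryString.fromString("a"))));
```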

  private static void commitWrites(BatchWriteBuilder batchWriteBuilder, BatchTableWrite writer)
      throws Exception {
    BatchTableCommit commit = batchWriteBuilder.newCommit();
@@ -98,7 +98,7 @@ void testGetTableWithUnpartitionedTable() {
    InternalTable result = unpartitionedSource.getTable(snapshot);

    assertNotNull(result);
    assertEquals("test_table", result.getName());
    assertEquals("unpartitioned_table", result.getName());
    assertEquals(TableFormat.PAIMON, result.getTableFormat());
    assertNotNull(result.getReadSchema());
    assertEquals(DataLayoutStrategy.HIVE_STYLE_PARTITION, result.getLayoutStrategy());