pg_lake_table: bulk-INSERT data file catalog changes#355
pg_lake_table: bulk-INSERT data file catalog changes#355sfc-gh-okalaci wants to merge 9 commits into
Conversation
The gains are becoming less relevant at this point, but if you look at the wall-clock time, the 100K case still interesting, gaining 5 minutes. And, the changes are mostly straight-forward and makes sense, so I'm still inclined to merge. Let me know what you think, we could still consider not merging this if you think the code is complex. I iterated quite a few times to simplify, hopefully it feels simple enough for you as well. |
| "SELECT $1, f.id, t.partition_field_id, t.value " | ||
| "FROM unnest($2, $3, $4) " | ||
| " AS t(path, partition_field_id, value) " | ||
| "JOIN " DATA_FILES_TABLE_QUALIFIED " f " |
There was a problem hiding this comment.
I think the down-side of this PR is this addition on the JOINs. Though, as noted in the above comment, we use primary key on the files catalog, so doesn't cause any problems.
There was a problem hiding this comment.
would it be worth getting this via RETURNING in BulkInsertDataFiles?
There was a problem hiding this comment.
I pushed such a commit. I think the downside is in memory MAX_S3_PATH_LENGTH times file counts for the HTAB.
Which is probably ok ~100MB for 100k files
| default: | ||
|
|
||
| /* | ||
| * Future bulk paths slot in as new cases here, e.g. a run of |
There was a problem hiding this comment.
intentionally a bit over-engineered for this PR but could make the data file deletes trivial, which is also relevant for bulk-deletes on partitioned tables.
|
Verified that other dependent extensions' CI also pass with this commit |
|
|
||
| /* Dispatch a run of same-typed ops to the right per-type bulk SQL. */ | ||
| void | ||
| FlushBatch(Oid relationId, TableMetadataOperationType type, List *batch) |
There was a problem hiding this comment.
overly generic name for a public function
|
|
||
| capacity += list_length(operation->dataFileStats.columnStats); | ||
| } | ||
|
|
There was a problem hiding this comment.
could probably break down into functions a bit more for readability
There was a problem hiding this comment.
added AppendColumnStatsForFile, and also ExecInsertDataFileColumnStats and its friends
| "SELECT $1, f.id, t.partition_field_id, t.value " | ||
| "FROM unnest($2, $3, $4) " | ||
| " AS t(path, partition_field_id, value) " | ||
| "JOIN " DATA_FILES_TABLE_QUALIFIED " f " |
There was a problem hiding this comment.
would it be worth getting this via RETURNING in BulkInsertDataFiles?
644e2f1 to
d8882d7
Compare
**Empirical (LOAD only):** same fixed parquet per `COPY`, partitioned Iceberg table, **one transaction**, local PG17 + MinIO. Baseline = `okalaci/batch_delete_in_progress_files` tip immediately before this change. | Workload | Baseline LOAD | With this change | Δ | |----------|---------------|------------------|---| | 24k files (60 × 400 rows/file) | 427.8 s | 414.2 s | −3.2% | | 100k files (250 × 400 rows/file) | 2411.1 s | 1939.6 s | −19.5% | *SQL `COMMIT` wall time is dominated by manifest/metadata work outside `ApplyDataFileCatalogChanges`, so it is not a useful signal for this change and is omitted.* Previously, `ApplyDataFileCatalogChanges` in `data_files_catalog.c` walked the metadata operation list and applied each `DATA_FILE_ADD` with three per-row helpers (`AddDataFileToTable`, `AddDataFileColumnStatsToCatalog`, `AddDataFilePartitionValueToCatalog`). For *N* files with *C* stat columns and *P* partition fields each, that was **N × (1 + C + P + 1)** SPI calls plus **N** redundant `AllPartitionTransformList()` lookups. This change **collapses contiguous `DATA_FILE_ADD` runs** into **four** bulk inserts of the shape `INSERT … SELECT … FROM unnest(…)`, implemented in `data_files_catalog_batch.c`: | # | Function | What it does | |---|----------|----------------| | 1 | `BulkInsertDataFiles` | One insert into `lake_table.files`; ids from `files_id_seq` default | | 2 | `BulkInsertDataFileColumnStats` | One insert into `lake_table.data_file_column_stats` with nullable lower/upper arrays | | 3 | `BulkInsertDataFilePartitionValues` | One insert joining `unnest(…)` to `files_pkey` on `(table_name, path)`; `AllPartitionTransformList()` once per batch | | 4 | `BulkInsertTrackedFileIds` | One insert joining `unnest(…)` to `files_pkey` when `PgLakeAddDataFileHook` opted in | `ApplyDataFileCatalogChanges` is now an **alternating-phase loop**: it flushes a run of same-type ops through `FlushBatch` (today only `DATA_FILE_ADD`; structured so adjacent `DATA_FILE_REMOVE` can follow later) and applies non-batchable ops one at a time via `ApplySingleOp`. Order between adds and other op types matches the old per-row behavior. **Layout:** bulk helpers live in `data_files_catalog_batch.c`; private contract in `data_files_catalog_internal.h` (`BatchableType`, `FlushBatch`, `TX_DATA_FILES_QUALIFIED_TABLE_NAME`). `data_files_catalog.c` keeps the public orchestrator and `ApplySingleOp`. 1. **Only `DATA_FILE_ADD` is batched.** On a non-add op we flush pending adds first, so ordering vs. other ops matches the old code. Within a batch, multi-argument `unnest()` in `FROM` iterates in lockstep so `$2[i]`, `$3[i]`, … share one output row (no `WITH ORDINALITY` needed; downstream inserts join on `(table_name, path)`, not on position). 2. **`PgLakeAddDataFileHook()`** is still invoked once per `CONTENT_DATA` file (hook may inspect per-file state); paths where it returns true are batched into one insert into the tx-scoped temp table. 3. **Column stats:** empty / NULL-bound stats still **skip the row entirely** (same as before), not `(NULL, NULL)` catalog rows. Per-row helpers `AddDataFileToTable`, `InsertDataFileIdIntoTransactionTable`, `AddDataFileColumnStatsToCatalog`, `AddDataFilePartitionValueToCatalog`, and `GenerateDataFileId` are no longer reachable and were removed. `MakeArrayFromDatums` was moved to `pg_lake_engine` (`util/array_utils`) for reuse. The LOAD win **scales with file count**: on the order of ~280k SPI calls per `ApplyDataFileCatalogChanges` invocation collapse into **four** inserts, and the gap widens as the in-transaction catalog grows. Co-authored-by: Cursor <cursoragent@cursor.com>
…id inserts BulkInsertDataFiles now returns the sequence-assigned ids via RETURNING id, path. FlushDataFileAddBatch builds a path->id HTAB from those rows and threads it to BulkInsertDataFilePartitionValues and BulkInsertTrackedFileIds. Both helpers now unnest an int8[] id array directly instead of joining back to lake_table.files on (table_name, path). This removes N btree descents (one per partition field row) from the write path entirely — replaced by O(1) in-memory hash lookups. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Move the hash creation and SPI_tuptable iteration out of BulkInsertDataFiles into a dedicated BuildFileIdHashFromReturning(count) helper. The helper owns hash_create and the RETURNING row loop; BulkInsertDataFiles calls it while still inside SPI then returns the populated table to FlushDataFileAddBatch. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…dFileIdHashFromReturning The HTAB was allocated with hcxt = CurrentMemoryContext while inside an SPI session; SPI_END() freed that context, leaving the returned pointer dangling. Capture CurrentMemoryContext before SPI_START_EXTENSION_OWNER and pass it as targetCxt so the hash lives in the caller's context. Also move all mid-block declarations to the top of their blocks to satisfy pgindent / C89 rules: HTAB *pathToFileId and MemoryContext callerCxt in BulkInsertDataFiles, HTAB *pathToFileId and uint64 nRows in BuildFileIdHashFromReturning, and the per-iteration locals (fileId, path, found, entry) in the RETURNING-row loop. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…clarations
- Rename FlushBatch -> ApplyDataFileBatch (more specific public name per
review feedback).
- Extract ExecInsert* helpers so each Bulk* function only builds arrays and
each Exec* function only runs SPI. The pair pattern per catalog is:
BulkInsertDataFiles / ExecInsertDataFiles
BulkInsertDataFileColumnStats / ExecInsertDataFileColumnStats
BulkInsertDataFilePartitionValues / ExecInsertDataFilePartitionValues
BulkInsertTrackedFileIds / ExecInsertTrackedFileIds
- ExecInsertDataFiles receives count from its caller rather than reading
SPI_processed internally.
- Move all mid-block variable declarations to the top of their blocks to
satisfy pgindent / C89 rules.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…r-file helpers - Declare variables at point of use throughout (project style, not pgindent C89 style). - Replace generic loop/count names with descriptive ones: fileCount, fileIndex, statCapacity, statIndex, partitionRowCapacity, partitionRowIndex, trackedFileCount, fieldCount, fieldIndex. - Extract AppendColumnStatsForFile from BulkInsertDataFileColumnStats and AppendPartitionValuesForFile from BulkInsertDataFilePartitionValues so each function has one clear job. Both helpers follow caller-above-callee order with forward decls at the top. - Drop unused relationId parameter from BulkInsertTrackedFileIds (the hook-tracked insert does not need it). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
pgindent requires a space before * in pointer-to-struct parameters: TableMetadataOperation *operation -> TableMetadataOperation * operation Apply to all six occurrences (three forward decls, three definitions). Also rewrap two comments to fit within the 79-column line limit as pgindent would produce them. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…spacing pgindent puts the space before * only in function parameter lists, not in local variable declarations. The previous commit over-applied the substitution to foreach-local variables (lfirst assignments). Revert those back to the no-space form. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
c3a8771 to
3e9599d
Compare
Qualify pg_catalog.unnest($1) in ExecInsertTrackedFileIds (single-array form — a normal resolvable SRF) so search_path cannot shadow it. Multi-argument FROM unnest($2, $3, ...) is a special UNNEST table-function syntax (PostgreSQL docs §9.19, §7.2.1.4) and is not the same as calling pg_catalog.unnest(text[], bigint[], ...) as a regular function; the server raises UndefinedFunction if schema-qualified that way. Keep those sites unqualified and add a short comment explaining why. Co-authored-by: Cursor <cursoragent@cursor.com>
3d4ff94 to
d05958b
Compare
| List *runOps = NIL; | ||
|
|
||
| while (opCell != NULL && | ||
| ((TableMetadataOperation *) lfirst(opCell))->type == runType) |
There was a problem hiding this comment.
can this be expressed as a foreach loop? I don't find this syntax very intuitive
sfc-gh-mslot
left a comment
There was a problem hiding this comment.
looks pretty good apart from the loop comment
Empirical (LOAD only): same fixed parquet per
COPY, partitioned Iceberg table, one transaction, local PG17 + MinIO. Baseline =okalaci/batch_delete_in_progress_filestip immediately before this change.SQL
COMMITwall time is dominated by manifest/metadata work outsideApplyDataFileCatalogChanges, so it is not a useful signal for this change and is omitted.Previously,
ApplyDataFileCatalogChangesindata_files_catalog.cwalked the metadata operation list and applied eachDATA_FILE_ADDwith three per-row helpers (AddDataFileToTable,AddDataFileColumnStatsToCatalog,AddDataFilePartitionValueToCatalog). For N files with C stat columns and P partition fields each, that was N × (1 + C + P + 1) SPI calls plus N redundantAllPartitionTransformList()lookups.This change collapses contiguous
DATA_FILE_ADDruns into four bulk inserts of the shapeINSERT … SELECT … FROM unnest(…), implemented indata_files_catalog_batch.c:BulkInsertDataFileslake_table.files; ids fromfiles_id_seqdefaultBulkInsertDataFileColumnStatslake_table.data_file_column_statswith nullable lower/upper arraysBulkInsertDataFilePartitionValuesunnest(…)tofiles_pkeyon(table_name, path);AllPartitionTransformList()once per batchBulkInsertTrackedFileIdsunnest(…)tofiles_pkeywhenPgLakeAddDataFileHookopted inApplyDataFileCatalogChangesis now an alternating-phase loop: it flushes a run of same-type ops throughFlushBatch(today onlyDATA_FILE_ADD; structured so adjacentDATA_FILE_REMOVEcan follow later) and applies non-batchable ops one at a time viaApplySingleOp. Order between adds and other op types matches the old per-row behavior.Layout: bulk helpers live in
data_files_catalog_batch.c; private contract indata_files_catalog_internal.h(BatchableType,FlushBatch,TX_DATA_FILES_QUALIFIED_TABLE_NAME).data_files_catalog.ckeeps the public orchestrator andApplySingleOp.Intentional simplifications
Only
DATA_FILE_ADDis batched. On a non-add op we flush pending adds first, so ordering vs. other ops matches the old code. Within a batch, multi-argumentunnest()inFROMiterates in lockstep so$2[i],$3[i], … share one output row (noWITH ORDINALITYneeded; downstream inserts join on(table_name, path), not on position).PgLakeAddDataFileHook()is still invoked once perCONTENT_DATAfile (hook may inspect per-file state); paths where it returns true are batched into one insert into the tx-scoped temp table.Column stats: empty / NULL-bound stats still skip the row entirely (same as before), not
(NULL, NULL)catalog rows.Removed / moved
Per-row helpers
AddDataFileToTable,InsertDataFileIdIntoTransactionTable,AddDataFileColumnStatsToCatalog,AddDataFilePartitionValueToCatalog, andGenerateDataFileIdare no longer reachable and were removed.MakeArrayFromDatumswas moved topg_lake_engine(util/array_utils) for reuse.The LOAD win scales with file count: on the order of ~280k SPI calls per
ApplyDataFileCatalogChangesinvocation collapse into four inserts, and the gap widens as the in-transaction catalog grows.Co-authored-by: Cursor cursoragent@cursor.com