pg_lake_table: bulk-INSERT data file catalog changes #355

Open: sfc-gh-okalaci wants to merge 9 commits into main from okalaci/bulk_insert_data_file_catalog
@sfc-gh-okalaci
Collaborator

Empirical (LOAD only): same fixed parquet per COPY, partitioned Iceberg table, one transaction, local PG17 + MinIO. Baseline = okalaci/batch_delete_in_progress_files tip immediately before this change.

| Workload | Baseline LOAD | With this change | Δ |
|----------|---------------|------------------|---|
| 24k files (60 × 400 rows/file) | 427.8 s | 414.2 s | −3.2% |
| 100k files (250 × 400 rows/file) | 2411.1 s | 1939.6 s | −19.5% |

SQL COMMIT wall time is dominated by manifest/metadata work outside ApplyDataFileCatalogChanges, so it is not a useful signal for this change and is omitted.

Previously, ApplyDataFileCatalogChanges in data_files_catalog.c walked the metadata operation list and applied each DATA_FILE_ADD with three per-row helpers (AddDataFileToTable, AddDataFileColumnStatsToCatalog, AddDataFilePartitionValueToCatalog). For N files with C stat columns and P partition fields each, that was N × (1 + C + P + 1) SPI calls plus N redundant AllPartitionTransformList() lookups.

This change collapses contiguous DATA_FILE_ADD runs into four bulk inserts of the shape INSERT … SELECT … FROM unnest(…), implemented in data_files_catalog_batch.c:

| # | Function | What it does |
|---|----------|----------------|
| 1 | `BulkInsertDataFiles` | One insert into `lake_table.files`; ids from `files_id_seq` default |
| 2 | `BulkInsertDataFileColumnStats` | One insert into `lake_table.data_file_column_stats` with nullable lower/upper arrays |
| 3 | `BulkInsertDataFilePartitionValues` | One insert joining `unnest(…)` to `files_pkey` on `(table_name, path)`; `AllPartitionTransformList()` once per batch |
| 4 | `BulkInsertTrackedFileIds` | One insert joining `unnest(…)` to `files_pkey` when `PgLakeAddDataFileHook` opted in |
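
To make the call-count comparison concrete, here is a minimal C sketch of the arithmetic. The function names and the example inputs are illustrative assumptions; they are neither pg_lake identifiers nor values taken from the benchmark.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Statement counts for one contiguous run of DATA_FILE_ADD operations.
 * Illustrative only: these helpers are not part of pg_lake.
 */

/* Old path: N x (1 + C + P + 1) per-row SPI calls, as stated above. */
static uint64_t
PerRowSpiCalls(uint64_t fileCount, uint64_t statColumns,
			   uint64_t partitionFields)
{
	return fileCount * (1 + statColumns + partitionFields + 1);
}

/* New path: one bulk statement per catalog, independent of file count. */
static uint64_t
BulkSpiCalls(void)
{
	return 4;
}
```

With assumed inputs of 1,000 files, 3 stat columns, and 2 partition fields, `PerRowSpiCalls(1000, 3, 2)` gives 1000 × (1 + 3 + 2 + 1) = 7,000 calls, while one contiguous run needs four statements however many files it contains.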

ApplyDataFileCatalogChanges is now an alternating-phase loop: it flushes a run of same-type ops through FlushBatch (today only DATA_FILE_ADD; structured so adjacent DATA_FILE_REMOVE can follow later) and applies non-batchable ops one at a time via ApplySingleOp. Order between adds and other op types matches the old per-row behavior.
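
The alternating-phase loop can be sketched in standalone C as follows. This is a simplified model, not the pg_lake implementation: `OpType`, `ApplyStats`, `IsBatchable`, and `ApplyOps` are hypothetical names, and counters stand in for the real bulk and per-op code paths.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for the real operation types. */
typedef enum OpType
{
	OP_DATA_FILE_ADD,
	OP_DATA_FILE_REMOVE,
	OP_OTHER
} OpType;

/* Counters that stand in for the bulk and per-op code paths. */
typedef struct ApplyStats
{
	int			batchFlushes;	/* runs handed to the bulk path */
	int			batchedOps;		/* ops covered by those runs */
	int			singleOps;		/* ops applied one at a time */
} ApplyStats;

/* Only adds are batchable in this sketch, mirroring the description. */
static bool
IsBatchable(OpType type)
{
	return type == OP_DATA_FILE_ADD;
}

/*
 * Walk the operations in order. A maximal run of consecutive ops with the
 * same batchable type is flushed as one batch; every other op is applied
 * individually, so relative order across op types is preserved.
 */
static ApplyStats
ApplyOps(const OpType *ops, size_t opCount)
{
	ApplyStats	stats = {0, 0, 0};
	size_t		opIndex = 0;

	assert(ops != NULL || opCount == 0);

	while (opIndex < opCount)
	{
		OpType		runType = ops[opIndex];

		if (!IsBatchable(runType))
		{
			stats.singleOps++;
			opIndex++;
			continue;
		}

		/* Extend the run while the next op has the same type. */
		size_t		runStart = opIndex;

		while (opIndex < opCount && ops[opIndex] == runType)
			opIndex++;

		stats.batchFlushes++;
		stats.batchedOps += (int) (opIndex - runStart);
	}

	return stats;
}
```

For the sequence add, add, other, add, remove, add, add, add, this sketch flushes three batches covering six adds and applies two ops individually, so the adds never move past a non-add op.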

Layout: bulk helpers live in data_files_catalog_batch.c; private contract in data_files_catalog_internal.h (BatchableType, FlushBatch, TX_DATA_FILES_QUALIFIED_TABLE_NAME). data_files_catalog.c keeps the public orchestrator and ApplySingleOp.

Intentional simplifications

  1. Only DATA_FILE_ADD is batched. On a non-add op we flush pending adds first, so ordering vs. other ops matches the old code. Within a batch, multi-argument unnest() in FROM iterates in lockstep so $2[i], $3[i], … share one output row (no WITH ORDINALITY needed; downstream inserts join on (table_name, path), not on position).

  2. PgLakeAddDataFileHook() is still invoked once per CONTENT_DATA file (hook may inspect per-file state); paths where it returns true are batched into one insert into the tx-scoped temp table.

  3. Column stats: empty / NULL-bound stats still skip the row entirely (same as before), not (NULL, NULL) catalog rows.
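
As a rough illustration of items 1 and 3, the sketch below flattens per-file column stats into parallel arrays, where element i of every array describes the same output row, and skips stats that carry no bounds. It rests on a labeled assumption: a stat is skipped only when both bounds are missing, and a single missing bound stays as a NULL element. The types, the function `AppendStatsSketch`, and the example path are hypothetical and do not mirror the real helpers.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical per-column stat; NULL pointers stand in for SQL NULLs. */
typedef struct ColumnStat
{
	int			fieldId;
	const char *lowerBound;
	const char *upperBound;
} ColumnStat;

/* Parallel arrays: element i of each array belongs to the same row. */
typedef struct StatArrays
{
	const char **paths;
	int		   *fieldIds;
	const char **lowerBounds;
	const char **upperBounds;
	size_t		rowCount;
} StatArrays;

/*
 * Append the stats of one file to the parallel arrays.
 * ASSUMPTION: a stat is skipped only when both bounds are missing.
 */
static void
AppendStatsSketch(StatArrays *out, size_t capacity, const char *path,
				  const ColumnStat *stats, size_t statCount)
{
	for (size_t statIndex = 0; statIndex < statCount; statIndex++)
	{
		const ColumnStat *stat = &stats[statIndex];

		if (stat->lowerBound == NULL && stat->upperBound == NULL)
			continue;			/* no (NULL, NULL) catalog row */

		assert(out->rowCount < capacity);

		out->paths[out->rowCount] = path;
		out->fieldIds[out->rowCount] = stat->fieldId;
		out->lowerBounds[out->rowCount] = stat->lowerBound;
		out->upperBounds[out->rowCount] = stat->upperBound;
		out->rowCount++;
	}
}
```

Because every array advances by exactly one element per emitted row, a multi-argument `unnest()` over arrays built this way would pair the values back up by position without needing an ordinality column.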

Removed / moved

Per-row helpers AddDataFileToTable, InsertDataFileIdIntoTransactionTable, AddDataFileColumnStatsToCatalog, AddDataFilePartitionValueToCatalog, and GenerateDataFileId are no longer reachable and were removed. MakeArrayFromDatums was moved to pg_lake_engine (util/array_utils) for reuse.

The LOAD win scales with file count: roughly 280k SPI calls per ApplyDataFileCatalogChanges invocation collapse into four inserts, and the gap widens as the in-transaction catalog grows.

Co-authored-by: Cursor <cursoragent@cursor.com>

@sfc-gh-okalaci
Collaborator Author

Workload | Baseline LOAD | With this change | Δ
-- | -- | -- | --
24k files (60 × 400 rows/file) | 427.8 s | 414.2 s | −3.2%
100k files (250 × 400 rows/file) | 2411.1 s | 1939.6 s | −19.5%

The gains are becoming less relevant at this point, but in wall-clock terms the 100k case is still interesting: it saves nearly 8 minutes (471.5 s).

The changes are mostly straightforward and make sense, so I'm still inclined to merge. Let me know what you think; we could still decide not to merge this if you find the code too complex. I iterated quite a few times to simplify it, so hopefully it feels simple enough to you as well.

"SELECT $1, f.id, t.partition_field_id, t.value "
"FROM unnest($2, $3, $4) "
" AS t(path, partition_field_id, value) "
"JOIN " DATA_FILES_TABLE_QUALIFIED " f "

Collaborator Author

I think the downside of this PR is the added JOINs. As noted in the comment above, though, they go through the primary key on the files catalog, so this doesn't cause any problems.

Collaborator

would it be worth getting this via RETURNING in BulkInsertDataFiles?

Collaborator Author

I pushed such a commit. I think the downside is memory: the HTAB needs roughly MAX_S3_PATH_LENGTH bytes per file.

That is probably OK: about 100 MB for 100k files.
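
A back-of-the-envelope check of that estimate, written as a small C sketch. The value 1024 is only a placeholder for MAX_S3_PATH_LENGTH, whose real definition is not quoted in this thread, and the entry layout is a hypothetical one (fixed-size path key plus an 8-byte id).

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/*
 * ASSUMPTION: 1024 is a placeholder for MAX_S3_PATH_LENGTH; the real value
 * is defined in pg_lake and is not quoted in this thread.
 */
#define ASSUMED_MAX_S3_PATH_LENGTH 1024

/* Hypothetical entry layout: fixed-size path key plus the assigned id. */
typedef struct PathToFileIdEntry
{
	char		path[ASSUMED_MAX_S3_PATH_LENGTH];
	int64_t		fileId;
} PathToFileIdEntry;

/* Lower bound on hash table memory: entries only, no bucket overhead. */
static size_t
EstimateHashBytes(size_t fileCount)
{
	return fileCount * sizeof(PathToFileIdEntry);
}
```

Under these assumptions each entry takes 1,032 bytes, so 100,000 files come to 103,200,000 bytes, which is in line with the roughly 100 MB figure above. Hash table bookkeeping would add some overhead on top.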

default:

/*
* Future bulk paths slot in as new cases here, e.g. a run of

Collaborator Author

Intentionally a bit over-engineered for this PR, but it could make batching data file deletes trivial, which also matters for bulk deletes on partitioned tables.

@sfc-gh-okalaci
Collaborator Author

Verified that the CI of other dependent extensions also passes with this commit.


/* Dispatch a run of same-typed ops to the right per-type bulk SQL. */
void
FlushBatch(Oid relationId, TableMetadataOperationType type, List *batch)

Collaborator

overly generic name for a public function

Collaborator

bump


capacity += list_length(operation->dataFileStats.columnStats);
}


Collaborator

could probably break down into functions a bit more for readability

Collaborator Author

added AppendColumnStatsForFile, and also ExecInsertDataFileColumnStats and its friends

sfc-gh-okalaci force-pushed the okalaci/batch_delete_in_progress_files branch from 644e2f1 to d8882d7 (May 20, 2026 07:34)
Base automatically changed from okalaci/batch_delete_in_progress_files to main (May 20, 2026 08:44)
sfc-gh-okalaci and others added 8 commits (May 20, 2026 11:53)

…id inserts

BulkInsertDataFiles now returns the sequence-assigned ids via RETURNING id,
path. FlushDataFileAddBatch builds a path->id HTAB from those rows and threads
it to BulkInsertDataFilePartitionValues and BulkInsertTrackedFileIds.

Both helpers now unnest an int8[] id array directly instead of joining back
to lake_table.files on (table_name, path). This removes N btree descents (one
per partition field row) from the write path entirely — replaced by O(1)
in-memory hash lookups.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

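
The path-to-id lookup described in this commit can be illustrated with a small standalone C sketch. It is not the pg_lake code: the real change fills a PostgreSQL HTAB from the RETURNING rows, whereas this sketch swaps in a fixed-size open-addressing table with linear probing. Every name, size, and path here (`PathIdMap`, `SKETCH_SLOT_COUNT`, the `s3://` strings in the usage note) is hypothetical.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical sizes for the sketch; the real code uses an HTAB. */
#define SKETCH_PATH_LENGTH 128
#define SKETCH_SLOT_COUNT 64	/* power of two, must exceed the entry count */

typedef struct PathIdSlot
{
	int			used;
	char		path[SKETCH_PATH_LENGTH];
	int64_t		fileId;
} PathIdSlot;

typedef struct PathIdMap
{
	PathIdSlot	slots[SKETCH_SLOT_COUNT];	/* zero-initialize before use */
} PathIdMap;

/* FNV-1a string hash. */
static uint32_t
HashPath(const char *path)
{
	uint32_t	hash = 2166136261u;

	for (; *path != '\0'; path++)
	{
		hash ^= (unsigned char) *path;
		hash *= 16777619u;
	}
	return hash;
}

/* Find the slot holding path, or the empty slot where it would go. */
static PathIdSlot *
FindSlot(PathIdMap *map, const char *path)
{
	uint32_t	index = HashPath(path) & (SKETCH_SLOT_COUNT - 1);

	/* Linear probing; terminates because the table is never full. */
	while (map->slots[index].used &&
		   strcmp(map->slots[index].path, path) != 0)
		index = (index + 1) & (SKETCH_SLOT_COUNT - 1);

	return &map->slots[index];
}

/* Record one (path, id) pair, as each RETURNING row would supply. */
static void
RememberFileId(PathIdMap *map, const char *path, int64_t fileId)
{
	assert(strlen(path) < SKETCH_PATH_LENGTH);

	PathIdSlot *slot = FindSlot(map, path);

	slot->used = 1;
	strcpy(slot->path, path);
	slot->fileId = fileId;
}

/* Return the id for path, or -1 when the path was never recorded. */
static int64_t
LookupFileId(PathIdMap *map, const char *path)
{
	PathIdSlot *slot = FindSlot(map, path);

	return slot->used ? slot->fileId : -1;
}
```

A caller would zero the map, call `RememberFileId(&map, "s3://bucket/t/a.parquet", 101)` once per returned row, and later resolve ids with `LookupFileId` while building the id array for the partition-value and tracked-file inserts. Each lookup is expected constant time, which is the property the commit relies on when it replaces the join back to the files table.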
Move the hash creation and SPI_tuptable iteration out of BulkInsertDataFiles
into a dedicated BuildFileIdHashFromReturning(count) helper. The helper owns
hash_create and the RETURNING row loop; BulkInsertDataFiles calls it while
still inside SPI then returns the populated table to FlushDataFileAddBatch.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

…dFileIdHashFromReturning

The HTAB was allocated with hcxt = CurrentMemoryContext while inside an
SPI session; SPI_END() freed that context, leaving the returned pointer
dangling. Capture CurrentMemoryContext before SPI_START_EXTENSION_OWNER
and pass it as targetCxt so the hash lives in the caller's context.

Also move all mid-block declarations to the top of their blocks to satisfy
pgindent / C89 rules: HTAB *pathToFileId and MemoryContext callerCxt in
BulkInsertDataFiles, HTAB *pathToFileId and uint64 nRows in
BuildFileIdHashFromReturning, and the per-iteration locals (fileId, path,
found, entry) in the RETURNING-row loop.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

…clarations

- Rename FlushBatch -> ApplyDataFileBatch (more specific public name per
  review feedback).

- Extract ExecInsert* helpers so each Bulk* function only builds arrays and
  each Exec* function only runs SPI. The pair pattern per catalog is:
    BulkInsertDataFiles          / ExecInsertDataFiles
    BulkInsertDataFileColumnStats / ExecInsertDataFileColumnStats
    BulkInsertDataFilePartitionValues / ExecInsertDataFilePartitionValues
    BulkInsertTrackedFileIds     / ExecInsertTrackedFileIds

- ExecInsertDataFiles receives count from its caller rather than reading
  SPI_processed internally.

- Move all mid-block variable declarations to the top of their blocks to
  satisfy pgindent / C89 rules.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

…r-file helpers

- Declare variables at point of use throughout (project style, not pgindent
  C89 style).

- Replace generic loop/count names with descriptive ones: fileCount,
  fileIndex, statCapacity, statIndex, partitionRowCapacity,
  partitionRowIndex, trackedFileCount, fieldCount, fieldIndex.

- Extract AppendColumnStatsForFile from BulkInsertDataFileColumnStats and
  AppendPartitionValuesForFile from BulkInsertDataFilePartitionValues so
  each function has one clear job. Both helpers follow caller-above-callee
  order with forward decls at the top.

- Drop unused relationId parameter from BulkInsertTrackedFileIds (the
  hook-tracked insert does not need it).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

pgindent requires a space before * in pointer-to-struct parameters:
  TableMetadataOperation *operation -> TableMetadataOperation * operation
Apply to all six occurrences (three forward decls, three definitions).

Also rewrap two comments to fit within the 79-column line limit as
pgindent would produce them.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

…spacing

pgindent puts the space before * only in function parameter lists, not
in local variable declarations. The previous commit over-applied the
substitution to foreach-local variables (lfirst assignments). Revert
those back to the no-space form.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

sfc-gh-okalaci force-pushed the okalaci/bulk_insert_data_file_catalog branch from c3a8771 to 3e9599d (May 20, 2026 10:48)
Comment thread pg_lake_table/src/fdw/data_files_catalog_batch.c
Comment thread pg_lake_table/src/fdw/data_files_catalog_batch.c
Qualify pg_catalog.unnest($1) in ExecInsertTrackedFileIds (single-array
form — a normal resolvable SRF) so search_path cannot shadow it.

Multi-argument FROM unnest($2, $3, ...) is a special UNNEST table-function
syntax (PostgreSQL docs §9.19, §7.2.1.4) and is not the same as calling
pg_catalog.unnest(text[], bigint[], ...) as a regular function; the server
raises UndefinedFunction if schema-qualified that way. Keep those sites
unqualified and add a short comment explaining why.

Co-authored-by: Cursor <cursoragent@cursor.com>

sfc-gh-okalaci force-pushed the okalaci/bulk_insert_data_file_catalog branch from 3d4ff94 to d05958b (May 21, 2026 09:38)

List *runOps = NIL;

while (opCell != NULL &&
((TableMetadataOperation *) lfirst(opCell))->type == runType)

Collaborator

can this be expressed as a foreach loop? I don't find this syntax very intuitive

Collaborator

sfc-gh-mslot left a comment

looks pretty good apart from the loop comment
