Streaming writes via pgduck_server RECEIVE protocol #345
Adds a TCP listener and a new "RECEIVE" query prefix to pgduck_server.
With these in place, a remote client can stream CSV (or other data)
to a server-local sink path via the standard libpq COPY-IN protocol,
and pgduck runs a deferred query reading from that path once
CopyDone arrives.
Motivation:
The existing pgduck_server design assumes the client and pgduck share
a filesystem — the standard pg_lake "sidecar" deployment colocates
postgres and pgduck_server in the same pod and lets pg_lake's bulk-
write paths drop CSV under $PGDATA/pgsql_tmp for pgduck to read with
read_csv(). That topology breaks under deployment models that don't
co-locate the two processes (operator-managed Kubernetes deployments
where postgres and pgduck run in separate pods, multi-host setups,
etc.). The streaming-write paths in the companion pg_lake patch lean
on this RECEIVE protocol to push bytes directly to pgduck via libpq
without needing a shared filesystem.
What this patch adds:
- TCP listener on pgduck_server controlled by --listen-addresses /
--port (default Unix-socket path remains supported). Lets remote
postgres backends reach pgduck over the network.
- A "RECEIVE <inner-query>" query prefix recognized by
process_query_message. When it sees "RECEIVE …", the server:
1. Picks a server-local sink path (under --recv-dir).
2. Substitutes the bare token "@@PGLAKE_RECV_SINK@@" inside the
inner query with a properly-quoted SQL literal containing
the sink path. The server adds the surrounding single quotes
and escapes any embedded single quotes; the placeholder
itself is intentionally NOT inside a SQL string literal in
the client-emitted query, so it can never collide with user-
supplied data.
3. Sends CopyInResponse to the client and accepts CopyData
chunks, writing them straight to the sink.
4. On CopyDone, runs the deferred inner query against the sink
path and streams its result rows back via the standard
PGresult flow.
This lets clients use the existing libpq COPY-IN flow as the
transport for arbitrary inner queries that read from a path.
- recv_sink module: opens, writes, and cleans up the per-client sink
files; bounded by --recv-max-bytes; refuses path traversal.
- SSL-negotiate-byte handling: pgduck_server replies 'N' to libpq's
SSLRequest instead of closing the connection. Lets clients with
sslmode=prefer fall back to plaintext cleanly.
- Explicit pgsession_flush() after pgsession_send_copy_in_response.
Without this, the 5-byte CopyInResponse can sit in pgduck's send
buffer indefinitely (no auto-flush on small messages); the client
blocks forever in PQputCopyData waiting for the server to be ready,
and the RECEIVE handshake deadlocks. Found while debugging exactly
that ~4-hour hang during smoke runs.
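
Step 2 of the RECEIVE handling above (replacing the bare placeholder with a quoted SQL literal) can be sketched as follows. This is a minimal illustration, not the code in this patch: the helper name `substitute_sink_placeholder` is hypothetical, and escaping by doubling embedded single quotes is an assumption about how the server escapes.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define RECV_SINK_TOKEN "@@PGLAKE_RECV_SINK@@"

/*
 * Hypothetical helper: return a newly malloc'd copy of `query` in which the
 * first occurrence of the bare token is replaced by `path` rendered as a
 * single-quoted SQL literal. Embedded single quotes are doubled (assumed
 * escaping rule). Returns NULL if the token is absent or allocation fails.
 * The caller frees the result.
 */
char *
substitute_sink_placeholder(const char *query, const char *path)
{
    assert(query != NULL && path != NULL);

    const char *hit = strstr(query, RECV_SINK_TOKEN);
    if (hit == NULL)
        return NULL;

    /* Each embedded quote costs one extra byte once doubled. */
    size_t quotes = 0;
    for (const char *p = path; *p; p++)
        if (*p == '\'')
            quotes++;

    size_t prefix_len = (size_t) (hit - query);
    const char *suffix = hit + strlen(RECV_SINK_TOKEN);
    size_t literal_len = strlen(path) + quotes + 2; /* + surrounding quotes */

    char *out = malloc(prefix_len + literal_len + strlen(suffix) + 1);
    if (out == NULL)
        return NULL;

    char *w = out;
    memcpy(w, query, prefix_len);
    w += prefix_len;
    *w++ = '\'';
    for (const char *p = path; *p; p++)
    {
        if (*p == '\'')
            *w++ = '\'';        /* double the quote */
        *w++ = *p;
    }
    *w++ = '\'';
    strcpy(w, suffix);          /* copies the terminating NUL as well */
    return out;
}
```

Because the client never places the token inside a string literal, the server is the only party that decides how the path is quoted.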
Compatibility:
- Existing simple SELECT and COPY paths are unchanged.
- Unix-socket transport remains supported when --listen-addresses is
not set; nothing forces TCP.
- No new dependencies. The recv_sink uses the same memory and I/O
primitives the rest of pgsession.c uses.
- The "RECEIVE" prefix is only recognized in the simple-query path
(because it needs the server-side prefix detection that the
extended-query protocol doesn't offer). Clients using
PQsendQueryParams keep using the existing path.
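
The server-side prefix detection mentioned in the last item could look roughly like this. It is a sketch under assumptions, not the logic in process_query_message: the function name `receive_inner_query` is hypothetical, and the case-sensitive match and the skipping of leading whitespace are illustrative choices.

```c
#include <assert.h>
#include <ctype.h>
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical helper: if `query` starts with the keyword RECEIVE followed
 * by whitespace, return a pointer to the start of the inner query.
 * Otherwise return NULL so the caller falls through to the existing path.
 */
const char *
receive_inner_query(const char *query)
{
    static const char prefix[] = "RECEIVE";
    const size_t prefix_len = sizeof(prefix) - 1;

    assert(query != NULL);

    /* Assumption: leading whitespace before the keyword is tolerated. */
    while (isspace((unsigned char) *query))
        query++;

    if (strncmp(query, prefix, prefix_len) != 0)
        return NULL;

    const char *rest = query + prefix_len;
    if (!isspace((unsigned char) *rest))
        return NULL;            /* "RECEIVED ..." or a bare "RECEIVE" */

    while (isspace((unsigned char) *rest))
        rest++;

    /* A prefix with no inner query is treated as "not a RECEIVE". */
    return *rest ? rest : NULL;
}
```

A check like this only works when the server sees the whole query text in one message, which is why it sits on the simple-query path.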
Signed-off-by: Tim McLaughlin <tim@gotab.io>
Great job wrangling this into 2 commits! I've been following your progress because I'm also really interested in running this in Kubernetes. This is so much more readable - but it still looks like WAY more work separating these than I expected. Good job 👏🏼
sfc-gh-mslot left a comment
I think the general concept makes sense, for TRANSMIT to have a RECEIVE counterpart. I previously tried something like this where we read from the socket via a special DuckDB FileSystem, though the problem was that we lost parallelism and it was kind of slow. Using an intermediate file is probably better.
One high-level question though: It seems part of the goal is to simplify deployment on Kubernetes. If Postgres could start pgduck_server as a background process in the same container and funnel logs into the Postgres log, would that help?
#include "duckdb/duckdb.h"
/* forward declaration; full definition lives in pgsession/recv_sink.h */
these are not included
It doesn't help that much for our k8s use case because we want redundant PG pods for HA heap storage and stateless, decoupled DuckDB compute pod(s) that can be scaled up or down through a LB behind those PG pods. In our case we have 2x PG pods sharing a single DuckDB pod, but it could be 3x PG and 10x DuckDB depending on resource needs. I messed with doing it in the same pod or even the same node but liked it less, and this was just more flexible.

Alright, it makes sense. I think this has legs, but will need a bit of time to review the details. Would you mind including the missing files, rebasing, and doing the DCO thing mentioned when opening the PR? (commit -s)

Yeah straightening out the PR. I built it as patches so I can actually use
it knowing that it’d take a while for a review if ever. To also be up front
(because some will be annoyed) it is mostly Claude with critique from GPT
and me as architect :). So don’t get your hopes up for super tight code.
On Thu, May 14, 2026 at 3:25 AM Marco Slot wrote:
sfc-gh-mslot left a comment (Snowflake-Labs/pg_lake#345)
Alright, it makes sense. I think this has legs, but will need a bit of
time to review the details.
Would you mind including the missing files, rebasing, and doing the DCO
thing mentioned when opening the PR? (commit -s)
564dd0c to 2cee319
…rg metadata

Adds an opt-in streaming-write feature that pushes bulk-write bytes
directly to pgduck_server via libpq COPY-IN, instead of writing them to
a shared filesystem under $PGDATA/pgsql_tmp and asking pgduck to read
them back. Companion to the pgduck_server RECEIVE protocol.

This decouples pgduck_server's filesystem from postgres's: the two
processes can run on different machines, different containers,
different pods, with no shared mount required.

User-visible:
- New GUC `pg_lake_engine.streaming_writes` (default off). When flipped
  on, the bulk-write code paths route through the streaming protocol;
  when off, all behavior is identical to today.
- No SQL syntax changes. INSERT, UPDATE, DELETE, COPY FROM STDIN,
  CREATE TABLE iceberg, and iceberg metadata uploads all switch paths
  transparently based on the GUC.

What this patch adds, by component:

`pg_lake_engine`:
- `OpenCSVStreamWriter` / `FinishCSVStreamWriter` /
  `CSVStreamWriterDestReceiver`: the client-side primitive that opens a
  libpq COPY-IN to pgduck_server's RECEIVE sink, hands back a postgres
  `DestReceiver` that callers drive with rows, and finalizes the
  deferred query when the stream ends.
- `StreamLocalFileToS3`: uses the same RECEIVE protocol to stream
  iceberg metadata files (metadata.json, manifest list, manifest) into
  pgduck for upload. Replaces the file-based path that needed pgduck to
  see those files locally.
- Cooperative wait via `WaitForResult` in the new primitives:
  `WaitLatchOrSocket` + `PQconsumeInput` + `CHECK_FOR_INTERRUPTS` loop,
  so `statement_timeout` / SIGINT / postmaster-death actually fire while
  the backend is waiting on pgduck. The naive `PQgetResult` would not.

`pg_lake_table` (the FDW):
- `multi_data_file_dest.c` (the INSERT-side
  `MultiDataFileUploadDestReceiver`): open a `CSVStreamWriter` instead
  of a CSV temp file when the GUC is on. Each rotation opens a new
  stream; FlushChildDestReceiver finalizes it via FinishCSVStreamWriter.
  The per-rotation writer lives in the receiver's existing childContext,
  which gets reset between rotations. The `partition` pointer on each
  `DataFileModification` is deep-copied under `parentContext` before
  that reset (NULL preserved for unpartitioned tables) so downstream
  consumers in `ApplyDataFileModifications` (which runs at PRE_COMMIT in
  a different memory-context lifetime) don't dereference into freed
  memory.
- `pg_lake_table.c` (the UPDATE/DELETE callsites): the
  `deleteStreamWriter` lazy-open in `DeleteSingleRow` opens a new
  per-source-file stream for the position-delete records, and
  `FinishForeignModify` calls `FinishCSVStreamWriter` to seal it. The
  writer is allocated in a dedicated long-lived
  `deleteStreamMemoryContext` (created at create_foreign_modify time,
  sub of the FDW state context) so it survives the executor's per-tuple
  resets; this is the same lifetime discipline multi_data_file_dest.c
  already uses for its INSERT-side writer. The "all rows deleted, drop
  the deletion file" optimization in `WriteDeleteRecord` is gated to the
  file-based path; for the streaming path, the writer keeps accumulating
  rows and the resulting position-delete file covers all rows in the
  source, which is semantically equivalent under iceberg's
  position-delete merge.

`pg_lake_copy` (the COPY pushdown):
- `copy.c` / `copy_io.c`: `COPY foreign_table FROM STDIN` opens a
  `CSVStreamWriter` and pumps the client's CopyData straight through to
  pgduck. The non-pushdown path (when COPY can't be pushed down) still
  uses the file-based code.

Memory-context discipline:
The streaming writers live across multiple rows and sometimes across
the executor's per-tuple ExprContext reset. Each writer is allocated in
a long-lived context dedicated to that writer's lifetime: for the INSERT
side, the existing `MultiDataFileUploadDestReceiver` childContext; for
the DELETE side, a new `deleteStreamMemoryContext` on the modify state.
This mirrors how the file-based path already explicitly allocates its
`CreateCSVDestReceiver` "in a long-lived memory context" (per the
comment in create_foreign_modify).

Compatibility:
- Default-off GUC: existing deployments are untouched. No behavior
  change unless `streaming_writes = on`.
- The file-based code paths are unchanged. No regressions on
  shared-filesystem topologies.
- No SQL surface changes. No changes to data layout or iceberg catalog
  representation.

Testing:
Verified end-to-end against a CDC-shape workload (200 mixed
INSERT/UPDATE/DELETE events with COMMIT per event) via a local kind
cluster + a GKE deployment with pgduck_server running in a separate pod
from postgres. 10 back-to-back 600s soak runs with full diagnostic
instrumentation across the entire writer lifetime detected zero
correctness issues.

Signed-off-by: Tim McLaughlin <tim@gotab.io>
2cee319 to 954c6e2
@sfc-gh-mslot it should all be up with that last push. seems to be working well enough for us in our usage so far. let me know if you see any issues.
Streaming writes via pgduck_server RECEIVE protocol
What this PR adds
An opt-in path for pg_lake's bulk-write operations (INSERT, UPDATE,
DELETE, COPY FROM STDIN, iceberg metadata uploads) to push bytes
directly to `pgduck_server` over libpq COPY-IN, instead of writing
them to a shared filesystem under `$PGDATA/pgsql_tmp` and asking
pgduck to read them back via `read_csv()`.
Activated by setting `pg_lake_engine.streaming_writes = on` (default
`off`).
Why
The existing pg_lake design assumes postgres and `pgduck_server`
share a filesystem: the standard sidecar deployment puts both in
the same pod, talking via a Unix socket on a shared
`$PGDATA`-relative path. That layout breaks every bulk-write path
under deployment models where postgres and pgduck don't share a
filesystem:
- … the `Cluster` CR overrides the pod's `command:` and doesn't allow
  arbitrary sidecar containers).
- … machines.
- … reasons.
In all of these, today's pg_lake hits a "no such file or directory"
in `pgduck_server` when the bulk-write path's CSV temp lives on a
filesystem the server can't see.
This PR makes pg_lake work in those topologies by routing the
bulk-write data through libpq COPY-IN to a server-local sink on
`pgduck_server`, decoupling the two processes' filesystems.
How it works
`pgduck_server` learns a new `RECEIVE <inner-query>` query prefix.
When the server sees `RECEIVE …`:
1. Picks a server-local sink path under `--recv_dir`.
2. Substitutes the `'@@PG_LAKE_RECV@@'` token in the inner query
   (which appears exactly once, already wrapped in single quotes
   so it stands in for a string-literal argument, e.g.
   `read_csv('@@PG_LAKE_RECV@@', …)`) with a single-quoted SQL
   literal containing the sink path. The sink path is generated
   server-side and never contains a single quote; the substitution
   defensively rejects any path that does, rather than attempting
   to escape, which keeps the SQL-quoting story trivial to audit.
3. Sends `CopyInResponse` to the client; accepts `CopyData` chunks
   into the sink.
4. On `CopyDone`, runs the deferred inner query (which
   `read_csv()`'s from the sink) and streams its results back via the
   normal `PGresult` flow.
On the client side, `pg_lake_engine` adds a `CSVStreamWriter`
primitive that builds the same `COPY (…) TO …` command pg_lake
already uses for the file-based path, prefixes it with `RECEIVE`,
opens a libpq COPY-IN, and exposes a `DestReceiver` that the FDW
callers drive with rows.
The INSERT, UPDATE, DELETE, COPY-FROM-STDIN, and iceberg-metadata
upload paths in `pg_lake_table` / `pg_lake_copy` / `pg_lake_engine`
all switch transparently to the streaming path when the GUC is on.
Structure
This PR is two commits:
1. … Self-contained: TCP listener, `RECEIVE` prefix handling,
   `recv_sink` module, SSL-negotiate-byte handling, explicit flush
   after `CopyInResponse`. Reviewable independently: nothing in
   `pg_lake_*` changes here.
2. … and iceberg metadata. Adds `CSVStreamWriter` plumbing in
   `pg_lake_engine`, then adopts it in the `pg_lake_table` /
   `pg_lake_copy` / iceberg-metadata callsites. Gated behind the
   new `pg_lake_engine.streaming_writes` GUC.
Patch 2 builds and runs cleanly against a tree with only patch 1
applied, though it doesn't do much without patch 1's server-side
support.
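
The SSL-negotiate-byte handling listed for the first commit can be illustrated with a small sketch. In the PostgreSQL wire protocol an SSLRequest is an 8-byte packet (a big-endian length of 8 followed by the request code 80877103), and a server that will not negotiate TLS answers with the single byte `N`. The function names below are hypothetical and are not taken from this PR.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* SSLRequest code from the PostgreSQL protocol: (1234 << 16) | 5679. */
#define SSL_REQUEST_CODE 80877103u

/* Read a 32-bit big-endian (network order) integer. */
static uint32_t
read_be32(const uint8_t *p)
{
    return ((uint32_t) p[0] << 24) | ((uint32_t) p[1] << 16) |
           ((uint32_t) p[2] << 8) | (uint32_t) p[3];
}

/* True if the first packet on a new connection is an SSLRequest. */
bool
is_ssl_request(const uint8_t *buf, size_t len)
{
    assert(buf != NULL);
    return len == 8 &&
           read_be32(buf) == 8 &&
           read_be32(buf + 4) == SSL_REQUEST_CODE;
}

/*
 * Hypothetical helper: the byte a non-TLS server would send back, or '\0'
 * when the packet is not an SSLRequest and should be handled as a normal
 * startup message instead.
 */
char
ssl_negotiate_reply(const uint8_t *buf, size_t len)
{
    return is_ssl_request(buf, len) ? 'N' : '\0';
}
```

After receiving `N`, a client using `sslmode=prefer` continues with a plaintext startup message on the same connection.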
Compatibility
- … via the streaming path are byte-identical to those produced via
  the file-based path. The streaming path is a transport choice,
  not a representation choice.
- … COPY / CREATE TABLE syntax.
- … shared-filesystem topologies.
- … `pgduck_server` only starts a TCP listener if
  `--listen_addresses` is set; the Unix-socket default behavior is
  preserved.
Testing
Tested against a CDC-shape workload (200 mixed INSERT/UPDATE/DELETE
events with `COMMIT` per event, ~10s per event, ~50 minutes of
continuous write activity) on a GKE deployment with `pgduck_server`
in a separate pod from postgres. Final verification: 10 back-to-back
soak runs with full instrumentation across the writer's entire
lifetime, with zero correctness issues detected.
The companion `cdc-stream.sql` / `cdc-stream-polaris.sql` regression
fixtures in our local stack exercise the I+D+U mix that the
streaming path needs to handle correctly. Happy to upstream those
test fixtures separately if useful for CI.
Operational notes
- The `RECEIVE` query prefix is only recognized in libpq's
  simple-query protocol (because it needs the server-side prefix
  detection that the extended-query protocol doesn't surface). Clients
  using `PQsendQueryParams` keep going through the existing path.
- … disk usage is currently bounded only by the underlying filesystem.
  Adding a `--recv_max_bytes` cap would be a small follow-up.
- `WaitForResult` is used (not raw `PQgetResult`) so that
  `statement_timeout` / SIGINT / postmaster-death fire promptly
  while the backend is waiting on pgduck; typical libpq calls
  don't observe `statement_timeout` because they sit below
  `CHECK_FOR_INTERRUPTS`.
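
As a rough illustration of the byte cap mentioned in the disk-usage note, a sink writer could track how much it has accepted and refuse a chunk that would exceed the limit. This is only a sketch: the `CappedSink` struct and `capped_sink_write` function are hypothetical names and do not describe the `recv_sink` module's actual interface.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

/* Hypothetical per-client sink state; invariant: written <= max_bytes. */
typedef struct CappedSink
{
    FILE   *file;       /* open sink file */
    size_t  written;    /* bytes accepted so far */
    size_t  max_bytes;  /* cap, e.g. taken from a --recv_max_bytes option */
} CappedSink;

/*
 * Append one CopyData payload to the sink. Returns false, without writing
 * anything, if the chunk would push the sink past its cap, and false if the
 * underlying write fails.
 */
bool
capped_sink_write(CappedSink *sink, const void *data, size_t len)
{
    assert(sink != NULL && sink->file != NULL);

    /* Compare against the remaining budget to avoid size_t overflow. */
    if (len > sink->max_bytes - sink->written)
        return false;

    if (len > 0 && fwrite(data, 1, len, sink->file) != len)
        return false;

    sink->written += len;
    return true;
}
```

A caller that gets `false` would be expected to fail the COPY and remove the partial sink file, so a rejected stream does not leave data behind.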