Skip to content

Streaming writes via pgduck_server RECEIVE protocol#345

Open
timmclaughlin wants to merge 2 commits into
Snowflake-Labs:mainfrom
timmclaughlin:feat/streaming-writes
Open

Streaming writes via pgduck_server RECEIVE protocol#345
timmclaughlin wants to merge 2 commits into
Snowflake-Labs:mainfrom
timmclaughlin:feat/streaming-writes

Conversation

@timmclaughlin
Copy link
Copy Markdown
Contributor

@timmclaughlin timmclaughlin commented May 10, 2026

Streaming writes via pgduck_server RECEIVE protocol

What this PR adds

An opt-in path for pg_lake's bulk-write operations (INSERT, UPDATE,
DELETE, COPY FROM STDIN, iceberg metadata uploads) to push bytes
directly to pgduck_server over libpq COPY-IN, instead of writing
them to a shared filesystem under $PGDATA/pgsql_tmp and asking
pgduck to read them back via read_csv().

Activated by setting pg_lake_engine.streaming_writes = on
(default off).

Why

The existing pg_lake design assumes postgres and pgduck_server
share a filesystem — the standard sidecar deployment puts both in
the same pod, talking via a Unix socket on a shared
$PGDATA-relative path. That layout breaks every bulk-write path
under deployment models where postgres and pgduck don't share a
filesystem:

  • Operator-managed Kubernetes deployments (e.g. CloudNativePG, where
    the Cluster CR overrides the pod's command: and doesn't allow
    arbitrary sidecar containers).
  • Multi-host setups where postgres and pgduck run on different
    machines.
  • Anywhere a co-located sidecar isn't practical for ops or security
    reasons.

In all of these, today's pg_lake hits a "no such file or directory"
in pgduck_server when the bulk-write path's CSV temp lives on a
filesystem the server can't see.

This PR makes pg_lake work in those topologies by routing the bulk-
write data through libpq COPY-IN to a server-local sink on
pgduck_server, decoupling the two processes' filesystems.

How it works

pgduck_server learns a new RECEIVE <inner-query> query prefix.
When the server sees RECEIVE …:

  1. Picks a server-local sink path under --recv_dir.
  2. Substitutes the '@@PG_LAKE_RECV@@' token in the inner query
    (which appears exactly once, already wrapped in single quotes
    so it stands in for a string-literal argument — e.g.
    read_csv('@@PG_LAKE_RECV@@', …)) with a single-quoted SQL
    literal containing the sink path. The sink path is generated
    server-side and never contains a single quote; the substitution
    defensively rejects any path that does, rather than attempting
    to escape, which keeps the SQL-quoting story trivial to audit.
  3. Sends CopyInResponse to the client; accepts CopyData chunks
    into the sink.
  4. On CopyDone, runs the deferred inner query (which
    read_csv()'s from the sink) and streams its results back via the
    normal PGresult flow.

On the client side, pg_lake_engine adds a CSVStreamWriter
primitive that builds the same COPY (…) TO … command pg_lake
already uses for the file-based path, prefixes it with RECEIVE,
opens a libpq COPY-IN, and exposes a DestReceiver that the FDW
callers drive with rows.

The INSERT, UPDATE, DELETE, COPY-FROM-STDIN, and iceberg-metadata
upload paths in pg_lake_table / pg_lake_copy / pg_lake_engine
all switch transparently to the streaming path when the GUC is on.

Structure

This PR is two commits:

  1. pgduck_server: add RECEIVE protocol for streaming COPY-IN sink.
    Self-contained: TCP listener, RECEIVE prefix handling,
    recv_sink module, SSL-negotiate-byte handling, explicit flush
    after CopyInResponse. Reviewable independently — nothing in
    pg_lake_* changes here.
  2. pg_lake: streaming writes for INSERT, UPDATE, DELETE, COPY,
    and iceberg metadata.
    Adds CSVStreamWriter plumbing in
    pg_lake_engine, then adopts it in the pg_lake_table /
    pg_lake_copy / iceberg-metadata callsites. Gated behind the
    new pg_lake_engine.streaming_writes GUC.

Patch 2 builds and runs cleanly against a tree with only patch 1
applied — though it doesn't do much without patch 1's server-side
support.

Compatibility

  • Default off. Existing deployments are unaffected.
  • No data layout or catalog changes. Iceberg tables produced
    via the streaming path are byte-identical to those produced via
    the file-based path. The streaming path is a transport choice,
    not a representation choice.
  • No SQL surface changes. Same INSERT / UPDATE / DELETE /
    COPY / CREATE TABLE syntax.
  • Existing file-based paths unchanged. No regressions for
    shared-filesystem topologies.
  • Unix-socket transport remains supported. pgduck_server only
    starts a TCP listener if --listen_addresses is set; the
    Unix-socket default behavior is preserved.

Testing

Tested against a CDC-shape workload (200 mixed INSERT/UPDATE/DELETE
events with COMMIT per event, ~10s per event, ~50 minutes of
continuous write activity) on a GKE deployment with pgduck_server
in a separate pod from postgres. Final verification: 10 back-to-back
soak runs with full instrumentation across the writer's entire
lifetime — zero correctness issues detected.

The companion cdc-stream.sql / cdc-stream-polaris.sql regression
fixtures in our local stack exercise the I+D+U mix that the
streaming path needs to handle correctly. Happy to upstream those
test fixtures separately if useful for CI.

Operational notes

  • The RECEIVE query prefix is only recognized in libpq's simple-
    query protocol (because it needs the server-side prefix detection
    that the extended-query protocol doesn't surface). Clients using
    PQsendQueryParams keep going through the existing path.
  • This PR does not add a per-stream byte cap on sink size;
    disk usage is currently bounded only by the underlying filesystem.
    Adding a --recv_max_bytes cap would be a small follow-up.
  • WaitForResult is used (not raw PQgetResult) so that
    statement_timeout / SIGINT / postmaster-death fire promptly
    while the backend is waiting on pgduck — typical libpq calls
    don't observe statement_timeout because they sit below
    CHECK_FOR_INTERRUPTS.

Adds a TCP listener and a new "RECEIVE" query prefix to pgduck_server.
With these in place, a remote client can stream CSV (or other data)
to a server-local sink path via the standard libpq COPY-IN protocol,
and pgduck runs a deferred query reading from that path once
CopyDone arrives.

Motivation:

The existing pgduck_server design assumes the client and pgduck share
a filesystem — the standard pg_lake "sidecar" deployment colocates
postgres and pgduck_server in the same pod and lets pg_lake's bulk-
write paths drop CSV under $PGDATA/pgsql_tmp for pgduck to read with
read_csv(). That topology breaks under deployment models that don't
co-locate the two processes (operator-managed Kubernetes deployments
where postgres and pgduck run in separate pods, multi-host setups,
etc.). The streaming-write paths in the companion pg_lake patch lean
on this RECEIVE protocol to push bytes directly to pgduck via libpq
without needing a shared filesystem.

What this patch adds:

- TCP listener on pgduck_server controlled by --listen-addresses /
  --port (default Unix-socket path remains supported). Lets remote
  postgres backends reach pgduck over the network.
- A "RECEIVE <inner-query>" query prefix recognized by
  process_query_message. When it sees "RECEIVE …", the server:
    1. Picks a server-local sink path (under --recv-dir).
    2. Substitutes the bare token "@@PGLAKE_RECV_SINK@@" inside the
       inner query with a properly-quoted SQL literal containing
       the sink path. The server adds the surrounding single quotes
       and escapes any embedded single quotes; the placeholder
       itself is intentionally NOT inside a SQL string literal in
       the client-emitted query, so it can never collide with user-
       supplied data.
    3. Sends CopyInResponse to the client and accepts CopyData
       chunks, writing them straight to the sink.
    4. On CopyDone, runs the deferred inner query against the sink
       path and streams its result rows back via the standard
       PGresult flow.
  This lets clients use the existing libpq COPY-IN flow as the
  transport for arbitrary inner queries that read from a path.
- recv_sink module: opens, writes, and cleans up the per-client sink
  files; bounded by --recv-max-bytes; refuses path traversal.
- SSL-negotiate-byte handling: pgduck_server replies 'N' to libpq's
  SSLRequest instead of closing the connection. Lets clients with
  sslmode=prefer fall back to plaintext cleanly.
- Explicit pgsession_flush() after pgsession_send_copy_in_response.
  Without this, the 5-byte CopyInResponse can sit in pgduck's send
  buffer indefinitely (no auto-flush on small messages); the client
  blocks forever in PQputCopyData waiting for the server to be ready,
  and the RECEIVE handshake deadlocks. Found while debugging exactly
  that ~4-hour hang during smoke runs.

Compatibility:

- Existing simple SELECT and COPY paths are unchanged.
- Unix-socket transport remains supported when --listen-addresses is
  not set; nothing forces TCP.
- No new dependencies. The recv_sink uses the same memory and I/O
  primitives the rest of pgsession.c uses.
- The "RECEIVE" prefix is only recognized in the simple-query path
  (because it needs the server-side prefix detection that the
  extended-query protocol doesn't offer). Clients using
  PQsendQueryParams keep using the existing path.

Signed-off-by: Tim McLaughlin <tim@gotab.io>
@bkanuka
Copy link
Copy Markdown

bkanuka commented May 13, 2026

Great job wrangling this into 2 commits! I've been following your progress because I'm also really interested in running this in Kubernetes. This is so much more readable - but it still looks like WAY more work separating these than I expected. Good job 👏🏼

Copy link
Copy Markdown
Collaborator

@sfc-gh-mslot sfc-gh-mslot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the general concept makes sense, for TRANSMIT to have a RECEIVE counter-part. I previously tried something like this where we read from the socket via a special DuckDB FileSystem, though the problem was that we lose parallelism and this was kind of slow. Using an intermediate file is probably better.

One high-level question though: It seems part of the goal is to simplify deployment on Kubernetes. If Postgres could start pgduck_server as a background process in the same container and funnel logs into the Postgres log, would that help?


#include "duckdb/duckdb.h"

/* forward declaration; full definition lives in pgsession/recv_sink.h */
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these are not included

@timmclaughlin
Copy link
Copy Markdown
Contributor Author

One high-level question though: It seems part of the goal is to simplify deployment on Kubernetes. If Postgres could start pgduck_server as a background process in the same container and funnel logs into the Postgres log, would that help?

It doesn't help that much for our k8s use case because we want redundant PG pods for HA heap storage and stateless decoupled duckdb compute pod(s) that can be scaled up or down through a LB behind those PG pods. In our case we have 2x PG pods sharing a single duckdb pod but it could be 3x PG and 10x Duckdb depending on resource needs. I messed with doing it in the same pod or even the same node but liked it less and this was just more flexible.

@sfc-gh-mslot
Copy link
Copy Markdown
Collaborator

Alright, it makes sense. I think this has legs, but will need a bit of time to review the details.

Would you mind including the missing files, rebasing, and doing the DCO thing mentioned when opening the PR? (commit -s)

@timmclaughlin
Copy link
Copy Markdown
Contributor Author

timmclaughlin commented May 14, 2026 via email

@timmclaughlin timmclaughlin force-pushed the feat/streaming-writes branch 3 times, most recently from 564dd0c to 2cee319 Compare May 14, 2026 15:50
…rg metadata

Adds an opt-in streaming-write feature that pushes bulk-write bytes
directly to pgduck_server via libpq COPY-IN, instead of writing them
to a shared filesystem under $PGDATA/pgsql_tmp and asking pgduck to
read them back. Companion to the pgduck_server RECEIVE protocol.

This decouples pgduck_server's filesystem from postgres's: the two
processes can run on different machines, different containers,
different pods, with no shared mount required.

User-visible:

- New GUC `pg_lake_engine.streaming_writes` (default off). When
  flipped on, the bulk-write code paths route through the streaming
  protocol; when off, all behavior is identical to today.
- No SQL syntax changes. INSERT, UPDATE, DELETE, COPY FROM STDIN,
  CREATE TABLE iceberg, and iceberg metadata uploads all switch
  paths transparently based on the GUC.

What this patch adds, by component:

`pg_lake_engine`:

- `OpenCSVStreamWriter` / `FinishCSVStreamWriter` /
  `CSVStreamWriterDestReceiver` — the client-side primitive that
  opens a libpq COPY-IN to pgduck_server's RECEIVE sink, hands back
  a postgres `DestReceiver` that callers drive with rows, and
  finalizes the deferred query when the stream ends.
- `StreamLocalFileToS3` — uses the same RECEIVE protocol to stream
  iceberg metadata files (metadata.json, manifest list, manifest)
  into pgduck for upload. Replaces the file-based path that needed
  pgduck to see those files locally.
- Cooperative wait via `WaitForResult` in the new primitives:
  `WaitLatchOrSocket` + `PQconsumeInput` + `CHECK_FOR_INTERRUPTS`
  loop, so `statement_timeout` / SIGINT / postmaster-death actually
  fire while the backend is waiting on pgduck. The naive
  `PQgetResult` would not.

`pg_lake_table` (the FDW):

- `multi_data_file_dest.c` (the INSERT-side `MultiDataFileUploadDestReceiver`):
  open a `CSVStreamWriter` instead of a CSV temp file when the GUC
  is on. Each rotation opens a new stream; FlushChildDestReceiver
  finalizes it via FinishCSVStreamWriter. The per-rotation writer
  lives in the receiver's existing childContext, which gets reset
  between rotations. The `partition` pointer on each
  `DataFileModification` is deep-copied under `parentContext` before
  that reset (NULL preserved for unpartitioned tables) so downstream
  consumers in `ApplyDataFileModifications` — which runs at PRE_COMMIT
  in a different memory-context lifetime — don't dereference into
  freed memory.
- `pg_lake_table.c` (the UPDATE/DELETE callsites): the
  `deleteStreamWriter` lazy-open in `DeleteSingleRow` opens a new
  per-source-file stream for the position-delete records, and
  `FinishForeignModify` calls `FinishCSVStreamWriter` to seal it.
  The writer is allocated in a dedicated long-lived
  `deleteStreamMemoryContext` (created at create_foreign_modify
  time, sub of the FDW state context) so it survives the executor's
  per-tuple resets — the same lifetime discipline
  multi_data_file_dest.c already uses for its INSERT-side writer.
  The "all rows deleted, drop the deletion file" optimization in
  `WriteDeleteRecord` is gated to the file-based path; for the
  streaming path, the writer keeps accumulating rows and the
  resulting position-delete file covers all rows in the source —
  semantically equivalent under iceberg's position-delete merge.

`pg_lake_copy` (the COPY pushdown):

- `copy.c` / `copy_io.c`: `COPY foreign_table FROM STDIN` opens a
  `CSVStreamWriter` and pumps the client's CopyData straight through
  to pgduck. The non-pushdown path (when COPY can't be pushed down)
  still uses the file-based code.

Memory-context discipline:

The streaming writers live across multiple rows and sometimes across
the executor's per-tuple ExprContext reset. Each writer is allocated
in a long-lived context dedicated to that writer's lifetime — for
the INSERT side, the existing `MultiDataFileUploadDestReceiver`
childContext; for the DELETE side, a new
`deleteStreamMemoryContext` on the modify state. This mirrors how
the file-based path already explicitly allocates its
`CreateCSVDestReceiver` "in a long-lived memory context" (per the
comment in create_foreign_modify).

Compatibility:

- Default-off GUC: existing deployments are untouched. No behavior
  change unless `streaming_writes = on`.
- The file-based code paths are unchanged. No regressions on
  shared-filesystem topologies.
- No SQL surface changes. No changes to data layout or iceberg
  catalog representation.

Testing:

Verified end-to-end against a CDC-shape workload (200 mixed
INSERT/UPDATE/DELETE events with COMMIT per event) via a local kind
cluster + a GKE deployment with pgduck_server running in a separate
pod from postgres. 10 back-to-back 600s soak runs with full
diagnostic instrumentation across the entire writer lifetime
detected zero correctness issues.

Signed-off-by: Tim McLaughlin <tim@gotab.io>
@timmclaughlin timmclaughlin force-pushed the feat/streaming-writes branch from 2cee319 to 954c6e2 Compare May 14, 2026 16:56
@timmclaughlin
Copy link
Copy Markdown
Contributor Author

@sfc-gh-mslot it should all be up with that last push. seems to be working well enough for us in our usage so far. let me know if you see any issues.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants