From 7cc4b429975413b055c51b01a8657ccfbc3ac777 Mon Sep 17 00:00:00 2001 From: Eduardo Gurgel Pinho Date: Thu, 25 Jun 2026 13:02:56 +1200 Subject: [PATCH 1/2] wip --- .../extensions/cdc_rls/subscriptions_test.exs | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/test/realtime/extensions/cdc_rls/subscriptions_test.exs b/test/realtime/extensions/cdc_rls/subscriptions_test.exs index e51c43bf7..cfb175707 100644 --- a/test/realtime/extensions/cdc_rls/subscriptions_test.exs +++ b/test/realtime/extensions/cdc_rls/subscriptions_test.exs @@ -241,6 +241,73 @@ defmodule Realtime.Extensions.PostgresCdcRls.SubscriptionsTest do Subscriptions.create(conn, "supabase_realtime_test", params_list, self(), self()) end + test "create succeeds when the connection cached a stale user_defined_filter arity", %{conn: conn, tenant: tenant} do + # Regression for ErrorOnRpcCall :badarg in element(4, {col, op, value}). + # + # realtime.user_defined_filter is a composite type. Postgrex caches a composite's field + # list per connection at bootstrap and never refreshes it when ALTER TYPE changes the + # type. A connection that bootstrapped while the type briefly carried a 4th `negate` + # attribute keeps a 4-field encoder; after the attribute is dropped, encoding a 3-element + # filter tuple would call element(4, {col, op, value}) and crash with :badarg. + # + # We reproduce the stale cache by adding the attribute, opening a fresh connection (which + # caches 4 fields), then dropping it back to 3 fields before inserting. + {:ok, admin_settings} = Database.from_tenant(tenant, "realtime_test", :stop) + + {:ok, admin_conn} = + Postgrex.start_link( + hostname: admin_settings.hostname, + port: admin_settings.port, + database: admin_settings.database, + username: "supabase_admin", + password: admin_settings.password + ) + + Postgrex.query!( + admin_conn, + "alter type realtime.user_defined_filter add attribute negate boolean cascade", + [] + ) + + {:ok, stale_conn} = + admin_settings + |> Map.from_struct() + |> Keyword.new() + |> Postgrex.start_link() + + # Postgrex loads a composite type's field info lazily on first encode/decode, so force the + # fresh connection to cache the 4-field arity by decoding a 4-field value now. + Postgrex.query!(stale_conn, "select row('x', 'eq', 'y', true)::realtime.user_defined_filter", []) + + Postgrex.query!( + admin_conn, + "alter type realtime.user_defined_filter drop attribute negate cascade", + [] + ) + + {:ok, subscription_params} = + Subscriptions.parse_subscription_params(%{ + "schema" => "public", + "table" => "test", + "filter" => "id=eq.123" + }) + + params_list = [%{claims: %{"role" => "anon"}, id: UUID.uuid1(), subscription_params: subscription_params}] + + assert {:ok, [%Postgrex.Result{}]} = + Subscriptions.create(stale_conn, "supabase_realtime_test", params_list, self(), self()) + + # Read filters back as text: the shared Postgrex type cache still holds the stale 4-field + # composite decoder, so casting to text avoids decoding through it while still proving the + # row was inserted with the right 3-field filter. + assert %Postgrex.Result{rows: [["test", ~s|{"(id,eq,123)"}|, "*"]]} = + Postgrex.query!( + conn, + "select entity::text, filters::text, action_filter from realtime.subscription", + [] + ) + end + test "user can subscribe to only INSERT events", %{conn: conn} do {:ok, subscription_params} = Subscriptions.parse_subscription_params(%{"event" => "INSERT", "schema" => "public"}) From 82f70ea849c4eb689511487276c9a81ef3ba217b Mon Sep 17 00:00:00 2001 From: Eduardo Gurgel Pinho Date: Thu, 25 Jun 2026 13:06:39 +1200 Subject: [PATCH 2/2] fix: dont rely on cached type --- .../postgres_cdc_rls/subscriptions.ex | 23 +++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/lib/extensions/postgres_cdc_rls/subscriptions.ex b/lib/extensions/postgres_cdc_rls/subscriptions.ex index a82eb179b..74eae6255 100644 --- a/lib/extensions/postgres_cdc_rls/subscriptions.ex +++ b/lib/extensions/postgres_cdc_rls/subscriptions.ex @@ -80,10 +80,22 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do select $4::text::uuid, sub_tables.entity, - $6, + -- Build the realtime.user_defined_filter[] server-side from primitive text arrays + -- instead of binding a list of composite tuples. Postgrex caches the composite type's + -- field count per connection at bootstrap and never refreshes it, so a long-lived + -- connection whose cache predates an ALTER TYPE on user_defined_filter would otherwise + -- encode against a stale arity and crash with :badarg. Constructing the rows here keeps + -- the arity resolved by the server's current catalog. + ( + select coalesce( + array_agg(row(c, o::realtime.equality_op, v)::realtime.user_defined_filter), + '{}'::realtime.user_defined_filter[] + ) + from unnest($6::text[], $7::text[], $8::text[]) as f(c, o, v) + ), $5, - $7, - $8 + $9, + $10 from sub_tables on conflict @@ -95,7 +107,10 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do returning id" {action_filter, schema, table, filters, selected_columns} = subscription_params - query(conn, sql, [publication, schema, table, id, claims, filters, action_filter, selected_columns]) + columns = Enum.map(filters, &elem(&1, 0)) + ops = Enum.map(filters, &elem(&1, 1)) + values = Enum.map(filters, &elem(&1, 2)) + query(conn, sql, [publication, schema, table, id, claims, columns, ops, values, action_filter, selected_columns]) end defp params_to_log({action_filter, schema, table, filters, selected_columns}) do