Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions lib/extensions/postgres_cdc_rls/subscriptions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,22 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
select
$4::text::uuid,
sub_tables.entity,
$6,
-- Build the realtime.user_defined_filter[] server-side from primitive text arrays
-- instead of binding a list of composite tuples. Postgrex caches the composite type's
-- field count per connection at bootstrap and never refreshes it, so a long-lived
-- connection whose cache predates an ALTER TYPE on user_defined_filter would otherwise
-- encode against a stale arity and crash with :badarg. Constructing the rows here keeps
-- the arity resolved by the server's current catalog.
(
select coalesce(
array_agg(row(c, o::realtime.equality_op, v)::realtime.user_defined_filter),
'{}'::realtime.user_defined_filter[]
)
from unnest($6::text[], $7::text[], $8::text[]) as f(c, o, v)
),
$5,
$7,
$8
$9,
$10
from
sub_tables
on conflict
Expand All @@ -95,7 +107,10 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
returning
id"
{action_filter, schema, table, filters, selected_columns} = subscription_params
query(conn, sql, [publication, schema, table, id, claims, filters, action_filter, selected_columns])
columns = Enum.map(filters, &elem(&1, 0))
ops = Enum.map(filters, &elem(&1, 1))
values = Enum.map(filters, &elem(&1, 2))
query(conn, sql, [publication, schema, table, id, claims, columns, ops, values, action_filter, selected_columns])
end

defp params_to_log({action_filter, schema, table, filters, selected_columns}) do
Expand Down
67 changes: 67 additions & 0 deletions test/realtime/extensions/cdc_rls/subscriptions_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,73 @@ defmodule Realtime.Extensions.PostgresCdcRls.SubscriptionsTest do
Subscriptions.create(conn, "supabase_realtime_test", params_list, self(), self())
end

test "create succeeds when the connection cached a stale user_defined_filter arity", %{conn: conn, tenant: tenant} do
# Regression for ErrorOnRpcCall :badarg in element(4, {col, op, value}).
#
# realtime.user_defined_filter is a composite type. Postgrex caches a composite's field
# list per connection at bootstrap and never refreshes it when ALTER TYPE changes the
# type. A connection that bootstrapped while the type briefly carried a 4th `negate`
# attribute keeps a 4-field encoder; after the attribute is dropped, encoding a 3-element
# filter tuple would call element(4, {col, op, value}) and crash with :badarg.
#
# We reproduce the stale cache by adding the attribute, opening a fresh connection (which
# caches 4 fields), then dropping it back to 3 fields before inserting.
{:ok, admin_settings} = Database.from_tenant(tenant, "realtime_test", :stop)

{:ok, admin_conn} =
Postgrex.start_link(
hostname: admin_settings.hostname,
port: admin_settings.port,
database: admin_settings.database,
username: "supabase_admin",
password: admin_settings.password
)

Postgrex.query!(
admin_conn,
"alter type realtime.user_defined_filter add attribute negate boolean cascade",
[]
)

{:ok, stale_conn} =
admin_settings
|> Map.from_struct()
|> Keyword.new()
|> Postgrex.start_link()

# Postgrex loads a composite type's field info lazily on first encode/decode, so force the
# fresh connection to cache the 4-field arity by decoding a 4-field value now.
Postgrex.query!(stale_conn, "select row('x', 'eq', 'y', true)::realtime.user_defined_filter", [])

Postgrex.query!(
admin_conn,
"alter type realtime.user_defined_filter drop attribute negate cascade",
[]
)

{:ok, subscription_params} =
Subscriptions.parse_subscription_params(%{
"schema" => "public",
"table" => "test",
"filter" => "id=eq.123"
})

params_list = [%{claims: %{"role" => "anon"}, id: UUID.uuid1(), subscription_params: subscription_params}]

assert {:ok, [%Postgrex.Result{}]} =
Subscriptions.create(stale_conn, "supabase_realtime_test", params_list, self(), self())

# Read filters back as text: the shared Postgrex type cache still holds the stale 4-field
# composite decoder, so casting to text avoids decoding through it while still proving the
# row was inserted with the right 3-field filter.
assert %Postgrex.Result{rows: [["test", ~s|{"(id,eq,123)"}|, "*"]]} =
Postgrex.query!(
conn,
"select entity::text, filters::text, action_filter from realtime.subscription",
[]
)
end

test "user can subscribe to only INSERT events", %{conn: conn} do
{:ok, subscription_params} =
Subscriptions.parse_subscription_params(%{"event" => "INSERT", "schema" => "public"})
Expand Down
Loading