From 330212e774eff55b450e3488b5dddc2f0b957b70 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Thu, 27 Nov 2025 13:02:44 +0100 Subject: [PATCH 1/4] Convert pg_backend_pid returns from Postgres from string to int The reason it is returned as a string, even though the return type of the PG function pg_backend_pid() is integer, is that Postgrex.ReplicationConnection uses the simple query protocol. Hence the explicit type conversion in our Elixir code. --- .../electric/postgres/replication_client/connection_setup.ex | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex b/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex index 2c0a046e86..9fd4cb1290 100644 --- a/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex +++ b/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex @@ -90,10 +90,11 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do end defp pg_info_result([%Postgrex.Result{} = result], state) do - %{rows: [[version_str, backend_pid]]} = result + %{rows: [[version_str, backend_pid_str]]} = result version_num = String.to_integer(version_str) + backend_pid_num = String.to_integer(backend_pid_str) - {%{server_version_num: version_num, pg_backend_pid: backend_pid}, + {%{server_version_num: version_num, pg_backend_pid: backend_pid_num}, %{state | pg_version: version_num}} end From 966ce115e390bfee90c6bade408f5971d12cc3de Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Thu, 27 Nov 2025 13:03:20 +0100 Subject: [PATCH 2/4] Use explicit naming for pg_backend_pid to avoid the confusion with Elixir process pid --- .../lib/electric/connection/manager.ex | 13 +++++++------ .../electric/postgres/lock_breaker_connection.ex | 14 +++++++------- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/packages/sync-service/lib/electric/connection/manager.ex 
b/packages/sync-service/lib/electric/connection/manager.ex index c8799174ee..94df1ce856 100644 --- a/packages/sync-service/lib/electric/connection/manager.ex +++ b/packages/sync-service/lib/electric/connection/manager.ex @@ -543,16 +543,17 @@ defmodule Electric.Connection.Manager do def handle_continue( :check_lock_not_abandoned, - %State{replication_pg_backend_pid: pid} = state + %State{replication_pg_backend_pid: pg_backend_pid} = state ) do - if state.current_step == {:start_replication_client, :acquiring_lock} and not is_nil(pid) do + if state.current_step == {:start_replication_client, :acquiring_lock} and + not is_nil(pg_backend_pid) do with {:ok, conn_opts, state} <- validate_connection(pooled_connection_opts(state), :pool, state), {:ok, breaker_pid} <- LockBreakerConnection.start(connection_opts: conn_opts, stack_id: state.stack_id) do lock_name = Keyword.fetch!(state.replication_opts, :slot_name) - LockBreakerConnection.stop_backends_and_close(breaker_pid, lock_name, pid) + LockBreakerConnection.stop_backends_and_close(breaker_pid, lock_name, pg_backend_pid) else {:error, reason} -> # no-op, this is a one-shot attempt at fixing a lock @@ -1149,7 +1150,7 @@ defmodule Electric.Connection.Manager do end defp kill_replication_backend(%State{replication_pg_backend_pid: nil} = state) do - Logger.debug("Skipping killing replication backend, pid not known") + Logger.debug("Skipping killing replication backend, pg_backend_pid not known") state end @@ -1158,9 +1159,9 @@ defmodule Electric.Connection.Manager do state end - defp kill_replication_backend(%State{replication_pg_backend_pid: backend_pid} = state) do + defp kill_replication_backend(%State{replication_pg_backend_pid: pg_backend_pid} = state) do pool = pool_name(state.stack_id, :admin) - execute_and_log_errors(pool, "SELECT pg_terminate_backend(#{backend_pid});") + execute_and_log_errors(pool, "SELECT pg_terminate_backend(#{pg_backend_pid});") %{state | replication_pg_backend_pid: nil} end diff --git 
a/packages/sync-service/lib/electric/postgres/lock_breaker_connection.ex b/packages/sync-service/lib/electric/postgres/lock_breaker_connection.ex index 3f0d4a54b5..eb2427bb18 100644 --- a/packages/sync-service/lib/electric/postgres/lock_breaker_connection.ex +++ b/packages/sync-service/lib/electric/postgres/lock_breaker_connection.ex @@ -46,8 +46,8 @@ defmodule Electric.Postgres.LockBreakerConnection do end end - def stop_backends_and_close(server, lock_name, lock_connection_backend_pid \\ nil) do - send(server, {:stop_backends_and_close, lock_name, lock_connection_backend_pid}) + def stop_backends_and_close(server, lock_name, lock_connection_pg_backend_pid \\ nil) do + send(server, {:stop_backends_and_close, lock_name, lock_connection_pg_backend_pid}) end @impl true @@ -74,10 +74,10 @@ defmodule Electric.Postgres.LockBreakerConnection do @impl true def handle_info( - {:stop_backends_and_close, lock_name, lock_connection_backend_pid}, + {:stop_backends_and_close, lock_name, lock_connection_pg_backend_pid}, state ) do - {:query, lock_breaker_query(lock_name, lock_connection_backend_pid, state.database), + {:query, lock_breaker_query(lock_name, lock_connection_pg_backend_pid, state.database), Map.put(state, :lock_name, lock_name)} end @@ -101,8 +101,8 @@ defmodule Electric.Postgres.LockBreakerConnection do @impl true def notify(_, _, _), do: :ok - defp lock_breaker_query(lock_name, lock_connection_backend_pid, database) - when is_integer(lock_connection_backend_pid) or is_nil(lock_connection_backend_pid) do + defp lock_breaker_query(lock_name, lock_connection_pg_backend_pid, database) + when is_integer(lock_connection_pg_backend_pid) or is_nil(lock_connection_pg_backend_pid) do # We're using a `WITH` clause to execute all this in one statement # - See if there are existing but inactive replication slots with the given name # - Find all backends that are holding locks with the same name @@ -125,7 +125,7 @@ defmodule Electric.Postgres.LockBreakerConnection do and 
objsubid = 1 and database = (select oid from pg_database where datname = #{quote_string(database)}) and granted - and pid != #{lock_connection_backend_pid || 0} + and pid != #{lock_connection_pg_backend_pid || 0} ) SELECT pg_terminate_backend(pid) FROM stuck_backends; """ From ad03527bb1a5456b34251d443e120f3cb4bdcc80 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Thu, 27 Nov 2025 13:04:32 +0100 Subject: [PATCH 3/4] Fix LockBreakerConnectionTest The way the unit test was ignoring the exit reason for the LockBreakerConnection process was masking a bug in the code. Before the explicit conversion of pg_backend_pid's value to integer, the exit reason in the test was {:function_clause, [ {Electric.Postgres.LockBreakerConnection, :lock_breaker_query, ["electric_test_slot_d7f423cc8759", "16492", "NXmQhg ~ test doesn't break the lock if it's taken from expe"], [file: ~c"lib/electric/postgres/lock_breaker_connection.ex", line: 104]}, {Electric.Postgres.LockBreakerConnection, :handle_info, 2, [file: ~c"lib/electric/postgres/lock_breaker_connection.ex", line: 80]}, {Postgrex.SimpleConnection, :handle, 5, [file: ~c"lib/postgrex/simple_connection.ex", line: 438]}, {:gen_statem, :loop_state_callback, 11, [file: ~c"gen_statem.erl", line: 3750]}, {:proc_lib, :init_p_do_apply, 3, [file: ~c"proc_lib.erl", line: 333]} ]} --- .../postgres/lock_breaker_connection_test.exs | 42 +++++++++++-------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/packages/sync-service/test/electric/postgres/lock_breaker_connection_test.exs b/packages/sync-service/test/electric/postgres/lock_breaker_connection_test.exs index 72868a2047..acc2245b5f 100644 --- a/packages/sync-service/test/electric/postgres/lock_breaker_connection_test.exs +++ b/packages/sync-service/test/electric/postgres/lock_breaker_connection_test.exs @@ -38,23 +38,30 @@ defmodule Electric.Postgres.LockBreakerConnectionTest do end }) - {:ok, pid} = - start_supervised(%{ - id: :lock_breaker, - start: - 
{Electric.Postgres.LockBreakerConnection, :start, - [[connection_opts: ctx.db_config, stack_id: ctx.stack_id]]} - }) + {:ok, lock_breaker_pid} = + Electric.Postgres.LockBreakerConnection.start( + connection_opts: ctx.db_config, + stack_id: ctx.stack_id + ) - ref2 = Process.monitor(pid) + lock_breaker_monitor = Process.monitor(lock_breaker_pid) assert_receive :lock_acquired + # Verify there's an entry for the acquired lock in pg_locks + assert %Postgrex.Result{rows: [_pg_backend_pid], num_rows: 1} = + Postgrex.query!( + ctx.db_conn, + "SELECT pid FROM pg_locks WHERE objid::bigint = hashtext($1) AND locktype = 'advisory'", + [ctx.slot_name] + ) + # Make sure we can stop the lock connection above, so we're not specifying current pid - LockBreakerConnection.stop_backends_and_close(pid, ctx.slot_name) + LockBreakerConnection.stop_backends_and_close(lock_breaker_pid, ctx.slot_name) - assert_receive {:DOWN, ^ref2, :process, ^pid, _reason} + assert_receive {:DOWN, ^lock_breaker_monitor, :process, ^lock_breaker_pid, :shutdown} + # Verify that the pg_locks entry is gone assert %Postgrex.Result{rows: [], num_rows: 0} = Postgrex.query!( ctx.db_conn, @@ -69,7 +76,7 @@ defmodule Electric.Postgres.LockBreakerConnectionTest do "SELECT pg_create_logical_replication_slot('#{ctx.slot_name}', 'pgoutput')" ) - {:ok, pid} = + {:ok, replication_client_pid} = start_supervised( {ReplicationClient, stack_id: ctx.stack_id, @@ -84,9 +91,9 @@ defmodule Electric.Postgres.LockBreakerConnectionTest do ]} ) - ref1 = Process.monitor(pid) + replication_client_monitor = Process.monitor(replication_client_pid) - {:ok, pid} = + {:ok, lock_breaker_pid} = start_supervised(%{ id: :lock_breaker, start: @@ -94,15 +101,16 @@ defmodule Electric.Postgres.LockBreakerConnectionTest do [[connection_opts: ctx.db_config, stack_id: ctx.stack_id]]} }) - ref2 = Process.monitor(pid) + lock_breaker_monitor = Process.monitor(lock_breaker_pid) assert_receive {:"$gen_cast", {:pg_info_obtained, %{pg_backend_pid: 
pg_backend_pid}}} assert_receive {:"$gen_cast", :replication_client_lock_acquired} - LockBreakerConnection.stop_backends_and_close(pid, ctx.slot_name, pg_backend_pid) + LockBreakerConnection.stop_backends_and_close(lock_breaker_pid, ctx.slot_name, pg_backend_pid) + + assert_receive {:DOWN, ^lock_breaker_monitor, :process, ^lock_breaker_pid, :shutdown} + refute_received {:DOWN, ^replication_client_monitor, :process, _pid, _reason} - assert_receive {:DOWN, ^ref2, :process, ^pid, _reason} - refute_received {:DOWN, ^ref1, :process, _, _reason} stop_supervised(ReplicationClient) end end From 2be5734daa75ab80d8b8d82f52c1fd7eba2c942f Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Thu, 27 Nov 2025 13:05:48 +0100 Subject: [PATCH 4/4] Add changeset --- .changeset/soft-mirrors-search.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/soft-mirrors-search.md diff --git a/.changeset/soft-mirrors-search.md b/.changeset/soft-mirrors-search.md new file mode 100644 index 0000000000..f4006ce5af --- /dev/null +++ b/.changeset/soft-mirrors-search.md @@ -0,0 +1,5 @@ +--- +'@core/sync-service': patch +--- + +Fix a bug in LockBreakerConnection that was preventing it from terminating stuck backends holding the advisory lock.