Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/soft-mirrors-search.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@core/sync-service': patch
---

Fix a bug in LockBreakerConnection that was preventing it from terminating stuck backends holding the advisory lock.
13 changes: 7 additions & 6 deletions packages/sync-service/lib/electric/connection/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -543,16 +543,17 @@ defmodule Electric.Connection.Manager do

def handle_continue(
:check_lock_not_abandoned,
%State{replication_pg_backend_pid: pid} = state
%State{replication_pg_backend_pid: pg_backend_pid} = state
) do
if state.current_step == {:start_replication_client, :acquiring_lock} and not is_nil(pid) do
if state.current_step == {:start_replication_client, :acquiring_lock} and
not is_nil(pg_backend_pid) do
with {:ok, conn_opts, state} <-
validate_connection(pooled_connection_opts(state), :pool, state),
{:ok, breaker_pid} <-
LockBreakerConnection.start(connection_opts: conn_opts, stack_id: state.stack_id) do
lock_name = Keyword.fetch!(state.replication_opts, :slot_name)

LockBreakerConnection.stop_backends_and_close(breaker_pid, lock_name, pid)
LockBreakerConnection.stop_backends_and_close(breaker_pid, lock_name, pg_backend_pid)
else
{:error, reason} ->
# no-op, this is a one-shot attempt at fixing a lock
Expand Down Expand Up @@ -1149,7 +1150,7 @@ defmodule Electric.Connection.Manager do
end

defp kill_replication_backend(%State{replication_pg_backend_pid: nil} = state) do
Logger.debug("Skipping killing replication backend, pid not known")
Logger.debug("Skipping killing replication backend, pg_backend_pid not known")
state
end

Expand All @@ -1158,9 +1159,9 @@ defmodule Electric.Connection.Manager do
state
end

defp kill_replication_backend(%State{replication_pg_backend_pid: backend_pid} = state) do
defp kill_replication_backend(%State{replication_pg_backend_pid: pg_backend_pid} = state) do
pool = pool_name(state.stack_id, :admin)
execute_and_log_errors(pool, "SELECT pg_terminate_backend(#{backend_pid});")
execute_and_log_errors(pool, "SELECT pg_terminate_backend(#{pg_backend_pid});")
%{state | replication_pg_backend_pid: nil}
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ defmodule Electric.Postgres.LockBreakerConnection do
end
end

def stop_backends_and_close(server, lock_name, lock_connection_backend_pid \\ nil) do
send(server, {:stop_backends_and_close, lock_name, lock_connection_backend_pid})
def stop_backends_and_close(server, lock_name, lock_connection_pg_backend_pid \\ nil) do
send(server, {:stop_backends_and_close, lock_name, lock_connection_pg_backend_pid})
end

@impl true
Expand All @@ -74,10 +74,10 @@ defmodule Electric.Postgres.LockBreakerConnection do

@impl true
def handle_info(
{:stop_backends_and_close, lock_name, lock_connection_backend_pid},
{:stop_backends_and_close, lock_name, lock_connection_pg_backend_pid},
state
) do
{:query, lock_breaker_query(lock_name, lock_connection_backend_pid, state.database),
{:query, lock_breaker_query(lock_name, lock_connection_pg_backend_pid, state.database),
Map.put(state, :lock_name, lock_name)}
end

Expand All @@ -101,8 +101,8 @@ defmodule Electric.Postgres.LockBreakerConnection do
@impl true
def notify(_, _, _), do: :ok

defp lock_breaker_query(lock_name, lock_connection_backend_pid, database)
when is_integer(lock_connection_backend_pid) or is_nil(lock_connection_backend_pid) do
defp lock_breaker_query(lock_name, lock_connection_pg_backend_pid, database)
when is_integer(lock_connection_pg_backend_pid) or is_nil(lock_connection_pg_backend_pid) do
# We're using a `WITH` clause to execute all this in one statement
# - See if there are existing but inactive replication slots with the given name
# - Find all backends that are holding locks with the same name
Expand All @@ -125,7 +125,7 @@ defmodule Electric.Postgres.LockBreakerConnection do
and objsubid = 1
and database = (select oid from pg_database where datname = #{quote_string(database)})
and granted
and pid != #{lock_connection_backend_pid || 0}
and pid != #{lock_connection_pg_backend_pid || 0}
)
SELECT pg_terminate_backend(pid) FROM stuck_backends;
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,11 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do
end

defp pg_info_result([%Postgrex.Result{} = result], state) do
%{rows: [[version_str, backend_pid]]} = result
%{rows: [[version_str, backend_pid_str]]} = result
version_num = String.to_integer(version_str)
backend_pid_num = String.to_integer(backend_pid_str)

{%{server_version_num: version_num, pg_backend_pid: backend_pid},
{%{server_version_num: version_num, pg_backend_pid: backend_pid_num},
%{state | pg_version: version_num}}
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,30 @@ defmodule Electric.Postgres.LockBreakerConnectionTest do
end
})

{:ok, pid} =
start_supervised(%{
id: :lock_breaker,
start:
{Electric.Postgres.LockBreakerConnection, :start,
[[connection_opts: ctx.db_config, stack_id: ctx.stack_id]]}
})
{:ok, lock_breaker_pid} =
Electric.Postgres.LockBreakerConnection.start(
connection_opts: ctx.db_config,
stack_id: ctx.stack_id
)

ref2 = Process.monitor(pid)
lock_breaker_monitor = Process.monitor(lock_breaker_pid)

assert_receive :lock_acquired

# Verify there's an entry for the acquired lock in pg_locks
assert %Postgrex.Result{rows: [_pg_backend_pid], num_rows: 1} =
Postgrex.query!(
ctx.db_conn,
"SELECT pid FROM pg_locks WHERE objid::bigint = hashtext($1) AND locktype = 'advisory'",
[ctx.slot_name]
)

# Make sure we can stop the lock connection above, so we're not specifying current pid
LockBreakerConnection.stop_backends_and_close(pid, ctx.slot_name)
LockBreakerConnection.stop_backends_and_close(lock_breaker_pid, ctx.slot_name)

assert_receive {:DOWN, ^ref2, :process, ^pid, _reason}
assert_receive {:DOWN, ^lock_breaker_monitor, :process, ^lock_breaker_pid, :shutdown}

# Verify that the pg_locks entry is gone
assert %Postgrex.Result{rows: [], num_rows: 0} =
Postgrex.query!(
ctx.db_conn,
Expand All @@ -69,7 +76,7 @@ defmodule Electric.Postgres.LockBreakerConnectionTest do
"SELECT pg_create_logical_replication_slot('#{ctx.slot_name}', 'pgoutput')"
)

{:ok, pid} =
{:ok, replication_client_pid} =
start_supervised(
{ReplicationClient,
stack_id: ctx.stack_id,
Expand All @@ -84,25 +91,26 @@ defmodule Electric.Postgres.LockBreakerConnectionTest do
]}
)

ref1 = Process.monitor(pid)
replication_client_monitor = Process.monitor(replication_client_pid)

{:ok, pid} =
{:ok, lock_breaker_pid} =
start_supervised(%{
id: :lock_breaker,
start:
{Electric.Postgres.LockBreakerConnection, :start,
[[connection_opts: ctx.db_config, stack_id: ctx.stack_id]]}
})

ref2 = Process.monitor(pid)
lock_breaker_monitor = Process.monitor(lock_breaker_pid)

assert_receive {:"$gen_cast", {:pg_info_obtained, %{pg_backend_pid: pg_backend_pid}}}
assert_receive {:"$gen_cast", :replication_client_lock_acquired}

LockBreakerConnection.stop_backends_and_close(pid, ctx.slot_name, pg_backend_pid)
LockBreakerConnection.stop_backends_and_close(lock_breaker_pid, ctx.slot_name, pg_backend_pid)

assert_receive {:DOWN, ^lock_breaker_monitor, :process, ^lock_breaker_pid, :shutdown}
refute_received {:DOWN, ^replication_client_monitor, :process, _pid, _reason}

assert_receive {:DOWN, ^ref2, :process, ^pid, _reason}
refute_received {:DOWN, ^ref1, :process, _, _reason}
stop_supervised(ReplicationClient)
end
end