Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 34 additions & 3 deletions deps/rabbit/src/rabbit_msg_store.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
-behaviour(gen_server2).

-export([start_link/5, successfully_recovered_state/1,
gc_pid/1,
client_init/3, client_terminate/1, client_delete_and_terminate/1,
client_pre_hibernate/1, client_ref/1,
write/4, write_flow/4, read/2, read_many/2, contains/2, remove/2]).
Expand Down Expand Up @@ -401,6 +402,11 @@ start_link(VHost, Type, Dir, ClientRefs, StartupFunState) when is_atom(Type) ->
%% Synchronously ask the store whether its on-disk state was recovered
%% cleanly at startup (returns the #msstate.successfully_recovered flag).
successfully_recovered_state(Server) ->
    gen_server2:call(Server, successfully_recovered_state, infinity).

%% Return the pid of this store's GC process (#msstate.gc_pid).
%% Synchronous; used by the test suite to manipulate the GC directly
%% (e.g. suspending it to simulate an unresponsive GC at shutdown).
-spec gc_pid(server()) -> pid().

gc_pid(Server) ->
    gen_server2:call(Server, gc_pid, infinity).

-spec client_init(server(), client_ref(), maybe_msg_id_fun()) -> client_msstate().

client_init(Server, Ref, MsgOnDiskFun) when is_pid(Server); is_atom(Server) ->
Expand Down Expand Up @@ -808,6 +814,7 @@ init([VHost, Type, BaseDir, ClientRefs, StartupFunState]) ->
%% gen_server2 callback: cheap state queries and client registration
%% jump ahead (priority 7) of regular store traffic (priority 0).
prioritise_call(successfully_recovered_state, _From, _Len, _State) ->
    7;
prioritise_call(gc_pid, _From, _Len, _State) ->
    7;
prioritise_call({new_client_state, _Ref, _Pid, _MODC}, _From, _Len, _State) ->
    7;
prioritise_call(_Msg, _From, _Len, _State) ->
    0.
Expand All @@ -828,6 +835,9 @@ prioritise_info(Msg, _Len, _State) ->
%% Report whether the on-disk state was recovered cleanly at startup.
handle_call(successfully_recovered_state, _From, State) ->
    reply(State #msstate.successfully_recovered, State);

%% Expose the GC process pid (see gc_pid/1).
handle_call(gc_pid, _From, State) ->
    reply(State #msstate.gc_pid, State);

handle_call({new_client_state, CRef, CPid, MsgOnDiskFun}, _From,
State = #msstate { dir = Dir,
index_ets = IndexEts,
Expand Down Expand Up @@ -973,9 +983,11 @@ terminate(Reason, State = #msstate { index_ets = IndexEts,
_ -> {" with reason ~0p", [Reason]}
end,
?LOG_INFO("Stopping message store for directory '~ts'" ++ ExtraLog, [Dir|ExtraLogArgs]),
%% stop the gc first, otherwise it could be working and we pull
%% out the ets tables from under it.
ok = rabbit_msg_store_gc:stop(GCPid),
%% Stop the GC first, otherwise it could be working and we pull
%% out the ets tables from under it. Use a bounded timeout so
%% that if the GC is stuck (e.g. on disk I/O), we can still
%% write recovery files before the supervisor kills us.
stop_gc(GCPid, Dir),
State3 = case CurHdl of
undefined -> State;
_ -> State2 = internal_sync(State),
Expand Down Expand Up @@ -1010,6 +1022,25 @@ code_change(_OldVsn, State, _Extra) ->

format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).

%% Ask the GC process to stop, but do not wait forever: derive a
%% bounded timeout from the configured msg_store_shutdown_timeout and,
%% if the GC does not comply in time, kill it outright so that
%% terminate/2 can still write the recovery files.
stop_gc(GCPid, Dir) ->
    TotalTimeout = rabbit_misc:get_env(
                     rabbit, msg_store_shutdown_timeout, 600_000),
    %% Leave a minute of headroom for the rest of the shutdown work,
    %% but always give the GC at least 5 seconds to stop.
    StopTimeout = max(TotalTimeout - 60_000, 5_000),
    case rabbit_msg_store_gc:stop(GCPid, StopTimeout) of
        ok ->
            ok;
        {error, timeout} ->
            ?LOG_WARNING("Message store GC for directory '~ts' "
                         "did not stop within ~bms, killing it",
                         [Dir, StopTimeout]),
            kill_gc(GCPid, StopTimeout)
    end.

%% Brutally kill the GC and wait for the 'DOWN' notification. The
%% monitor is set up before the kill so the notification cannot be
%% missed; 'kill' cannot be trapped, so 'DOWN' should arrive promptly
%% and the 'after' clause is only a safety net so we never block here.
kill_gc(GCPid, StopTimeout) ->
    MRef = erlang:monitor(process, GCPid),
    exit(GCPid, kill),
    receive
        {'DOWN', MRef, process, GCPid, _Reason} -> ok
    after min(StopTimeout div 2, 5_000) -> ok
    end.

%%----------------------------------------------------------------------------
%% general helper functions
%%----------------------------------------------------------------------------
Expand Down
13 changes: 11 additions & 2 deletions deps/rabbit/src/rabbit_msg_store_gc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

-behaviour(gen_server2).

-export([start_link/1, compact/2, truncate/4, delete/2, stop/1]).
-export([start_link/1, compact/2, truncate/4, delete/2, stop/1, stop/2]).

-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, prioritise_cast/3]).
Expand Down Expand Up @@ -49,7 +49,16 @@ delete(Server, File) ->
-spec stop(pid()) -> 'ok'.

%% Stop the GC process, waiting indefinitely for it to terminate.
%% Kept for backwards compatibility; delegates to stop/2.
stop(Server) ->
    stop(Server, infinity).

-spec stop(pid(), timeout()) -> 'ok' | {error, timeout}.

%% Stop the GC process, waiting at most Timeout for it to terminate.
%% Returns {error, timeout} if the GC did not stop in time, in which
%% case it is still running and the caller decides what to do with it.
%% A GC that is already dead counts as successfully stopped, so the
%% caller's shutdown sequence is not derailed by a noproc exit.
stop(Server, Timeout) ->
    try
        gen_server2:call(Server, stop, Timeout)
    catch
        exit:{timeout, _} -> {error, timeout};
        %% The process may already be gone (killed or crashed);
        %% treat that as a successful stop.
        exit:{noproc, _}  -> ok
    end.

%% TODO replace with priority messages for OTP28+
prioritise_cast({delete, _}, _Len, _State) -> 5;
Expand Down
69 changes: 69 additions & 0 deletions deps/rabbit/test/backing_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ groups() ->
msg_store,
msg_store_read_many_fanout,
msg_store_file_scan,
msg_store_gc_stop_timeout,
{backing_queue_v2, [], Common ++ V2Only}
]}
].
Expand Down Expand Up @@ -718,6 +719,74 @@ msg_store_file_scan1(Config) ->
%% Generate a random 16-byte binary for use as a message id.
gen_id() ->
    rand:bytes(16).

%% Test that when the GC process is unresponsive during shutdown,
%% the msg_store recovers cleanly because terminate kills the GC
%% after a bounded timeout and proceeds to write recovery files.
%% Runs the real test body on broker node 0 via RPC, since it needs
%% a running vhost supervision tree.
msg_store_gc_stop_timeout(Config) ->
    passed = rabbit_ct_broker_helpers:rpc(Config, 0,
        ?MODULE, msg_store_gc_stop_timeout1, [Config]).

%% Body of msg_store_gc_stop_timeout/1, executed on the broker node.
msg_store_gc_stop_timeout1(_Config) ->
    GenRef = fun() -> make_ref() end,
    restart_msg_store_empty(),

    %% Write some messages so the store has data to recover.
    Ref = rabbit_guid:gen(),
    MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
    MsgIds = [{GenRef(), msg_id_bin(M)} || M <- lists:seq(1, 50)],
    ok = msg_store_write(MsgIds, MSCState),
    ok = rabbit_msg_store:client_terminate(MSCState),

    %% Get the msg_store pid and its GC pid.
    StorePid = rabbit_vhost_msg_store:vhost_store_pid(
                 ?VHOST, ?PERSISTENT_MSG_STORE),
    GCPid = rabbit_msg_store:gc_pid(StorePid),
    true = is_process_alive(GCPid),

    %% Suspend the GC process so it cannot respond to the stop call.
    ok = sys:suspend(GCPid),

    %% Stop the transient store cleanly first.
    rabbit_vhost_msg_store:stop(?VHOST, ?TRANSIENT_MSG_STORE),

    %% Set a short shutdown timeout so the GC stop times out quickly.
    %% The terminate callback reads this at runtime to compute the
    %% GC timeout. The supervisor's own timeout (set at startup) is
    %% still the original 600s, giving us plenty of room. The default
    %% is restored in 'after' so a failing assertion cannot leak the
    %% short timeout into subsequent test cases on this node.
    ok = application:set_env(rabbit, msg_store_shutdown_timeout, 6_000),
    try
        %% Now terminate the persistent store via the supervisor. The
        %% terminate callback will call rabbit_msg_store_gc:stop with a
        %% 5s timeout (max(6000 - 60000, 5000)). The GC is suspended so
        %% the stop will time out, the GC will be killed, and terminate
        %% will proceed to write recovery files.
        {ok, VHostSup} = rabbit_vhost_sup_sup:get_vhost_sup(?VHOST),
        ok = supervisor:terminate_child(VHostSup, ?PERSISTENT_MSG_STORE),
        %% Delete the child spec so we can restart.
        ok = supervisor:delete_child(VHostSup, ?PERSISTENT_MSG_STORE)
    after
        %% Restore the default timeout.
        ok = application:set_env(rabbit, msg_store_shutdown_timeout, 600_000)
    end,

    %% Restart the msg_store and check recovery state.
    ok = rabbit_variable_queue:start_msg_store(
           ?VHOST, [Ref], {fun ([]) -> finished end, []}),

    %% The store should report a clean recovery because the fix
    %% kills the unresponsive GC and proceeds to write recovery files.
    true = rabbit_vhost_msg_store:successfully_recovered_state(
             ?VHOST, ?PERSISTENT_MSG_STORE),

    %% Verify all messages survived the unclean GC shutdown.
    MSCState2 = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
    true = msg_store_contains(true, MsgIds, MSCState2),
    ok = rabbit_msg_store:client_terminate(MSCState2),

    %% Clean up.
    restart_msg_store_empty(),
    passed.

%% Generate a message payload of the default size (1 MiB).
gen_msg() ->
    gen_msg(1024 * 1024).

Expand Down
Loading