Skip to content

Commit

Permalink
Merge pull request #12117 from rabbitmq/mergify/bp/v3.13.x/pr-12116
Browse files Browse the repository at this point in the history
rabbitmq-upgrade await_quorum_plus_one improvements (backport #12113) (backport #12116)
  • Loading branch information
michaelklishin committed Aug 25, 2024
2 parents dce5ecd + 52e80b3 commit 52b3843
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 39 deletions.
24 changes: 22 additions & 2 deletions deps/rabbit/src/rabbit_upgrade_preparation.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
%%

%% Delay, in milliseconds, between two successive quorum checks: it is the
%% argument given to timer:sleep/1 in the polling loop, and the caller's
%% timeout is divided by it to obtain the number of iterations.
-define(SAMPLING_INTERVAL, 200).
%% Controls how often the polling loop logs what it is still waiting for;
%% the loop logs when `IterationsLeft rem ?LOGGING_FREQUENCY` equals 0.
%% NOTE(review): the macro body is not parenthesized. Macros are substituted
%% as tokens, and `rem` and `*` share the same left-associative priority, so
%% that test expands to `IterationsLeft rem 200 * 100`, i.e.
%% `(IterationsLeft rem 200) * 100`, which is 0 on every 200th iteration
%% rather than every 20000th. Confirm which cadence is intended before
%% wrapping the body in parentheses.
-define(LOGGING_FREQUENCY, ?SAMPLING_INTERVAL * 100).

await_online_quorum_plus_one(Timeout) ->
Iterations = ceil(Timeout / ?SAMPLING_INTERVAL),
Expand All @@ -38,7 +39,11 @@ online_members(Component) ->
erlang, whereis, [Component])).

endangered_critical_components() ->
CriticalComponents = [rabbit_stream_coordinator],
CriticalComponents = [rabbit_stream_coordinator] ++
case rabbit_feature_flags:is_enabled(khepri_db) of
true -> [rabbitmq_metadata];
false -> []
end,
Nodes = rabbit_nodes:list_members(),
lists:filter(fun (Component) ->
NumAlive = length(online_members(Component)),
Expand All @@ -65,6 +70,21 @@ do_await_safe_online_quorum(IterationsLeft) ->
case EndangeredQueues =:= [] andalso endangered_critical_components() =:= [] of
true -> true;
false ->
case IterationsLeft rem ?LOGGING_FREQUENCY of
0 ->
case length(EndangeredQueues) of
0 -> ok;
N -> rabbit_log:info("Waiting for ~ts queues and streams to have quorum+1 replicas online."
"You can list them with `rabbitmq-diagnostics check_if_node_is_quorum_critical`", [N])
end,
case endangered_critical_components() of
[] -> ok;
_ -> rabbit_log:info("Waiting for the following critical components to have quorum+1 replicas online: ~p.",
[endangered_critical_components()])
end;
_ ->
ok
end,
timer:sleep(?SAMPLING_INTERVAL),
do_await_safe_online_quorum(IterationsLeft - 1)
end.
Expand All @@ -89,6 +109,6 @@ list_with_minimum_quorum_for_cli() ->
[#{
<<"readable_name">> => C,
<<"name">> => C,
<<"virtual_host">> => "-",
<<"virtual_host">> => <<"(not applicable)">>,
<<"type">> => process
} || C <- endangered_critical_components()].
65 changes: 36 additions & 29 deletions deps/rabbit/test/upgrade_preparation_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,16 @@

all() ->
[
{group, quorum_queue},
{group, stream}
{group, clustered}
].

groups() ->
[
{quorum_queue, [], [
await_quorum_plus_one_qq
]},
{stream, [], [
await_quorum_plus_one_stream
]},
{stream_coordinator, [], [
await_quorum_plus_one_stream_coordinator
{clustered, [], [
await_quorum_plus_one_qq,
await_quorum_plus_one_stream,
await_quorum_plus_one_stream_coordinator,
await_quorum_plus_one_rabbitmq_metadata
]}
].

Expand All @@ -45,31 +41,30 @@ end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).

init_per_group(Group, Config) ->
case rabbit_ct_helpers:is_mixed_versions() of
true ->
%% in a 3.8/3.9 mixed cluster, ra will not cluster across versions,
%% so quorum plus one will not be achieved
{skip, "not mixed versions compatible"};
_ ->
Config1 = rabbit_ct_helpers:set_config(Config,
[
{rmq_nodes_count, 3},
{rmq_nodename_suffix, Group}
]),
rabbit_ct_helpers:run_steps(Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps())
end.
Config1 = rabbit_ct_helpers:set_config(Config,
[
{rmq_nodes_count, 3},
{rmq_nodename_suffix, Group}
]),
rabbit_ct_helpers:run_steps(Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).

%% Group teardown: runs the client helpers' teardown steps followed by the
%% broker helpers' teardown steps against the group's Config.
end_per_group(_Group, Config) ->
    ClientTeardown = rabbit_ct_client_helpers:teardown_steps(),
    BrokerTeardown = rabbit_ct_broker_helpers:teardown_steps(),
    rabbit_ct_helpers:run_steps(Config, ClientTeardown ++ BrokerTeardown).


init_per_testcase(TestCase, Config) ->
rabbit_ct_helpers:testcase_started(Config, TestCase),
Config.
%% Per-testcase setup.
%% The await_quorum_plus_one_rabbitmq_metadata case is skipped when
%% rabbit_ct_helpers:is_mixed_versions() returns true; in every other
%% situation the testcase is reported as started via rabbit_ct_helpers.
init_per_testcase(await_quorum_plus_one_rabbitmq_metadata = Testcase, Config) ->
    case rabbit_ct_helpers:is_mixed_versions() of
        true ->
            {skip, "not mixed versions compatible"};
        _NotMixed ->
            rabbit_ct_helpers:testcase_started(Config, Testcase)
    end;
init_per_testcase(Testcase, Config) ->
    rabbit_ct_helpers:testcase_started(Config, Testcase).

%% Per-testcase teardown: reports the testcase as finished via
%% rabbit_ct_helpers and returns whatever that call returns.
end_per_testcase(Testcase, Config) ->
    rabbit_ct_helpers:testcase_finished(Config, Testcase).
Expand Down Expand Up @@ -121,12 +116,24 @@ await_quorum_plus_one_stream_coordinator(Config) ->
%% no queues/streams beyond this point

ok = rabbit_ct_broker_helpers:stop_node(Config, B),
%% this should fail because the corrdinator has only 2 running nodes
%% this should fail because the coordinator has only 2 running nodes
?assertNot(await_quorum_plus_one(Config, 0)),

ok = rabbit_ct_broker_helpers:start_node(Config, B),
?assert(await_quorum_plus_one(Config, 0)).

%% Exercises the quorum+1 check with the khepri_db feature flag enabled:
%% it must pass with all three nodes running, fail while node B is stopped,
%% and pass again once node B has been started.
await_quorum_plus_one_rabbitmq_metadata(Config) ->
    AllNodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    %% exactly three node names are expected here; anything else is a badmatch
    [A, B, _Third] = AllNodes,
    ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, AllNodes, khepri_db),
    %% every node is up at this point
    ?assert(await_quorum_plus_one(Config, A)),

    ok = rabbit_ct_broker_helpers:stop_node(Config, B),
    %% this should fail because rabbitmq_metadata has only 2 running nodes
    ?assertNot(await_quorum_plus_one(Config, A)),

    %% bring B back and expect the check to pass again
    ok = rabbit_ct_broker_helpers:start_node(Config, B),
    ?assert(await_quorum_plus_one(Config, A)).

%%
%% Implementation
%%
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ defmodule RabbitMQ.CLI.Streams.Commands.AddReplicaCommand do
to_atom(node)
]) do
{:error, :classic_queue_not_supported} ->
{:error, "Cannot add replicas to a classic queue"}
{:error, "Cannot add replicas to classic queues"}

{:error, :quorum_queue_not_supported} ->
{:error, "Cannot add replicas to a quorum queue"}
{:error, "Cannot add replicas to quorum queues"}

other ->
other
Expand All @@ -37,11 +37,11 @@ defmodule RabbitMQ.CLI.Streams.Commands.AddReplicaCommand do

use RabbitMQ.CLI.DefaultOutput

def usage, do: "add_replica [--vhost <vhost>] <queue> <node>"
def usage, do: "add_replica [--vhost <vhost>] <stream> <node>"

def usage_additional do
[
["<queue>", "stream queue name"],
["<queue>", "stream name"],
["<node>", "node to add a new replica on"]
]
end
Expand All @@ -54,11 +54,11 @@ defmodule RabbitMQ.CLI.Streams.Commands.AddReplicaCommand do

# Returns the help section atom for this command (:replication).
def help_section do
  :replication
end

def description, do: "Adds a stream queue replica on the given node."
def description, do: "Adds a stream replica on the given node"

def banner([name, node], _) do
[
"Adding a replica for queue #{name} on node #{node}..."
"Adding a replica for stream #{name} on node #{node}..."
]
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,11 @@ is_quorum_critical_test(Config) ->
Body = http_get_failed(Config, "/health/checks/node-is-quorum-critical"),
?assertEqual(<<"failed">>, maps:get(<<"status">>, Body)),
?assertEqual(true, maps:is_key(<<"reason">>, Body)),
[Queue] = maps:get(<<"queues">>, Body),
?assertEqual(QName, maps:get(<<"name">>, Queue)),
Queues = maps:get(<<"queues">>, Body),
?assert(lists:any(
fun(Item) ->
QName =:= maps:get(<<"name">>, Item)
end, Queues)),

passed.

Expand Down

0 comments on commit 52b3843

Please sign in to comment.