Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rabbit_db_cluster: Reset feature flags immediately after a failure to join #12163

Merged
merged 1 commit into from
Aug 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1162,7 +1162,7 @@ rabbitmq_integration_suite(
rabbitmq_integration_suite(
name = "metadata_store_clustering_SUITE",
size = "large",
shard_count = 18,
shard_count = 19,
sharding_method = "case",
)

Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -1982,7 +1982,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
outs = ["test/metadata_store_clustering_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app"],
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
)
erlang_bytecode(
name = "metadata_store_migration_SUITE_beam_files",
Expand Down
13 changes: 9 additions & 4 deletions deps/rabbit/src/rabbit_db_cluster.erl
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,15 @@ join(RemoteNode, NodeType)
false -> join_using_mnesia(ClusterNodes, NodeType)
end,

case Ret of
ok ->
ok;
{error, _} ->
%% We reset feature flags states again and make sure the
%% recorded states on disk are deleted.
rabbit_feature_flags:reset()
end,

%% Restart RabbitMQ afterwards, if it was running before the join.
%% Likewise for the Feature flags controller and Mnesia (if we
%% still need it).
Expand All @@ -201,10 +210,6 @@ join(RemoteNode, NodeType)
rabbit_node_monitor:notify_joined_cluster(),
ok;
{error, _} = Error ->
%% We reset feature flags states again and make sure the
%% recorded states on disk are deleted.
rabbit_feature_flags:reset(),

Error
end;
{ok, already_member} ->
Expand Down
165 changes: 162 additions & 3 deletions deps/rabbit/test/metadata_store_clustering_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,40 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-compile([nowarn_export_all, export_all]).
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").

-export([suite/0,
all/0,
groups/0,
init_per_suite/1,
end_per_suite/1,
init_per_group/2,
end_per_group/2,
init_per_testcase/2,
end_per_testcase/2,

join_khepri_khepri_cluster/1,
join_mnesia_khepri_cluster/1,
join_mnesia_khepri_cluster_reverse/1,
join_khepri_mnesia_cluster/1,
join_khepri_mnesia_cluster_reverse/1,

join_khepri_khepri_khepri_cluster/1,
join_mnesia_khepri_khepri_cluster/1,
join_mnesia_khepri_khepri_cluster_reverse/1,
join_khepri_mnesia_khepri_cluster/1,
join_khepri_mnesia_khepri_cluster_reverse/1,
join_khepri_khepri_mnesia_cluster/1,
join_khepri_khepri_mnesia_cluster_reverse/1,
join_mnesia_mnesia_khepri_cluster/1,
join_mnesia_mnesia_khepri_cluster_reverse/1,
join_mnesia_khepri_mnesia_cluster/1,
join_mnesia_khepri_mnesia_cluster_reverse/1,
join_khepri_mnesia_mnesia_cluster/1,
join_khepri_mnesia_mnesia_cluster_reverse/1,

join_khepri_while_in_minority/1
]).

suite() ->
[{timetrap, 5 * 60_000}].
Expand All @@ -23,7 +56,8 @@ all() ->
groups() ->
[
{unclustered, [], [{cluster_size_2, [], cluster_size_2_tests()},
{cluster_size_3, [], cluster_size_3_tests()}]}
{cluster_size_3, [], cluster_size_3_tests()},
{cluster_size_5, [], cluster_size_5_tests()}]}
].

cluster_size_2_tests() ->
Expand Down Expand Up @@ -52,6 +86,11 @@ cluster_size_3_tests() ->
join_khepri_mnesia_mnesia_cluster_reverse
].

cluster_size_5_tests() ->
[
join_khepri_while_in_minority
].

%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------
Expand All @@ -78,7 +117,9 @@ init_per_group(unclustered, Config) ->
init_per_group(cluster_size_2, Config) ->
rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 2}]);
init_per_group(cluster_size_3, Config) ->
rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 3}]).
rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 3}]);
init_per_group(cluster_size_5, Config) ->
rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 5}]).

end_per_group(_, Config) ->
Config.
Expand Down Expand Up @@ -343,3 +384,121 @@ declare(Ch, Q) ->
durable = true,
auto_delete = false,
arguments = []}).

join_khepri_while_in_minority(Config) ->
[Node1 | ClusteredNodes] = rabbit_ct_broker_helpers:get_node_configs(
Config, nodename),
[NodeToJoin | OtherNodes] = ClusteredNodes,

%% Cluster nodes 2 to 5.
ct:pal("Cluster nodes ~p", [ClusteredNodes]),
lists:foreach(
fun(Node) ->
?assertEqual(
ok,
rabbit_control_helper:command(
join_cluster, Node, [atom_to_list(NodeToJoin)], []))
end, OtherNodes),
lists:foreach(
fun(Node) ->
?awaitMatch(
ClusteredNodes,
lists:sort(
rabbit_ct_broker_helpers:rpc(
Config, Node, rabbit_nodes, list_members, [])),
30000)
end, ClusteredNodes),

%% Enable Khepri on all nodes. Only `Node2' is given here because it is
%% clustered with `OtherNodes'.
ct:pal("Enable `khepri_db` on nodes ~0p and ~0p", [Node1, NodeToJoin]),
Ret1 = rabbit_ct_broker_helpers:enable_feature_flag(
Config, [Node1, NodeToJoin], khepri_db),
case Ret1 of
ok ->
StoreId = rabbit_khepri:get_store_id(),
LeaderId = rabbit_ct_broker_helpers:rpc(
Config, NodeToJoin,
ra_leaderboard, lookup_leader, [StoreId]),
{StoreId, LeaderNode} = LeaderId,

%% Stop all clustered nodes except one follower to create a
%% minority. In other words, we stop two followers, then the
%% leader.
%%
%% Using `lists:reverse/1', we keep the last running followe only
%% to see how clustering works if the first nodes in the cluster
%% are down.
Followers = ClusteredNodes -- [LeaderNode],
[FollowerToKeep | FollowersToStop] = lists:reverse(Followers),

lists:foreach(
fun(Node) ->
ct:pal("Stop node ~0p", [Node]),
ok = rabbit_ct_broker_helpers:stop_node(Config, Node)
end, FollowersToStop ++ [LeaderNode]),

%% Try and fail to cluster `Node1' with the others.
ct:pal("Try to cluster node ~0p with ~0p", [Node1, FollowerToKeep]),
Ret2 = rabbit_control_helper:command(
join_cluster, Node1, [atom_to_list(FollowerToKeep)], []),
?assertMatch({error, 75, _}, Ret2),
{error, _, Msg} = Ret2,
?assertEqual(
match,
re:run(
Msg, "Khepri cluster could be in minority",
[{capture, none}])),

%% `Node1' should still be up and running correctly.
ct:pal("Open a connection + channel to node ~0p", [Node1]),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(
Config, Node1),

QName = atom_to_binary(?FUNCTION_NAME),
QArgs = [{<<"x-queue-type">>, longstr, <<"quorum">>}],
ct:pal("Declare queue ~0p", [QName]),
amqp_channel:call(
Ch, #'queue.declare'{durable = true,
queue = QName,
arguments = QArgs}),

ct:pal("Enable publish confirms"),
amqp_channel:call(Ch, #'confirm.select'{}),

ct:pal("Publish a message to queue ~0p", [QName]),
amqp_channel:cast(
Ch,
#'basic.publish'{routing_key = QName},
#amqp_msg{props = #'P_basic'{delivery_mode = 2}}),
amqp_channel:wait_for_confirms(Ch),

ct:pal("Subscribe to queue ~0p", [QName]),
CTag = <<"ctag">>,
amqp_channel:subscribe(
Ch,
#'basic.consume'{queue = QName,
consumer_tag = CTag},
self()),
receive
#'basic.consume_ok'{consumer_tag = CTag} ->
ok
after 10000 ->
exit(consume_ok_timeout)
end,

ct:pal("Consume a message from queue ~0p", [QName]),
receive
{#'basic.deliver'{consumer_tag = <<"ctag">>}, _} ->
ok
after 10000 ->
exit(deliver_timeout)
end,

ct:pal("Close channel + connection"),
rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),

ok;
{skip, _} = Skip ->
Skip
end.
Loading