From bfc6f833069cba6f9100396c006a9261b518c291 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Thu, 29 Aug 2024 12:30:42 +0200 Subject: [PATCH] rabbit_db_cluster: Reset feature flags immediately after a failure to join [Why] If a node failed to join a cluster, `rabbit` was restarted then the feature flags were reset and the error returned. I.e., the error handling was in a single place at the end of the function. We need to reset feature flags after a failure because the feature flags states were copied from the remote node just before the join. However, resetting them after restarting `rabbit` was incorrect because feature flags were initialized in a way that didn't match the rest of the state. This led to crashes during the start of `rabbit`. [How] The feature flags are now reset after the failure to join but before starting `rabbit`. A new testcase was added to test this scenario. --- deps/rabbit/BUILD.bazel | 2 +- deps/rabbit/app.bzl | 2 +- deps/rabbit/src/rabbit_db_cluster.erl | 13 +- .../test/metadata_store_clustering_SUITE.erl | 165 +++++++++++++++++- 4 files changed, 173 insertions(+), 9 deletions(-) diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index 538e3c46b5a0..2eacf27d0584 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -1162,7 +1162,7 @@ rabbitmq_integration_suite( rabbitmq_integration_suite( name = "metadata_store_clustering_SUITE", size = "large", - shard_count = 18, + shard_count = 19, sharding_method = "case", ) diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl index 3cb3ca4c2bc5..2d82691edef7 100644 --- a/deps/rabbit/app.bzl +++ b/deps/rabbit/app.bzl @@ -1982,7 +1982,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"): outs = ["test/metadata_store_clustering_SUITE.beam"], app_name = "rabbit", erlc_opts = "//:test_erlc_opts", - deps = ["//deps/amqp_client:erlang_app"], + deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"], ) erlang_bytecode( name = "metadata_store_migration_SUITE_beam_files", diff --git a/deps/rabbit/src/rabbit_db_cluster.erl b/deps/rabbit/src/rabbit_db_cluster.erl index b1f8cb5348ef..1df145ccb117 100644 --- a/deps/rabbit/src/rabbit_db_cluster.erl +++ b/deps/rabbit/src/rabbit_db_cluster.erl @@ -176,6 +176,15 @@ join(RemoteNode, NodeType) false -> join_using_mnesia(ClusterNodes, NodeType) end, + case Ret of + ok -> + ok; + {error, _} -> + %% We reset feature flags states again and make sure the + %% recorded states on disk are deleted. + rabbit_feature_flags:reset() + end, + %% Restart RabbitMQ afterwards, if it was running before the join. %% Likewise for the Feature flags controller and Mnesia (if we %% still need it). @@ -201,10 +210,6 @@ join(RemoteNode, NodeType) rabbit_node_monitor:notify_joined_cluster(), ok; {error, _} = Error -> - %% We reset feature flags states again and make sure the - %% recorded states on disk are deleted. - rabbit_feature_flags:reset(), - Error end; {ok, already_member} -> diff --git a/deps/rabbit/test/metadata_store_clustering_SUITE.erl b/deps/rabbit/test/metadata_store_clustering_SUITE.erl index e9bf9584d56b..b648ac0a284f 100644 --- a/deps/rabbit/test/metadata_store_clustering_SUITE.erl +++ b/deps/rabbit/test/metadata_store_clustering_SUITE.erl @@ -10,7 +10,40 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). --compile([nowarn_export_all, export_all]). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). + +-export([suite/0, + all/0, + groups/0, + init_per_suite/1, + end_per_suite/1, + init_per_group/2, + end_per_group/2, + init_per_testcase/2, + end_per_testcase/2, + + join_khepri_khepri_cluster/1, + join_mnesia_khepri_cluster/1, + join_mnesia_khepri_cluster_reverse/1, + join_khepri_mnesia_cluster/1, + join_khepri_mnesia_cluster_reverse/1, + + join_khepri_khepri_khepri_cluster/1, + join_mnesia_khepri_khepri_cluster/1, + join_mnesia_khepri_khepri_cluster_reverse/1, + join_khepri_mnesia_khepri_cluster/1, + join_khepri_mnesia_khepri_cluster_reverse/1, + join_khepri_khepri_mnesia_cluster/1, + join_khepri_khepri_mnesia_cluster_reverse/1, + join_mnesia_mnesia_khepri_cluster/1, + join_mnesia_mnesia_khepri_cluster_reverse/1, + join_mnesia_khepri_mnesia_cluster/1, + join_mnesia_khepri_mnesia_cluster_reverse/1, + join_khepri_mnesia_mnesia_cluster/1, + join_khepri_mnesia_mnesia_cluster_reverse/1, + + join_khepri_while_in_minority/1 + ]). suite() -> [{timetrap, 5 * 60_000}]. @@ -23,7 +56,8 @@ all() -> groups() -> [ {unclustered, [], [{cluster_size_2, [], cluster_size_2_tests()}, - {cluster_size_3, [], cluster_size_3_tests()}]} + {cluster_size_3, [], cluster_size_3_tests()}, + {cluster_size_5, [], cluster_size_5_tests()}]} ]. cluster_size_2_tests() -> @@ -52,6 +86,11 @@ cluster_size_3_tests() -> join_khepri_mnesia_mnesia_cluster_reverse ]. +cluster_size_5_tests() -> + [ + join_khepri_while_in_minority + ]. + %% ------------------------------------------------------------------- %% Testsuite setup/teardown. %% ------------------------------------------------------------------- @@ -78,7 +117,9 @@ init_per_group(unclustered, Config) -> init_per_group(cluster_size_2, Config) -> rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 2}]); init_per_group(cluster_size_3, Config) -> - rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 3}]). + rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 3}]); +init_per_group(cluster_size_5, Config) -> + rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 5}]). end_per_group(_, Config) -> Config. @@ -343,3 +384,121 @@ declare(Ch, Q) -> durable = true, auto_delete = false, arguments = []}). + +join_khepri_while_in_minority(Config) -> + [Node1 | ClusteredNodes] = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), + [NodeToJoin | OtherNodes] = ClusteredNodes, + + %% Cluster nodes 2 to 5. + ct:pal("Cluster nodes ~p", [ClusteredNodes]), + lists:foreach( + fun(Node) -> + ?assertEqual( + ok, + rabbit_control_helper:command( + join_cluster, Node, [atom_to_list(NodeToJoin)], [])) + end, OtherNodes), + lists:foreach( + fun(Node) -> + ?awaitMatch( + ClusteredNodes, + lists:sort( + rabbit_ct_broker_helpers:rpc( + Config, Node, rabbit_nodes, list_members, [])), + 30000) + end, ClusteredNodes), + + %% Enable Khepri on all nodes. Only `Node2' is given here because it is + %% clustered with `OtherNodes'. + ct:pal("Enable `khepri_db` on nodes ~0p and ~0p", [Node1, NodeToJoin]), + Ret1 = rabbit_ct_broker_helpers:enable_feature_flag( + Config, [Node1, NodeToJoin], khepri_db), + case Ret1 of + ok -> + StoreId = rabbit_khepri:get_store_id(), + LeaderId = rabbit_ct_broker_helpers:rpc( + Config, NodeToJoin, + ra_leaderboard, lookup_leader, [StoreId]), + {StoreId, LeaderNode} = LeaderId, + + %% Stop all clustered nodes except one follower to create a + %% minority. In other words, we stop two followers, then the + %% leader. + %% + %% Using `lists:reverse/1', we keep the last running followe only + %% to see how clustering works if the first nodes in the cluster + %% are down. + Followers = ClusteredNodes -- [LeaderNode], + [FollowerToKeep | FollowersToStop] = lists:reverse(Followers), + + lists:foreach( + fun(Node) -> + ct:pal("Stop node ~0p", [Node]), + ok = rabbit_ct_broker_helpers:stop_node(Config, Node) + end, FollowersToStop ++ [LeaderNode]), + + %% Try and fail to cluster `Node1' with the others. + ct:pal("Try to cluster node ~0p with ~0p", [Node1, FollowerToKeep]), + Ret2 = rabbit_control_helper:command( + join_cluster, Node1, [atom_to_list(FollowerToKeep)], []), + ?assertMatch({error, 75, _}, Ret2), + {error, _, Msg} = Ret2, + ?assertEqual( + match, + re:run( + Msg, "Khepri cluster could be in minority", + [{capture, none}])), + + %% `Node1' should still be up and running correctly. + ct:pal("Open a connection + channel to node ~0p", [Node1]), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel( + Config, Node1), + + QName = atom_to_binary(?FUNCTION_NAME), + QArgs = [{<<"x-queue-type">>, longstr, <<"quorum">>}], + ct:pal("Declare queue ~0p", [QName]), + amqp_channel:call( + Ch, #'queue.declare'{durable = true, + queue = QName, + arguments = QArgs}), + + ct:pal("Enable publish confirms"), + amqp_channel:call(Ch, #'confirm.select'{}), + + ct:pal("Publish a message to queue ~0p", [QName]), + amqp_channel:cast( + Ch, + #'basic.publish'{routing_key = QName}, + #amqp_msg{props = #'P_basic'{delivery_mode = 2}}), + amqp_channel:wait_for_confirms(Ch), + + ct:pal("Subscribe to queue ~0p", [QName]), + CTag = <<"ctag">>, + amqp_channel:subscribe( + Ch, + #'basic.consume'{queue = QName, + consumer_tag = CTag}, + self()), + receive + #'basic.consume_ok'{consumer_tag = CTag} -> + ok + after 10000 -> + exit(consume_ok_timeout) + end, + + ct:pal("Consume a message from queue ~0p", [QName]), + receive + {#'basic.deliver'{consumer_tag = <<"ctag">>}, _} -> + ok + after 10000 -> + exit(deliver_timeout) + end, + + ct:pal("Close channel + connection"), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), + + ok; + {skip, _} = Skip -> + Skip + end.