Skip to content

Commit

Permalink
rabbitmq_shovel: Restore original mirrored_supervisor child ID handling
Browse files Browse the repository at this point in the history
[Why]
We don't need to change the mirrored_supervisor child ID format for
Khepri. Unfortunately, the temporary experimental was erroneously
backported to 3.11.x and 3.12.x releases...

This broke the federation and shovel plugins during upgrades.

[How]
Here, we restore the original behavior, meaning that the ID stays as it
was and we just modify it when we need a Khepri path.

The code is updated to know about the temporary experimental format as
well because it will be used by the latest 3.11.x and 3.12.x releases.
  • Loading branch information
dumbbell committed Feb 13, 2024
1 parent 21975a5 commit 29f4858
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 53 deletions.
102 changes: 51 additions & 51 deletions deps/rabbitmq_shovel/src/rabbit_shovel_dyn_worker_sup_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
-behaviour(mirrored_supervisor).

-export([start_link/0, init/1, adjust/2, stop_child/1, cleanup_specs/0]).
-export([id_to_khepri_path/1]).

-import(rabbit_misc, [pget/2]).
-import(rabbit_data_coercion, [to_map/1, to_list/1]).
Expand Down Expand Up @@ -61,10 +62,9 @@ obfuscated_uris_parameters(Def) when is_list(Def) ->

child_exists(Name) ->
Id = id(Name),
%% older format, pre 3.13.0 and 3.12.8. See rabbitmq/rabbitmq-server#9894.
OldId = old_id(Name),
TmpExpId = temp_experimental_id(Name),
lists:any(fun ({ChildId, _, _, _}) ->
ChildId =:= Id orelse ChildId =:= OldId
ChildId =:= Id orelse ChildId =:= TmpExpId
end,
mirrored_supervisor:which_children(?SUPERVISOR)).

Expand All @@ -74,20 +74,14 @@ stop_child({VHost, ShovelName} = Name) ->
case get({shovel_worker_autodelete, Name}) of
true -> ok; %% [1]
_ ->
case stop_and_delete_child(id(Name)) of
Id = id(Name),
case stop_and_delete_child(Id) of
ok ->
ok;
{error, not_found} ->
case rabbit_khepri:is_enabled() of
true ->
%% Old id format is not supported by and cannot exist in Khepri
ok;
false ->
%% try older format, pre 3.13.0 and 3.12.8.
%% See rabbitmq/rabbitmq-server#9894.
_ = stop_and_delete_child(old_id(Name)),
ok
end
TmpExpId = temp_experimental_id(Name),
_ = stop_and_delete_child(TmpExpId),
ok
end,
rabbit_shovel_status:remove(Name)
end,
Expand All @@ -112,48 +106,54 @@ stop_and_delete_child(Id) ->

cleanup_specs() ->
Children = mirrored_supervisor:which_children(?SUPERVISOR),

ChildIdSet = sets:from_list([element(1, S) || S <- Children]),
ParamsSet = params_to_child_ids(rabbit_khepri:is_enabled()),
F = fun(ChildId, ok) ->
try
%% The supervisor operation is very unlikely to fail, it's the schema
%% data stores that can make a fuss about a non-existent or non-standard value passed in.
%% For example, an old style Shovel name is an invalid Khepri query path element. MK.
_ = mirrored_supervisor:delete_child(?SUPERVISOR, ChildId)
catch _:_:_Stacktrace ->
ok
end,
ok
end,
ParamsSet = sets:from_list(
[id({proplists:get_value(vhost, S),
proplists:get_value(name, S)})
|| S <- rabbit_runtime_parameters:list_component(
<<"shovel">>)]),
%% Delete any supervisor children that do not have their respective runtime parameters in the database.
SetToCleanUp = sets:subtract(ChildIdSet, ParamsSet),
ok = sets:fold(F, ok, SetToCleanUp).

params_to_child_ids(_KhepriEnabled = true) ->
%% Old id format simply cannot exist in Khepri because having Khepri enabled
%% means a cluster-wide move to 3.13+, so we can conditionally compute what specs we care about. MK.
sets:from_list([id({proplists:get_value(vhost, S), proplists:get_value(name, S)})
|| S <- rabbit_runtime_parameters:list_component(<<"shovel">>)]);
params_to_child_ids(_KhepriEnabled = false) ->
sets:from_list(
lists:flatmap(
fun(S) ->
Name = {proplists:get_value(vhost, S), proplists:get_value(name, S)},
%% Supervisor Id format was different pre 3.13.0 and 3.12.8.
%% Try both formats to cover the transitionary mixed version cluster period.
[id(Name), old_id(Name)]
end,
rabbit_runtime_parameters:list_component(<<"shovel">>))).
lists:foreach(
fun
({{VHost, ShovelName} = ChildId, _, _, _})
when is_binary(VHost) andalso is_binary(ShovelName) ->
case sets:is_element(ChildId, ParamsSet) of
false ->
_ = mirrored_supervisor:delete_child(
?SUPERVISOR, ChildId);
true ->
ok
end;
({{List, {VHost, ShovelName} = Id} = ChildId, _, _, _})
when is_list(List) andalso
is_binary(VHost) andalso is_binary(ShovelName) ->
case sets:is_element(Id, ParamsSet) of
false ->
_ = mirrored_supervisor:delete_child(
?SUPERVISOR, ChildId);
true ->
ok
end
end, Children).

%%----------------------------------------------------------------------------

init([]) ->
{ok, {{one_for_one, 3, 10}, []}}.

id({V, S} = Name) ->
{[V, S], Name}.

%% older format, pre 3.13.0 and 3.12.8. See rabbitmq/rabbitmq-server#9894
old_id({_V, _S} = Name) ->
id({VHost, ShovelName} = Name)
when is_binary(VHost) andalso is_binary(ShovelName) ->
Name.

id_to_khepri_path({VHost, ShovelName})
when is_binary(VHost) andalso is_binary(ShovelName) ->
[VHost, ShovelName];
id_to_khepri_path({List, {VHost, ShovelName}})
when is_list(List) andalso is_binary(VHost) andalso is_binary(ShovelName) ->
[VHost, ShovelName].

%% Temporary experimental format, erroneously backported to some 3.11.x and
%% 3.12.x releases in rabbitmq/rabbitmq-server#9796.
%%
%% See rabbitmq/rabbitmq-server#10306.
temp_experimental_id({V, S} = Name) ->
{[V, S], Name}.
8 changes: 6 additions & 2 deletions deps/rabbitmq_shovel/src/rabbit_shovel_worker_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
-behaviour(mirrored_supervisor).

-export([start_link/2, init/1]).
-export([id_to_khepri_path/1]).

-include("rabbit_shovel.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
Expand All @@ -30,5 +31,8 @@ init([Name, Config]) ->
[rabbit_shovel_worker]}],
{ok, {{one_for_one, 1, ?MAX_WAIT}, ChildSpecs}}.

id(Name) ->
{[Name], Name}.
id(Name) when is_atom(Name) ->
Name.

id_to_khepri_path(Name) when is_atom(Name) ->
[Name].

0 comments on commit 29f4858

Please sign in to comment.