Skip to content

Commit

Permalink
mirrored_supervisor: Restore child ID format
Browse files Browse the repository at this point in the history
[Why]
The format was changed to be compatible with Khepri paths. However, this
ID is used in in-memory states here and there as well. So changing its
format makes upgrades complicated because the code has to handle both
the old and new formats possibly used by the mirrored supervisor already
running on other nodes.

[How]
Instead, this patch converts the ID (in its old format) to something
compatible with a Khepri path only when we need to build a Khepri path.

This relies on the fact that the `Group` is a module and we can call it
to let it convert the opaque ID to a Khepri path.

While here, improve the type specs to document that a group is always a
module name and to document what a child ID can be.
  • Loading branch information
dumbbell committed Feb 13, 2024
1 parent 9d0e2ae commit 21975a5
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 28 deletions.
6 changes: 5 additions & 1 deletion deps/rabbit/src/mirrored_supervisor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,11 @@
-type startlink_err() :: {'already_started', pid()} | 'shutdown' | term().
-type startlink_ret() :: {'ok', pid()} | 'ignore' | {'error', startlink_err()}.

-type group_name() :: any().
-type group_name() :: module().
-type child_id() :: term(). %% supervisor:child_id() is not exported.

-export_type([group_name/0,
child_id/0]).

-spec start_link(GroupName, Module, Args) -> startlink_ret() when
GroupName :: group_name(),
Expand Down
32 changes: 18 additions & 14 deletions deps/rabbit/src/rabbit_db_msup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ table_definitions() ->
%% -------------------------------------------------------------------

-spec create_or_update(Group, Overall, Delegate, ChildSpec, Id) -> Ret when
Group :: any(),
Group :: mirrored_supervisor:group_name(),
Overall :: pid(),
Delegate :: pid() | undefined,
ChildSpec :: supervisor2:child_spec(),
Id :: {any(), any()},
Id :: mirrored_supervisor:child_id(),
Ret :: start | undefined | pid().

create_or_update(Group, Overall, Delegate, ChildSpec, Id) ->
Expand Down Expand Up @@ -129,8 +129,8 @@ write_in_mnesia(Group, Overall, ChildSpec, Id) ->
ok = mnesia:write(?TABLE, S, write),
ChildSpec.

create_or_update_in_khepri(Group, Overall, Delegate, ChildSpec, {SimpleId, _} = Id) ->
Path = khepri_mirrored_supervisor_path(Group, SimpleId),
create_or_update_in_khepri(Group, Overall, Delegate, ChildSpec, Id) ->
Path = khepri_mirrored_supervisor_path(Group, Id),
S = #mirrored_sup_childspec{key = {Group, Id},
mirroring_pid = Overall,
childspec = ChildSpec},
Expand Down Expand Up @@ -169,8 +169,8 @@ create_or_update_in_khepri(Group, Overall, Delegate, ChildSpec, {SimpleId, _} =
%% -------------------------------------------------------------------

-spec delete(Group, Id) -> ok when
Group :: any(),
Id :: any().
Group :: mirrored_supervisor:group_name(),
Id :: mirrored_supervisor:child_id().

delete(Group, Id) ->
rabbit_khepri:handle_fallback(
Expand All @@ -184,16 +184,16 @@ delete_in_mnesia(Group, Id) ->
ok = mnesia:delete({?TABLE, {Group, Id}})
end).

delete_in_khepri(Group, {SimpleId, _}) ->
ok = rabbit_khepri:delete(khepri_mirrored_supervisor_path(Group, SimpleId)).
delete_in_khepri(Group, Id) ->
ok = rabbit_khepri:delete(khepri_mirrored_supervisor_path(Group, Id)).

%% -------------------------------------------------------------------
%% find_mirror().
%% -------------------------------------------------------------------

-spec find_mirror(Group, Id) -> Ret when
Group :: any(),
Id :: any(),
Group :: mirrored_supervisor:group_name(),
Id :: mirrored_supervisor:child_id(),
Ret :: {ok, pid()} | {error, not_found}.

find_mirror(Group, Id) ->
Expand All @@ -214,8 +214,8 @@ find_mirror_in_mnesia(Group, Id) ->
_ -> {error, not_found}
end.

find_mirror_in_khepri(Group, {SimpleId, _}) ->
case rabbit_khepri:get(khepri_mirrored_supervisor_path(Group, SimpleId)) of
find_mirror_in_khepri(Group, Id) ->
case rabbit_khepri:get(khepri_mirrored_supervisor_path(Group, Id)) of
{ok, #mirrored_sup_childspec{mirroring_pid = Pid}} ->
{ok, Pid};
_ ->
Expand Down Expand Up @@ -269,7 +269,7 @@ update_all_in_khepri(Overall, OldOverall) ->
%% -------------------------------------------------------------------

-spec delete_all(Group) -> ok when
Group :: any().
Group :: mirrored_supervisor:group_name().

delete_all(Group) ->
rabbit_khepri:handle_fallback(
Expand Down Expand Up @@ -324,5 +324,9 @@ clear_in_khepri() ->
khepri_mirrored_supervisor_path() ->
[?MODULE, mirrored_supervisor_childspec].

khepri_mirrored_supervisor_path(Group, Id)
when is_atom(Id) orelse is_binary(Id) ->
[?MODULE, mirrored_supervisor_childspec, Group, Id];
khepri_mirrored_supervisor_path(Group, Id) ->
[?MODULE, mirrored_supervisor_childspec, Group] ++ Id.
IdPath = Group:id_to_khepri_path(Id),
[?MODULE, mirrored_supervisor_childspec, Group] ++ IdPath.
4 changes: 2 additions & 2 deletions deps/rabbit/src/rabbit_db_msup_m2k_converter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ init_copy_to_khepri(_StoreId, _MigrationId, Tables) ->
%% @private

copy_to_khepri(mirrored_sup_childspec = Table,
#mirrored_sup_childspec{key = {Group, {SimpleId, _}} = Key} = Record,
#mirrored_sup_childspec{key = {Group, Id} = Key} = Record,
State) ->
?LOG_DEBUG(
"Mnesia->Khepri data copy: [~0p] key: ~0p",
[Table, Key],
#{domain => ?KMM_M2K_TABLE_COPY_LOG_DOMAIN}),
Path = rabbit_db_msup:khepri_mirrored_supervisor_path(Group, SimpleId),
Path = rabbit_db_msup:khepri_mirrored_supervisor_path(Group, Id),
rabbit_db_m2k_converter:with_correlation_id(
fun(CorrId) ->
Extra = #{async => CorrId},
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/test/mirrored_supervisor_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ childspec(Id) ->
{id(Id), {?SERVER, start_link, [Id]}, transient, 16#ffffffff, worker, [?MODULE]}.

id(Id) ->
{[Id], Id}.
Id.

pid_of(Id) ->
{received, Pid, ping} = call(Id, ping),
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/test/rabbit_db_msup_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ create_or_update1(_Config) ->
passed.

id(Id) ->
{[Id], Id}.
Id.

find_mirror(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, find_mirror1, [Config]).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

-export([start_link/0, start_child/1, adjust/1, stop_child/1]).
-export([init/1]).
-export([id_to_khepri_path/1]).

%%----------------------------------------------------------------------------

Expand Down Expand Up @@ -49,12 +50,12 @@ start_child(X) ->

adjust({clear_upstream, VHost, UpstreamName}) ->
_ = [rabbit_federation_link_sup:adjust(Pid, X, {clear_upstream, UpstreamName}) ||
{{_, #exchange{name = Name} = X}, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR),
{#exchange{name = Name} = X, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR),
Name#resource.virtual_host == VHost],
ok;
adjust(Reason) ->
_ = [rabbit_federation_link_sup:adjust(Pid, X, Reason) ||
{{_, X}, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR)],
{X, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR)],
ok.

stop_child(X) ->
Expand All @@ -77,7 +78,9 @@ init([]) ->
%% See comment in rabbit_federation_queue_link_sup_sup:id/1
id(X = #exchange{policy = Policy}) ->
X1 = rabbit_exchange:immutable(X),
{simple_id(X), X1#exchange{policy = Policy}}.
X2 = X1#exchange{policy = Policy},
X2.

simple_id(#exchange{name = #resource{virtual_host = VHost, name = Name}}) ->
id_to_khepri_path(
#exchange{name = #resource{virtual_host = VHost, name = Name}}) ->
[exchange, VHost, Name].
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

-export([start_link/0, start_child/1, adjust/1, stop_child/1]).
-export([init/1]).
-export([id_to_khepri_path/1]).

%%----------------------------------------------------------------------------

Expand Down Expand Up @@ -51,12 +52,12 @@ start_child(Q) ->

adjust({clear_upstream, VHost, UpstreamName}) ->
_ = [rabbit_federation_link_sup:adjust(Pid, Q, {clear_upstream, UpstreamName}) ||
{{_, Q}, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR),
{Q, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR),
?amqqueue_vhost_equals(Q, VHost)],
ok;
adjust(Reason) ->
_ = [rabbit_federation_link_sup:adjust(Pid, Q, Reason) ||
{{_, Q}, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR)],
{Q, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR)],
ok.

stop_child(Q) ->
Expand Down Expand Up @@ -88,8 +89,9 @@ init([]) ->
id(Q) when ?is_amqqueue(Q) ->
Policy = amqqueue:get_policy(Q),
Q1 = amqqueue:set_immutable(Q),
{simple_id(Q), amqqueue:set_policy(Q1, Policy)}.
Q2 = amqqueue:set_policy(Q1, Policy),
Q2.

simple_id(Q) when ?is_amqqueue(Q) ->
#resource{virtual_host = VHost, name = Name} = amqqueue:get_name(Q),
id_to_khepri_path(Id) when ?is_amqqueue(Id) ->
#resource{virtual_host = VHost, name = Name} = amqqueue:get_name(Id),
[queue, VHost, Name].

0 comments on commit 21975a5

Please sign in to comment.