Skip to content

Commit

Permalink
[RTC-275] Allow for subscribing to notifications from a single room (#45
Browse files Browse the repository at this point in the history
)

* Add handling of new server messages

* Proper handling of `RoomStateRequest` server message

* Add tests for new server messages

* Add more tests

* Apply requested changes
  • Loading branch information
LVala committed Jun 28, 2023
1 parent 0538894 commit 5142846
Show file tree
Hide file tree
Showing 4 changed files with 280 additions and 18 deletions.
80 changes: 77 additions & 3 deletions lib/jellyfish_web/server_socket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ defmodule JellyfishWeb.ServerSocket do
@behaviour Phoenix.Socket.Transport
require Logger

alias Jellyfish.RoomService

alias Jellyfish.ServerMessage

alias Jellyfish.ServerMessage.{
Expand All @@ -12,9 +14,15 @@ defmodule JellyfishWeb.ServerSocket do
PeerConnected,
PeerCrashed,
PeerDisconnected,
RoomCrashed
RoomCrashed,
RoomNotFound,
RoomsState,
RoomState,
RoomStateRequest
}

alias Jellyfish.ServerMessage.RoomState.{Component, Config, Peer}

@heartbeat_interval 30_000

@impl true
Expand Down Expand Up @@ -73,10 +81,32 @@ defmodule JellyfishWeb.ServerSocket do
end
end

def handle_in({encoded_message, [opcode: :binary]}, state) do
with %ServerMessage{content: request} <- ServerMessage.decode(encoded_message),
{:room_state_request, %RoomStateRequest{content: {_variant, option}}} <- request do
room_state = get_room_state(option)

reply =
%ServerMessage{content: room_state}
|> ServerMessage.encode()

{:reply, :ok, {:binary, reply}, state}
else
other ->
Logger.warn("""
Received unexpected message on server WS.
Closing the connection.
Message: #{inspect(other)}
""")

{:stop, :closed, {1003, "operation not allowed"}, state}
end
end

def handle_in({encoded_message, [opcode: _type]}, state) do
Logger.warn("""
Received message on server WS.
Server WS doesn't expect to receive any messages.
Received unexpected message on server WS.
Closing the connection.
Message: #{inspect(encoded_message)}
Expand Down Expand Up @@ -131,4 +161,48 @@ defmodule JellyfishWeb.ServerSocket do
Logger.info("Server WebSocket stopped #{inspect(reason)}")
:ok
end

defp get_room_state(:ALL) do
rooms =
RoomService.list_rooms()
|> Enum.map(&to_room_state_message/1)

{:rooms_state, %RoomsState{rooms: rooms}}
end

defp get_room_state(id) do
case RoomService.get_room(id) do
{:ok, room} ->
room = to_room_state_message(room)
{:room_state, room}

{:error, :room_not_found} ->
{:room_not_found, %RoomNotFound{id: id}}
end
end

defp to_room_state_message(room) do
components =
room.components
|> Map.values()
|> Enum.map(&%Component{id: &1.id, type: to_proto_type(&1.type)})

peers =
room.peers
|> Map.values()
|> Enum.map(
&%Peer{id: &1.id, type: to_proto_type(&1.type), status: to_proto_status(&1.status)}
)

config = struct!(Config, room.config)

%RoomState{id: room.id, config: config, peers: peers, components: components}
end

defp to_proto_type(Jellyfish.Component.HLS), do: :HLS
defp to_proto_type(Jellyfish.Component.RTSP), do: :HLS
defp to_proto_type(Jellyfish.Peer.WebRTC), do: :WEBRTC

defp to_proto_status(:disconnected), do: :DISCONNECTED
defp to_proto_status(:connected), do: :CONNECTED
end
116 changes: 116 additions & 0 deletions lib/protos/jellyfish/server_notifications.pb.ex
Original file line number Diff line number Diff line change
@@ -1,3 +1,37 @@
defmodule Jellyfish.ServerMessage.RoomStateRequest.Option do
@moduledoc false

use Protobuf, enum: true, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

field :ALL, 0
end

defmodule Jellyfish.ServerMessage.RoomState.Peer.Type do
@moduledoc false

use Protobuf, enum: true, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

field :WEBRTC, 0
end

defmodule Jellyfish.ServerMessage.RoomState.Peer.Status do
@moduledoc false

use Protobuf, enum: true, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

field :CONNECTED, 0
field :DISCONNECTED, 1
end

defmodule Jellyfish.ServerMessage.RoomState.Component.Type do
@moduledoc false

use Protobuf, enum: true, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

field :HLS, 0
field :RTSP, 1
end

defmodule Jellyfish.ServerMessage.RoomCrashed do
@moduledoc false

Expand Down Expand Up @@ -56,6 +90,71 @@ defmodule Jellyfish.ServerMessage.AuthRequest do
field :token, 1, type: :string
end

defmodule Jellyfish.ServerMessage.RoomStateRequest do
@moduledoc false

use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

oneof :content, 0

field :id, 1, type: :string, oneof: 0
field :option, 2, type: Jellyfish.ServerMessage.RoomStateRequest.Option, enum: true, oneof: 0
end

defmodule Jellyfish.ServerMessage.RoomState.Config do
@moduledoc false

use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

field :max_peers, 1, type: :uint32, json_name: "maxPeers"
end

defmodule Jellyfish.ServerMessage.RoomState.Peer do
@moduledoc false

use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

field :id, 1, type: :string
field :type, 2, type: Jellyfish.ServerMessage.RoomState.Peer.Type, enum: true
field :status, 3, type: Jellyfish.ServerMessage.RoomState.Peer.Status, enum: true
end

defmodule Jellyfish.ServerMessage.RoomState.Component do
@moduledoc false

use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

field :id, 1, type: :string
field :type, 2, type: Jellyfish.ServerMessage.RoomState.Component.Type, enum: true
end

defmodule Jellyfish.ServerMessage.RoomState do
@moduledoc false

use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

field :id, 1, type: :string
field :config, 2, type: Jellyfish.ServerMessage.RoomState.Config
field :peers, 3, repeated: true, type: Jellyfish.ServerMessage.RoomState.Peer
field :components, 4, repeated: true, type: Jellyfish.ServerMessage.RoomState.Component
end

defmodule Jellyfish.ServerMessage.RoomsState do
@moduledoc false

use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

field :rooms, 1, repeated: true, type: Jellyfish.ServerMessage.RoomState
end

defmodule Jellyfish.ServerMessage.RoomNotFound do
@moduledoc false

use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

field :id, 1, type: :string
end

defmodule Jellyfish.ServerMessage do
@moduledoc false

Expand Down Expand Up @@ -94,4 +193,21 @@ defmodule Jellyfish.ServerMessage do
type: Jellyfish.ServerMessage.AuthRequest,
json_name: "authRequest",
oneof: 0

field :room_state_request, 8,
type: Jellyfish.ServerMessage.RoomStateRequest,
json_name: "roomStateRequest",
oneof: 0

field :room_state, 9, type: Jellyfish.ServerMessage.RoomState, json_name: "roomState", oneof: 0

field :rooms_state, 10,
type: Jellyfish.ServerMessage.RoomsState,
json_name: "roomsState",
oneof: 0

field :room_not_found, 11,
type: Jellyfish.ServerMessage.RoomNotFound,
json_name: "roomNotFound",
oneof: 0
end
2 changes: 1 addition & 1 deletion protos
100 changes: 86 additions & 14 deletions test/jellyfish_web/integration/server_socket_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do
AuthRequest,
PeerConnected,
PeerDisconnected,
RoomCrashed
RoomCrashed,
RoomNotFound,
RoomsState,
RoomState,
RoomStateRequest
}

alias JellyfishWeb.{PeerSocket, ServerSocket, WS}
Expand All @@ -21,6 +25,8 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do
@path "ws://127.0.0.1:#{@port}/socket/server/websocket"
@auth_response %Authenticated{}

@max_peers 1

Application.put_env(
:jellyfish,
Endpoint,
Expand Down Expand Up @@ -71,7 +77,7 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do
create_and_authenticate()
end

test "closes on receiving a message from a client" do
test "closes on receiving an invalid message from a client" do
ws = create_and_authenticate()

:ok =
Expand All @@ -83,6 +89,68 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do
assert_receive {:disconnected, {:remote, 1003, "operation not allowed"}}, 1000
end

test "responds with room state", %{conn: conn} do
server_api_token = Application.fetch_env!(:jellyfish, :server_api_token)
ws = create_and_authenticate()
{room_id, peer_id, _token, conn} = add_room_and_peer(conn, server_api_token)

msg = %ServerMessage{
content: {:room_state_request, %RoomStateRequest{content: {:id, room_id}}}
}

:ok = WS.send_binary_frame(ws, ServerMessage.encode(msg))

assert_receive %RoomState{
id: ^room_id,
config: %{max_peers: @max_peers},
components: [],
peers: [%RoomState.Peer{id: ^peer_id, type: :WEBRTC, status: :DISCONNECTED}]
}

conn = delete(conn, ~p"/room/#{room_id}/")
assert response(conn, :no_content)
end

test "responds with all of the room states", %{conn: conn} do
server_api_token = Application.fetch_env!(:jellyfish, :server_api_token)
ws = create_and_authenticate()
{room_id, peer_id, _token, conn} = add_room_and_peer(conn, server_api_token)

msg = %ServerMessage{
content: {:room_state_request, %RoomStateRequest{content: {:option, :ALL}}}
}

:ok = WS.send_binary_frame(ws, ServerMessage.encode(msg))

assert_receive %RoomsState{
rooms: [
%RoomState{
id: ^room_id,
config: %{max_peers: @max_peers},
components: [],
peers: [%RoomState.Peer{id: ^peer_id, type: :WEBRTC, status: :DISCONNECTED}]
}
]
}

conn = delete(conn, ~p"/room/#{room_id}/")
assert response(conn, :no_content)
end

test "responds with room_not_found" do
ws = create_and_authenticate()

fake_room_id = "fake_room_id"

msg = %ServerMessage{
content: {:room_state_request, %RoomStateRequest{content: {:id, fake_room_id}}}
}

:ok = WS.send_binary_frame(ws, ServerMessage.encode(msg))

assert_receive %RoomNotFound{id: ^fake_room_id}
end

test "sends a message when room crashes", %{conn: conn} do
server_api_token = Application.fetch_env!(:jellyfish, :server_api_token)
create_and_authenticate()
Expand All @@ -102,26 +170,16 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do
server_api_token = Application.fetch_env!(:jellyfish, :server_api_token)
create_and_authenticate()

conn = put_req_header(conn, "authorization", "Bearer " <> server_api_token)

conn = post(conn, ~p"/room", maxPeers: 1)
assert %{"id" => room_id} = json_response(conn, :created)["data"]

conn = post(conn, ~p"/room/#{room_id}/peer", type: "webrtc")

assert %{"token" => peer_token, "peer" => %{"id" => peer_id}} =
json_response(conn, :created)["data"]
{room_id, peer_id, peer_token, conn} = add_room_and_peer(conn, server_api_token)

{:ok, peer_ws} = WS.start_link("ws://127.0.0.1:#{@port}/socket/peer/websocket", :peer)

auth_request = peer_auth_request(peer_token)

:ok = WS.send_binary_frame(peer_ws, auth_request)

assert_receive %PeerConnected{peer_id: ^peer_id, room_id: ^room_id}

conn = delete(conn, ~p"/room/#{room_id}/")
response(conn, :no_content)
assert response(conn, :no_content)

assert_receive %PeerDisconnected{peer_id: ^peer_id, room_id: ^room_id}
end
Expand All @@ -136,6 +194,20 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do
ws
end

defp add_room_and_peer(conn, server_api_token) do
conn = put_req_header(conn, "authorization", "Bearer " <> server_api_token)

conn = post(conn, ~p"/room", maxPeers: @max_peers)
assert %{"id" => room_id} = json_response(conn, :created)["data"]

conn = post(conn, ~p"/room/#{room_id}/peer", type: "webrtc")

assert %{"token" => peer_token, "peer" => %{"id" => peer_id}} =
json_response(conn, :created)["data"]

{room_id, peer_id, peer_token, conn}
end

defp auth_request(token) do
ServerMessage.encode(%ServerMessage{content: {:auth_request, %AuthRequest{token: token}}})
end
Expand Down

0 comments on commit 5142846

Please sign in to comment.