Skip to content

Commit

Permalink
Add enforcing video codec for each room (#52)
Browse files Browse the repository at this point in the history
* Change encoding back to h264

* Add enforcing video codec for each room

* Fix tests

* Fix tests

* Requested changes
  • Loading branch information
Karolk99 committed Jul 14, 2023
1 parent d1e8635 commit 70f2ce4
Show file tree
Hide file tree
Showing 18 changed files with 214 additions and 82 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,7 @@ $RECYCLE.BIN/

# Default Jellyfish output directory
jellyfish_output


# asdf
.tool-versions
2 changes: 1 addition & 1 deletion lib/jellyfish/component.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ defmodule Jellyfish.Component do
@type t :: %__MODULE__{
id: id(),
type: component(),
engine_endpoint: Membrane.ChildrenSpec.child_definition_t()
engine_endpoint: Membrane.ChildrenSpec.child_definition()
}

@spec parse_type(String.t()) :: {:ok, component()} | {:error, :invalid_type}
Expand Down
26 changes: 23 additions & 3 deletions lib/jellyfish/component/hls.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ defmodule Jellyfish.Component.HLS do
@behaviour Jellyfish.Endpoint.Config

alias Membrane.RTC.Engine.Endpoint.HLS
alias Membrane.RTC.Engine.Endpoint.HLS.HLSConfig
alias Membrane.RTC.Engine.Endpoint.HLS.{CompositorConfig, HLSConfig, MixerConfig}
alias Membrane.Time

@segment_duration Time.seconds(4)
@partial_segment_duration Time.milliseconds(400)

@impl true
def config(options) do
Expand All @@ -18,8 +22,24 @@ defmodule Jellyfish.Component.HLS do
rtc_engine: options.engine_pid,
owner: self(),
output_directory: output_dir,
mixer_config: nil,
hls_config: %HLSConfig{}
mixer_config: %MixerConfig{
video: %CompositorConfig{
stream_format: %Membrane.RawVideo{
width: 1920,
height: 1080,
pixel_format: :I420,
framerate: {24, 1},
aligned: true
}
}
},
hls_config: %HLSConfig{
hls_mode: :muxed_av,
mode: :live,
target_window_duration: :infinity,
segment_duration: @segment_duration,
partial_segment_duration: @partial_segment_duration
}
}}
end
end
2 changes: 1 addition & 1 deletion lib/jellyfish/endpoint/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ defmodule Jellyfish.Endpoint.Config do
An interface for RTC Engine endpoint configuration.
"""

@callback config(map()) :: {:ok, Membrane.ChildrenSpec.child_definition_t()} | {:error, term()}
@callback config(map()) :: {:ok, Membrane.ChildrenSpec.child_definition()} | {:error, term()}
end
2 changes: 1 addition & 1 deletion lib/jellyfish/peer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ defmodule Jellyfish.Peer do
type: peer(),
status: status(),
socket_pid: pid() | nil,
engine_endpoint: Membrane.ChildrenSpec.child_definition_t()
engine_endpoint: Membrane.ChildrenSpec.child_definition()
}

@spec parse_type(String.t()) :: {:ok, peer()} | {:error, :invalid_type}
Expand Down
37 changes: 37 additions & 0 deletions lib/jellyfish/peer/webrtc.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Jellyfish.Peer.WebRTC do
alias Membrane.RTC.Engine.Endpoint.WebRTC
alias Membrane.RTC.Engine.Endpoint.WebRTC.SimulcastConfig
alias Membrane.WebRTC.Extension.{Mid, RepairedRid, Rid, TWCC}
alias Membrane.WebRTC.Track.Encoding

@impl true
def config(options) do
Expand All @@ -26,6 +27,18 @@ defmodule Jellyfish.Peer.WebRTC do
webrtc_extensions = [Mid, Rid, RepairedRid, TWCC]
network_options = options.network_options

filter_codecs =
case options.enforce_encoding do
:h264 ->
&filter_codecs_h264/1

:vp8 ->
&filter_codecs_vp8/1

nil ->
&any_codecs/1
end

{:ok,
%WebRTC{
rtc_engine: options.engine_pid,
Expand All @@ -34,6 +47,7 @@ defmodule Jellyfish.Peer.WebRTC do
integrated_turn_options: network_options[:integrated_turn_options],
integrated_turn_domain: network_options[:integrated_turn_domain],
handshake_opts: handshake_options,
filter_codecs: filter_codecs,
log_metadata: [peer_id: options.peer_id],
trace_context: nil,
webrtc_extensions: webrtc_extensions,
Expand All @@ -43,4 +57,27 @@ defmodule Jellyfish.Peer.WebRTC do
}
}}
end

defp filter_codecs_h264(%Encoding{name: "H264", format_params: fmtp}) do
import Bitwise

# Only accept constrained baseline
# based on RFC 6184, Table 5.
case fmtp.profile_level_id >>> 16 do
0x42 -> (fmtp.profile_level_id &&& 0x00_4F_00) == 0x00_40_00
0x4D -> (fmtp.profile_level_id &&& 0x00_8F_00) == 0x00_80_00
0x58 -> (fmtp.profile_level_id &&& 0x00_CF_00) == 0x00_C0_00
_otherwise -> false
end
end

defp filter_codecs_h264(encoding), do: filter_codecs(encoding)

defp filter_codecs_vp8(%Encoding{name: "VP8"}), do: true
defp filter_codecs_vp8(encoding), do: filter_codecs(encoding)

defp any_codecs(_encoding), do: true

defp filter_codecs(%Encoding{name: "opus"}), do: true
defp filter_codecs(_rtp_mapping), do: false
end
31 changes: 22 additions & 9 deletions lib/jellyfish/room.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ defmodule Jellyfish.Room do

@type id :: String.t()
@type max_peers :: non_neg_integer() | nil
@type enforce_encoding :: :h264 | :vp8 | nil

@typedoc """
This module contains:
Expand All @@ -35,17 +36,24 @@ defmodule Jellyfish.Room do
"""
@type t :: %__MODULE__{
id: id(),
config: %{max_peers: max_peers(), simulcast?: boolean()},
config: %{
max_peers: max_peers(),
enforce_encoding: enforce_encoding(),
simulcast?: boolean()
},
components: %{Component.id() => Component.t()},
peers: %{Peer.id() => Peer.t()},
engine_pid: pid(),
network_options: map()
}

@spec start(max_peers()) :: {:ok, pid(), id()}
def start(max_peers) do
@spec start(max_peers(), enforce_encoding()) :: {:ok, pid(), id()}
def start(max_peers, enforce_encoding) do
id = UUID.uuid4()
{:ok, pid} = GenServer.start(__MODULE__, [id, max_peers], name: registry_id(id))

{:ok, pid} =
GenServer.start(__MODULE__, [id, max_peers, enforce_encoding], name: registry_id(id))

{:ok, pid, id}
end

Expand Down Expand Up @@ -103,8 +111,8 @@ defmodule Jellyfish.Room do
end

@impl true
def init([id, max_peers]) do
state = new(id, max_peers)
def init([id, max_peers, enforce_encoding]) do
state = new(id, max_peers, enforce_encoding)
Logger.metadata(room_id: id)
Logger.info("Initialize room")

Expand All @@ -122,7 +130,12 @@ defmodule Jellyfish.Room do
if Enum.count(state.peers) == state.config.max_peers do
{{:error, :reached_peers_limit}, state}
else
options = %{engine_pid: state.engine_pid, network_options: state.network_options}
options = %{
engine_pid: state.engine_pid,
network_options: state.network_options,
enforce_encoding: state.config.enforce_encoding
}

peer = Peer.new(peer_type, options)
state = put_in(state, [:peers, peer.id], peer)

Expand Down Expand Up @@ -306,7 +319,7 @@ defmodule Jellyfish.Room do
{:noreply, state}
end

defp new(id, max_peers) do
defp new(id, max_peers, enforce_encoding) do
rtc_engine_options = [
id: id
]
Expand Down Expand Up @@ -337,7 +350,7 @@ defmodule Jellyfish.Room do

%__MODULE__{
id: id,
config: %{max_peers: max_peers},
config: %{max_peers: max_peers, enforce_encoding: enforce_encoding},
engine_pid: pid,
network_options: [integrated_turn_options: integrated_turn_options]
}
Expand Down
50 changes: 34 additions & 16 deletions lib/jellyfish/room_service.ex
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@ defmodule Jellyfish.RoomService do
|> Enum.reject(&(&1 == nil))
end

@spec create_room(Room.max_peers()) :: {:ok, Room.t()} | {:error, :bad_arg}
def create_room(max_peers) do
GenServer.call(__MODULE__, {:create_room, max_peers})
@spec create_room(Room.max_peers(), Room.enforce_encoding()) ::
{:ok, Room.t()} | {:error, :invalid_max_peers | :invalid_enforce_encoding}
def create_room(max_peers, enforce_encoding) do
GenServer.call(__MODULE__, {:create_room, max_peers, enforce_encoding})
end

@spec delete_room(Room.id()) :: :ok | {:error, :room_not_found}
Expand All @@ -70,24 +71,32 @@ defmodule Jellyfish.RoomService do
end

@impl true
def handle_call({:create_room, max_peers}, _from, state)
when is_nil(max_peers) or (is_integer(max_peers) and max_peers >= 0) do
{:ok, room_pid, room_id} = Room.start(max_peers)
room = Room.get_state(room_id)
Process.monitor(room_pid)
def handle_call({:create_room, max_peers, enforce_encoding}, _from, state) do
with :ok <- validate_max_peers(max_peers),
{:ok, enforce_encoding} <- encoding_to_atom(enforce_encoding),
{:ok, room_pid, room_id} <- Room.start(max_peers, enforce_encoding) do
room = Room.get_state(room_id)
Process.monitor(room_pid)

state = put_in(state, [:rooms, room_pid], room_id)
state = put_in(state, [:rooms, room_pid], room_id)

Logger.info("Created room #{inspect(room.id)}")
Logger.info("Created room #{inspect(room.id)}")

Phoenix.PubSub.broadcast(Jellyfish.PubSub, "server_notification", {:room_created, room_id})
Phoenix.PubSub.broadcast(
Jellyfish.PubSub,
"server_notification",
{:room_created, room_id}
)

{:reply, {:ok, room}, state}
end
{:reply, {:ok, room}, state}
else
{:error, :max_peers} ->
{:reply, {:error, :invalid_max_peers}, state}

@impl true
def handle_call({:create_room, _max_peers}, _from, state),
do: {:reply, {:error, :bad_arg}, state}
{:error, :enforce_encoding} ->
{:reply, {:error, :invalid_enforce_encoding}, state}
end
end

@impl true
def handle_call({:delete_room, room_id}, _from, state) do
Expand Down Expand Up @@ -140,4 +149,13 @@ defmodule Jellyfish.RoomService do
Logger.warn("Room process with id #{inspect(room_id)} doesn't exist")
end
end

defp validate_max_peers(nil), do: :ok
defp validate_max_peers(max_peers) when is_integer(max_peers) and max_peers >= 0, do: :ok
defp validate_max_peers(_max_peers), do: {:error, :max_peers}

defp encoding_to_atom("h264"), do: {:ok, :h264}
defp encoding_to_atom("vp8"), do: {:ok, :vp8}
defp encoding_to_atom(nil), do: {:ok, nil}
defp encoding_to_atom(_encoding), do: {:error, :enforce_encoding}
end
6 changes: 6 additions & 0 deletions lib/jellyfish_web/api_spec/room.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ defmodule JellyfishWeb.ApiSpec.Room do
example: 10,
description: "Maximum amount of peers allowed into the room",
nullable: true
},
enforceEncoding: %Schema{
description: "Enforces video codec for each peer in the room",
type: :string,
enum: ["h264", "vp8"],
nullable: true
}
}
})
Expand Down
9 changes: 7 additions & 2 deletions lib/jellyfish_web/controllers/room_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,18 @@ defmodule JellyfishWeb.RoomController do

def create(conn, params) do
with max_peers <- Map.get(params, "maxPeers"),
{:ok, room} <- RoomService.create_room(max_peers) do
enforce_encoding <- Map.get(params, "enforceEncoding"),
{:ok, room} <- RoomService.create_room(max_peers, enforce_encoding) do
conn
|> put_resp_content_type("application/json")
|> put_status(:created)
|> render("show.json", room: room)
else
{:error, :bad_arg} -> {:error, :bad_request, "maxPeers must be a number"}
{:error, :invalid_max_peers} ->
{:error, :bad_request, "maxPeers must be a number"}

{:error, :invalid_enforce_encoding} ->
{:error, :bad_request, "enforceEncoding must be 'h264' or 'vp8'"}
end
end

Expand Down
9 changes: 8 additions & 1 deletion lib/jellyfish_web/server_socket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,10 @@ defmodule JellyfishWeb.ServerSocket do
}
)

config = struct!(RoomState.Config, room.config)
config =
room.config
|> Map.update!(:enforce_encoding, &to_proto_encoding/1)
|> then(&struct!(RoomState.Config, &1))

%RoomState{id: room.id, config: config, peers: peers, components: components}
end
Expand All @@ -238,6 +241,10 @@ defmodule JellyfishWeb.ServerSocket do
defp to_proto_type(Jellyfish.Component.RTSP), do: :TYPE_RTSP
defp to_proto_type(Jellyfish.Peer.WebRTC), do: :TYPE_WEBRTC

defp to_proto_encoding(:h264), do: :ENCODING_H264
defp to_proto_encoding(:vp8), do: :ENCODING_VP8
defp to_proto_encoding(nil), do: :ENCODING_UNSPECIFIED

defp to_proto_status(:disconnected), do: :STATUS_DISCONNECTED
defp to_proto_status(:connected), do: :STATUS_CONNECTED
end
15 changes: 15 additions & 0 deletions lib/protos/jellyfish/server_notifications.pb.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ defmodule Jellyfish.ServerMessage.SubscribeRequest.ServerNotification.Option do
field :OPTION_ALL, 1
end

defmodule Jellyfish.ServerMessage.SubscriptionResponse.RoomState.Config.Encoding do
@moduledoc false

use Protobuf, enum: true, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

field :ENCODING_UNSPECIFIED, 0
field :ENCODING_H264, 1
field :ENCODING_VP8, 2
end

defmodule Jellyfish.ServerMessage.SubscriptionResponse.RoomState.Peer.Type do
@moduledoc false

Expand Down Expand Up @@ -130,6 +140,11 @@ defmodule Jellyfish.ServerMessage.SubscriptionResponse.RoomState.Config do
use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

field :max_peers, 1, type: :uint32, json_name: "maxPeers"

field :enforce_encoding, 2,
type: Jellyfish.ServerMessage.SubscriptionResponse.RoomState.Config.Encoding,
json_name: "enforceEncoding",
enum: true
end

defmodule Jellyfish.ServerMessage.SubscriptionResponse.RoomState.Peer do
Expand Down
Loading

0 comments on commit 70f2ce4

Please sign in to comment.