Skip to content

Commit

Permalink
Unique named partials (#81)
Browse files Browse the repository at this point in the history
* Ets: set read_concurrency to true

* Remove from partials byte ranges and replace them with unique names

* Block preload hint request until partial get available

* Fix tests

* Change requests

* Add test for preload hints

* Fix setting playlist_playable

* Update dependencies

* Update dependencies
  • Loading branch information
Karolk99 committed Sep 11, 2023
1 parent 48cb32c commit 494e514
Show file tree
Hide file tree
Showing 12 changed files with 199 additions and 121 deletions.
2 changes: 2 additions & 0 deletions lib/jellyfish/component.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ defmodule Jellyfish.Component do
and publishes the appropriate track to other Components.
"""

use Bunch.Access

alias Jellyfish.Component.{HLS, RTSP}

@callback metadata() :: map()
Expand Down
30 changes: 15 additions & 15 deletions lib/jellyfish/component/hls/ets_helper.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@ defmodule Jellyfish.Component.HLS.EtsHelper do
else
# Ets is public because ll-storage can't delete table.
# If we change that storage can be protected
table = :ets.new(:hls_storage, [:public])
#
# Read concurrency can cause performance degradation when the common access pattern
# is a few read operations interleaved with a few write operations repeatedly.
# When used on a larger scale it should be carefully tested
table = :ets.new(:hls_storage, [:public, read_concurrency: true])

:ets.insert(@rooms_to_tables, {room_id, table})
{:ok, table}
end
Expand Down Expand Up @@ -67,27 +72,24 @@ defmodule Jellyfish.Component.HLS.EtsHelper do
:ets.insert(table, {@delta_recent_partial_key, partial})
end

@spec add_partial(:ets.table(), binary(), String.t(), non_neg_integer()) :: true
def add_partial(table, partial, filename, offset) do
key = generate_partial_key(filename, offset)
:ets.insert(table, {key, partial})
@spec add_partial(:ets.table(), binary(), String.t()) :: true
def add_partial(table, partial, filename) do
:ets.insert(table, {filename, partial})
end

@spec delete_partial(:ets.table(), String.t(), non_neg_integer()) :: true
def delete_partial(table, filename, offset) do
key = generate_partial_key(filename, offset)
:ets.delete(table, key)
@spec delete_partial(:ets.table(), String.t()) :: true
def delete_partial(table, filename) do
:ets.delete(table, filename)
end

###
### ETS GETTERS
###

@spec get_partial(Room.id(), String.t(), non_neg_integer()) ::
@spec get_partial(Room.id(), String.t()) ::
{:ok, binary()} | {:error, atom()}
def get_partial(room_id, filename, offset) do
key = generate_partial_key(filename, offset)
get_from_ets(room_id, key)
def get_partial(room_id, filename) do
get_from_ets(room_id, filename)
end

@spec get_recent_partial(Room.id()) ::
Expand Down Expand Up @@ -140,6 +142,4 @@ defmodule Jellyfish.Component.HLS.EtsHelper do
defp room_exists?(room_id) do
:ets.lookup(@rooms_to_tables, room_id) != []
end

defp generate_partial_key(filename, offset), do: "#{filename}_#{offset}"
end
21 changes: 10 additions & 11 deletions lib/jellyfish/component/hls/ll_storage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,17 @@ defmodule Jellyfish.Component.HLS.LLStorage do
end

defp store_partial_segment(
filename,
segment_name,
content,
%{byte_offset: offset, sequence_number: sequence_number},
%{sequence_number: sequence_number, partial_name: partial_name},
%__MODULE__{directory: directory} = state
) do
result = write_to_file(directory, filename, content, [:binary, :append])
result = write_to_file(directory, segment_name, content, [:binary, :append])

state =
state
|> update_sequence_numbers(sequence_number)
|> add_partial_to_ets(filename, offset, content)
|> add_partial_to_ets(partial_name, content)

{result, state}
end
Expand Down Expand Up @@ -119,26 +119,25 @@ defmodule Jellyfish.Component.HLS.LLStorage do
segment_sn: segment_sn,
partial_sn: partial_sn
} = state,
filename,
offset,
partial_name,
content
) do
EtsHelper.add_partial(table, content, filename, offset)
EtsHelper.add_partial(table, content, partial_name)

partial = {segment_sn, partial_sn}
%{state | partials_in_ets: [{partial, {filename, offset}} | partials_in_ets]}
%{state | partials_in_ets: [{partial, partial_name} | partials_in_ets]}
end

defp remove_partials_from_ets(
%{partials_in_ets: partials_in_ets, segment_sn: curr_segment_sn, table: table} = state
) do
{partials_in_ets, partial_to_be_removed} =
Enum.split_with(partials_in_ets, fn {{segment_sn, _partial_sn}, _partial} ->
Enum.split_with(partials_in_ets, fn {{segment_sn, _partial_sn}, _partial_name} ->
segment_sn + @ets_cached_duration_in_segments > curr_segment_sn
end)

Enum.each(partial_to_be_removed, fn {_sn, {filename, offset}} ->
EtsHelper.delete_partial(table, filename, offset)
Enum.each(partial_to_be_removed, fn {_sn, partial_name} ->
EtsHelper.delete_partial(table, partial_name)
end)

%{state | partials_in_ets: partials_in_ets}
Expand Down
100 changes: 93 additions & 7 deletions lib/jellyfish/component/hls/request_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ defmodule Jellyfish.Component.HLS.RequestHandler do
@enforce_keys [:room_id, :room_pid]
defstruct @enforce_keys ++
[
preload_hints: [],
manifest: %{waiting_pids: %{}, last_partial: nil},
delta_manifest: %{waiting_pids: %{}, last_partial: nil}
]
Expand All @@ -24,7 +25,8 @@ defmodule Jellyfish.Component.HLS.RequestHandler do
room_id: Room.id(),
room_pid: pid(),
manifest: status(),
delta_manifest: status()
delta_manifest: status(),
preload_hints: [pid()]
}

###
Expand All @@ -45,10 +47,24 @@ defmodule Jellyfish.Component.HLS.RequestHandler do
@doc """
Handles ll-hls partial requests
"""
@spec handle_partial_request(Room.id(), String.t(), non_neg_integer()) ::
@spec handle_partial_request(Room.id(), String.t()) ::
{:ok, binary()} | {:error, atom()}
def handle_partial_request(room_id, filename, offset) do
EtsHelper.get_partial(room_id, filename, offset)
def handle_partial_request(room_id, filename) do
with {:ok, partial} <- EtsHelper.get_partial(room_id, filename) do
{:ok, partial}
else
{:error, :file_not_found} ->
case is_preload_hint(room_id, filename) do
{:ok, true} ->
wait_for_partial_ready(room_id, filename)

_other ->
{:error, :file_not_found}
end

error ->
error
end
end

@doc """
Expand Down Expand Up @@ -117,9 +133,17 @@ defmodule Jellyfish.Component.HLS.RequestHandler do
end

@impl true
def handle_cast({:update_recent_partial, last_partial, manifest}, state) do
def handle_cast(
{:update_recent_partial, last_partial, manifest},
%{preload_hints: preload_hints} = state
) do
status = Map.fetch!(state, manifest)
state = Map.put(state, manifest, update_status(status, last_partial))

state =
state
|> Map.put(manifest, update_and_notify_manifest_ready(status, last_partial))
|> Map.put(:preload_hints, update_and_notify_preload_hint_ready(preload_hints))

{:noreply, state}
end

Expand All @@ -134,6 +158,17 @@ defmodule Jellyfish.Component.HLS.RequestHandler do
{:noreply, state}
end

@impl true
def handle_cast({:preload_hint, room_id, filename, from}, state) do
with {:ok, _partial} <- EtsHelper.get_partial(room_id, filename) do
send(from, :preload_hint_ready)
{:noreply, state}
else
{:error, _reason} ->
{:noreply, %{state | preload_hints: [from | state.preload_hints]}}
end
end

@impl true
def handle_cast(:shutdown, state) do
{:stop, :normal, state}
Expand Down Expand Up @@ -162,13 +197,28 @@ defmodule Jellyfish.Component.HLS.RequestHandler do
end
end

defp update_status(status, last_partial) do
defp wait_for_partial_ready(room_id, filename) do
GenServer.cast(registry_id(room_id), {:preload_hint, room_id, filename, self()})

receive do
:preload_hint_ready ->
EtsHelper.get_partial(room_id, filename)
end
end

defp update_and_notify_preload_hint_ready(preload_hints) do
send_preload_hint_ready(preload_hints)
[]
end

defp update_and_notify_manifest_ready(status, last_partial) do
{waiting_pids, status} =
status
|> Map.put(:last_partial, last_partial)
|> pop_in([:waiting_pids, last_partial])

send_partial_ready(waiting_pids)

status
end

Expand All @@ -186,6 +236,38 @@ defmodule Jellyfish.Component.HLS.RequestHandler do
end
end

defp is_preload_hint(room_id, filename) do
partial_sn = get_partial_sn(filename)

with {:ok, recent_partial_sn} <- EtsHelper.get_recent_partial(room_id) do
{:ok, check_if_preload_hint(partial_sn, recent_partial_sn)}
end
end

defp check_if_preload_hint({segment_sn, partial_sn}, {recent_segment_sn, recent_partial_sn}) do
cond do
segment_sn - recent_segment_sn == 1 and partial_sn == 0 -> true
segment_sn == recent_segment_sn and (partial_sn - recent_partial_sn) in [0, 1] -> true
true -> false
end
end

defp check_if_preload_hint(_partial_sn, _recent_partial_sn) do
require Logger

Logger.warning("Unable to parse partial segment filename")
false
end

# Filename example: muxed_segment_32_g2QABXZpZGVv_5_part.m4s
defp get_partial_sn(filename) do
filename
|> String.split("_")
|> Enum.filter(fn s -> match?({_integer, ""}, Integer.parse(s)) end)
|> Enum.map(fn sn -> String.to_integer(sn) end)
|> List.to_tuple()
end

defp registry_id(room_id), do: {:via, Registry, {Jellyfish.RequestHandlerRegistry, room_id}}

defp send_partial_ready(nil), do: nil
Expand All @@ -194,6 +276,10 @@ defmodule Jellyfish.Component.HLS.RequestHandler do
Enum.each(waiting_pids, fn pid -> send(pid, :manifest_ready) end)
end

defp send_preload_hint_ready(waiting_pids) do
Enum.each(waiting_pids, fn pid -> send(pid, :preload_hint_ready) end)
end

defp is_partial_ready(_partial, nil) do
false
end
Expand Down
2 changes: 1 addition & 1 deletion lib/jellyfish/room.ex
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ defmodule Jellyfish.Room do

Event.broadcast(:server_notification, {:hls_playable, state.id, endpoint_id})

state = update_in(state, [:components, endpoint_id], &Map.put(&1, :playable, true))
state = update_in(state, [:components, endpoint_id, :metadata], &Map.put(&1, :playable, true))
{:noreply, state}
end

Expand Down
26 changes: 2 additions & 24 deletions lib/jellyfish_web/controllers/hls_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,9 @@ defmodule JellyfishWeb.HLSController do
end

def index(conn, %{"room_id" => room_id, "filename" => filename}) do
offset =
conn
|> get_req_header("range")
|> get_offset()

result =
if String.ends_with?(filename, ".m4s") and offset != :undefined do
RequestHandler.handle_partial_request(room_id, filename, offset)
if String.ends_with?(filename, "_part.m4s") do
RequestHandler.handle_partial_request(room_id, filename)
else
RequestHandler.handle_file_request(room_id, filename)
end
Expand All @@ -102,21 +97,4 @@ defmodule JellyfishWeb.HLSController do
{:error, :not_found, "File not found"}
end
end

# Every partial request comes with a byte range which represents where specifically in the file partial is located.
# Example: "bytes=100-200" 100-200, represents the scope in which partial is located in the file.
defp get_offset(raw_range) do
case raw_range do
[] ->
:undefined

[raw_range] ->
"bytes=" <> range = raw_range

range
|> String.split("-")
|> Enum.map(&String.to_integer(&1))
|> List.first()
end
end
end
12 changes: 8 additions & 4 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,14 @@ defmodule Jellyfish.MixProject do
{:protobuf, "~> 0.12.0"},

# Membrane deps
{:membrane_rtc_engine, "~> 0.17.0"},
{:membrane_rtc_engine_webrtc, "~> 0.2.1"},
{:membrane_rtc_engine_hls, "~> 0.2.0"},
{:membrane_rtc_engine_rtsp, "~> 0.2.0"},
{:membrane_rtc_engine,
github: "jellyfish-dev/membrane_rtc_engine", sparse: "engine", override: true},
{:membrane_rtc_engine_webrtc,
github: "jellyfish-dev/membrane_rtc_engine", sparse: "webrtc", override: true},
{:membrane_rtc_engine_hls,
github: "jellyfish-dev/membrane_rtc_engine", sparse: "hls", override: true},
{:membrane_rtc_engine_rtsp,
github: "jellyfish-dev/membrane_rtc_engine", sparse: "rtsp", override: true},
{:membrane_ice_plugin, "~> 0.16.0"},
{:membrane_telemetry_metrics, "~> 0.1.0"},

Expand Down
Loading

0 comments on commit 494e514

Please sign in to comment.