From dd7b2839767d4142730a3c29d51d09a2bd3ba3b3 Mon Sep 17 00:00:00 2001 From: anders-avos Date: Sat, 7 Sep 2024 18:13:03 +0200 Subject: [PATCH] Configuration of outgoing RTX streams (#607) * interceptor: Add AssociatedStreamInfo and populate for rtx streams * rtp_sender: Create outgoing rtx streams * sdp: Assign repair stream regardless of attribute order --- interceptor/src/stream_info.rs | 9 + webrtc/src/api/mod.rs | 2 +- webrtc/src/api/setting_engine/mod.rs | 8 + webrtc/src/peer_connection/mod.rs | 2 +- .../peer_connection_internal.rs | 19 +- webrtc/src/peer_connection/sdp/mod.rs | 34 +++- webrtc/src/peer_connection/sdp/sdp_test.rs | 166 ++++++++++++++++- webrtc/src/rtp_transceiver/mod.rs | 4 +- webrtc/src/rtp_transceiver/rtp_codec.rs | 28 +++ .../src/rtp_transceiver/rtp_receiver/mod.rs | 26 ++- .../rtp_receiver/rtp_receiver_test.rs | 2 +- webrtc/src/rtp_transceiver/rtp_sender/mod.rs | 134 +++++++++++++- .../rtp_sender/rtp_sender_test.rs | 173 ++++++++++++++++++ 13 files changed, 569 insertions(+), 38 deletions(-) diff --git a/interceptor/src/stream_info.rs b/interceptor/src/stream_info.rs index 5e9f93d5e..e17693c95 100644 --- a/interceptor/src/stream_info.rs +++ b/interceptor/src/stream_info.rs @@ -20,6 +20,15 @@ pub struct StreamInfo { pub channels: u16, pub sdp_fmtp_line: String, pub rtcp_feedback: Vec, + pub associated_stream: Option, +} + +/// AssociatedStreamInfo provides a mapping from an auxiliary stream (RTX, FEC, +/// etc.) back to the original stream. +#[derive(Default, Debug, Clone)] +pub struct AssociatedStreamInfo { + pub ssrc: u32, + pub payload_type: u8, } /// RTCPFeedback signals the connection to use additional RTCP packet types. diff --git a/webrtc/src/api/mod.rs b/webrtc/src/api/mod.rs index 2252dfbb2..e999d022e 100644 --- a/webrtc/src/api/mod.rs +++ b/webrtc/src/api/mod.rs @@ -159,11 +159,11 @@ impl API { ) -> RTCRtpSender { let kind = track.as_ref().map(|t| t.kind()).unwrap_or_default(); RTCRtpSender::new( - self.setting_engine.get_receive_mtu(), track, kind, transport, Arc::clone(&self.media_engine), + Arc::clone(&self.setting_engine), interceptor, false, ) diff --git a/webrtc/src/api/setting_engine/mod.rs b/webrtc/src/api/setting_engine/mod.rs index 3387d9d6e..17170a706 100644 --- a/webrtc/src/api/setting_engine/mod.rs +++ b/webrtc/src/api/setting_engine/mod.rs @@ -78,6 +78,7 @@ pub struct SettingEngine { pub(crate) srtp_protection_profiles: Vec, pub(crate) receive_mtu: usize, pub(crate) mid_generator: Option String + Send + Sync>>, + pub(crate) enable_sender_rtx: bool, } impl SettingEngine { @@ -334,4 +335,11 @@ impl SettingEngine { pub fn set_mid_generator(&mut self, f: impl Fn(isize) -> String + Send + Sync + 'static) { self.mid_generator = Some(Arc::new(f)); } + + /// enable_sender_rtx allows outgoing rtx streams to be created where applicable. + /// RTPSender will create an RTP retransmission stream for each source stream where a retransmission + /// codec is configured. + pub fn enable_sender_rtx(&mut self, is_enabled: bool) { + self.enable_sender_rtx = is_enabled; + } } diff --git a/webrtc/src/peer_connection/mod.rs b/webrtc/src/peer_connection/mod.rs index 9f99e18b2..222e053e1 100644 --- a/webrtc/src/peer_connection/mod.rs +++ b/webrtc/src/peer_connection/mod.rs @@ -1415,11 +1415,11 @@ impl RTCPeerConnection { let sender = Arc::new( RTCRtpSender::new( - receive_mtu, None, kind, Arc::clone(&self.internal.dtls_transport), Arc::clone(&self.internal.media_engine), + Arc::clone(&self.internal.setting_engine), Arc::clone(&self.interceptor), false, ) diff --git a/webrtc/src/peer_connection/peer_connection_internal.rs b/webrtc/src/peer_connection/peer_connection_internal.rs index 24e07b1bd..032cfde24 100644 --- a/webrtc/src/peer_connection/peer_connection_internal.rs +++ b/webrtc/src/peer_connection/peer_connection_internal.rs @@ -528,11 +528,11 @@ impl PeerConnectionInternal { let sender = Arc::new( RTCRtpSender::new( - self.setting_engine.get_receive_mtu(), None, kind, Arc::clone(&self.dtls_transport), Arc::clone(&self.media_engine), + Arc::clone(&self.setting_engine), interceptor, false, ) @@ -582,11 +582,11 @@ impl PeerConnectionInternal { let s = Arc::new( RTCRtpSender::new( - self.setting_engine.get_receive_mtu(), Some(Arc::clone(&track)), track.kind(), Arc::clone(&self.dtls_transport), Arc::clone(&self.media_engine), + Arc::clone(&self.setting_engine), Arc::clone(&interceptor), false, ) @@ -1080,6 +1080,7 @@ impl PeerConnectionInternal { params.codecs[0].payload_type, params.codecs[0].capability.clone(), ¶ms.header_extensions, + None, ); let (rtp_read_stream, rtp_interceptor, rtcp_read_stream, rtcp_interceptor) = self .dtls_transport @@ -1386,7 +1387,7 @@ impl PeerConnectionInternal { let sender = transceiver.sender().await; let track_encodings = sender.track_encodings.lock().await; for encoding in track_encodings.iter() { - let track_id = encoding.track.id().to_string(); + let track_id = encoding.track.id(); let kind = match encoding.track.kind() { RTPCodecType::Unspecified => continue, RTPCodecType::Audio => "audio", @@ -1394,12 +1395,22 @@ impl PeerConnectionInternal { }; track_infos.push(TrackInfo { - track_id, + track_id: track_id.to_owned(), ssrc: encoding.ssrc, mid: mid.to_owned(), rid: encoding.track.rid().map(Into::into), kind, }); + + if let Some(rtx) = &encoding.rtx { + track_infos.push(TrackInfo { + track_id: track_id.to_owned(), + ssrc: rtx.ssrc, + mid: mid.to_owned(), + rid: encoding.track.rid().map(Into::into), + kind, + }); + } } } diff --git a/webrtc/src/peer_connection/sdp/mod.rs b/webrtc/src/peer_connection/sdp/mod.rs index 34983cc59..1ee813415 100644 --- a/webrtc/src/peer_connection/sdp/mod.rs +++ b/webrtc/src/peer_connection/sdp/mod.rs @@ -178,21 +178,12 @@ pub(crate) fn track_details_from_sdp( } } - let mut repair_ssrc = 0; - for (repair, base) in &rtx_repair_flows { - if *base == ssrc { - repair_ssrc = *repair; - //TODO: no break? - } - } - if track_idx < tracks_in_media_section.len() { tracks_in_media_section[track_idx].mid = SmolStr::from(mid_value); tracks_in_media_section[track_idx].kind = codec_type; stream_id.clone_into(&mut tracks_in_media_section[track_idx].stream_id); track_id.clone_into(&mut tracks_in_media_section[track_idx].id); tracks_in_media_section[track_idx].ssrcs = vec![ssrc]; - tracks_in_media_section[track_idx].repair_ssrc = repair_ssrc; } else { let track_details = TrackDetails { mid: SmolStr::from(mid_value), @@ -200,7 +191,6 @@ pub(crate) fn track_details_from_sdp( stream_id: stream_id.to_owned(), id: track_id.to_owned(), ssrcs: vec![ssrc], - repair_ssrc, ..Default::default() }; tracks_in_media_section.push(track_details); @@ -210,6 +200,13 @@ pub(crate) fn track_details_from_sdp( _ => {} }; } + for (repair, base) in &rtx_repair_flows { + for track in &mut tracks_in_media_section { + if track.ssrcs.contains(base) { + track.repair_ssrc = *repair; + } + } + } // If media line is using RTP Stream Identifier Source Description per RFC8851 // we will need to override tracks, and remove ssrcs. @@ -595,6 +592,23 @@ pub(crate) async fn add_transceiver_sdp( track.stream_id().to_owned(), /* streamLabel */ track.id().to_owned(), ); + + if encoding.rtx.ssrc != 0 { + media = media.with_media_source( + encoding.rtx.ssrc, + track.stream_id().to_owned(), + track.stream_id().to_owned(), + track.id().to_owned(), + ); + + media = media.with_value_attribute( + ATTR_KEY_SSRCGROUP.to_owned(), + format!( + "{} {} {}", + SEMANTIC_TOKEN_FLOW_IDENTIFICATION, encoding.ssrc, encoding.rtx.ssrc + ), + ); + } } if send_parameters.encodings.len() > 1 { diff --git a/webrtc/src/peer_connection/sdp/sdp_test.rs b/webrtc/src/peer_connection/sdp/sdp_test.rs index 9fc3cf9f8..1bfd1571b 100644 --- a/webrtc/src/peer_connection/sdp/sdp_test.rs +++ b/webrtc/src/peer_connection/sdp/sdp_test.rs @@ -364,10 +364,6 @@ fn test_track_details_from_sdp() -> Result<()> { key: "sendrecv".to_owned(), value: None, }, - Attribute { - key: "ssrc-group".to_owned(), - value: Some("FID 3000 4000".to_owned()), - }, Attribute { key: "ssrc".to_owned(), value: Some("3000 msid:video_trk_label video_trk_guid".to_owned()), @@ -376,6 +372,10 @@ fn test_track_details_from_sdp() -> Result<()> { key: "ssrc".to_owned(), value: Some("4000 msid:rtx_trk_label rtx_trck_guid".to_owned()), }, + Attribute { + key: "ssrc-group".to_owned(), + value: Some("FID 3000 4000".to_owned()), + }, ], ..Default::default() }, @@ -441,6 +441,7 @@ fn test_track_details_from_sdp() -> Result<()> { assert_eq!(track.kind, RTPCodecType::Video); assert_eq!(track.ssrcs[0], 3000); assert_eq!(track.stream_id, "video_trk_label"); + assert_eq!(track.repair_ssrc, 4000); } else { panic!("missing video track with ssrc:3000"); } @@ -694,11 +695,11 @@ async fn test_media_description_fingerprints() -> Result<()> { media[i].transceivers[0] .set_sender(Arc::new( RTCRtpSender::new( - api.setting_engine.get_receive_mtu(), Some(track), RTPCodecType::Video, Arc::new(RTCDtlsTransport::default()), Arc::clone(&api.media_engine), + Arc::clone(&api.setting_engine), Arc::clone(&interceptor), false, ) @@ -1148,6 +1149,161 @@ async fn test_populate_sdp() -> Result<()> { assert_eq!(offer_sdp.attribute(ATTR_KEY_GROUP), None); } + // "Sender RTX" + { + let mut se = SettingEngine::default(); + se.enable_sender_rtx(true); + + let mut me = MediaEngine::default(); + me.register_default_codecs()?; + + me.register_codec( + RTCRtpCodecParameters { + capability: RTCRtpCodecCapability { + mime_type: "video/rtx".to_owned(), + clock_rate: 90000, + channels: 0, + sdp_fmtp_line: "apt=96".to_string(), + rtcp_feedback: vec![], + }, + payload_type: 97, + ..Default::default() + }, + RTPCodecType::Video, + )?; + + me.push_codecs(me.video_codecs.clone(), RTPCodecType::Video) + .await; + + let api = APIBuilder::new() + .with_media_engine(me) + .with_setting_engine(se.clone()) + .build(); + let interceptor = api.interceptor_registry.build("")?; + let transport = Arc::new(RTCDtlsTransport::default()); + let receiver = Arc::new(api.new_rtp_receiver( + RTPCodecType::Video, + Arc::clone(&transport), + Arc::clone(&interceptor), + )); + + let codec = RTCRtpCodecCapability { + mime_type: "video/vp8".to_owned(), + ..Default::default() + }; + + let track = Arc::new(TrackLocalStaticSample::new_with_rid( + codec.clone(), + "video".to_owned(), + "f".to_owned(), + "webrtc-rs".to_owned(), + )); + + let sender = Arc::new( + api.new_rtp_sender( + Some(track), + Arc::clone(&transport), + Arc::clone(&interceptor), + ) + .await, + ); + + sender + .add_encoding(Arc::new(TrackLocalStaticSample::new_with_rid( + codec.clone(), + "video".to_owned(), + "h".to_owned(), + "webrtc-rs".to_owned(), + ))) + .await?; + + let tr = RTCRtpTransceiver::new( + receiver, + sender, + RTCRtpTransceiverDirection::Sendonly, + RTPCodecType::Video, + api.media_engine.video_codecs.clone(), + Arc::clone(&api.media_engine), + None, + ) + .await; + + let media_sections = vec![MediaSection { + id: "video".to_owned(), + transceivers: vec![tr], + data: false, + ..Default::default() + }]; + + let d = SessionDescription::default(); + + let params = PopulateSdpParams { + media_description_fingerprint: se.sdp_media_level_fingerprints, + is_icelite: se.candidates.ice_lite, + extmap_allow_mixed: true, + connection_role: DEFAULT_DTLS_ROLE_OFFER.to_connection_role(), + ice_gathering_state: RTCIceGatheringState::Complete, + match_bundle_group: None, + }; + let offer_sdp = populate_sdp( + d, + &[], + &api.media_engine, + &[], + &RTCIceParameters::default(), + &media_sections, + params, + ) + .await?; + + // Test codecs and FID groups + let mut found_vp8 = false; + let mut found_rtx = false; + let mut found_ssrcs: Vec<&str> = Vec::new(); + let mut found_fids = Vec::new(); + for desc in &offer_sdp.media_descriptions { + if desc.media_name.media != "video" { + continue; + } + for a in &desc.attributes { + if a.key.contains("rtpmap") { + if let Some(value) = &a.value { + if value == "96 VP8/90000" { + found_vp8 = true; + } else if value == "97 rtx/90000" { + found_rtx = true; + } + } + } else if a.key == "ssrc" { + if let Some((ssrc, _)) = a.value.as_ref().and_then(|v| v.split_once(' ')) { + found_ssrcs.push(ssrc); + } + } else if a.key == "ssrc-group" { + if let Some(group) = a.value.as_ref().and_then(|v| v.strip_prefix("FID ")) { + let Some((a, b)) = group.split_once(" ") else { + panic!("invalid FID format in sdp") + }; + + found_fids.extend([a, b]); + } + } + } + } + + found_fids.sort(); + + found_ssrcs.sort(); + // the sdp may have multiple attributes for each ssrc + found_ssrcs.dedup(); + + assert!(found_vp8, "vp8 should be present in sdp"); + assert!(found_rtx, "rtx should be present in sdp"); + assert_eq!(found_ssrcs.len(), 4, "all ssrcs should be present in sdp"); + assert_eq!(found_fids.len(), 4, "all fids should be present in sdp"); + + assert_eq!(found_ssrcs, found_fids); + } + Ok(()) } diff --git a/webrtc/src/rtp_transceiver/mod.rs b/webrtc/src/rtp_transceiver/mod.rs index e634cc2d4..d15c09886 100644 --- a/webrtc/src/rtp_transceiver/mod.rs +++ b/webrtc/src/rtp_transceiver/mod.rs @@ -7,7 +7,7 @@ use std::pin::Pin; use std::sync::atomic::Ordering; use std::sync::Arc; -use interceptor::stream_info::{RTPHeaderExtension, StreamInfo}; +use interceptor::stream_info::{AssociatedStreamInfo, RTPHeaderExtension, StreamInfo}; use interceptor::Attributes; use log::trace; use portable_atomic::{AtomicBool, AtomicU8}; @@ -136,6 +136,7 @@ pub(crate) fn create_stream_info( payload_type: PayloadType, codec: RTCRtpCodecCapability, webrtc_header_extensions: &[RTCRtpHeaderExtensionParameters], + associated_stream: Option, ) -> StreamInfo { let header_extensions: Vec = webrtc_header_extensions .iter() @@ -165,6 +166,7 @@ pub(crate) fn create_stream_info( channels: codec.channels, sdp_fmtp_line: codec.sdp_fmtp_line, rtcp_feedback: feedbacks, + associated_stream, } } diff --git a/webrtc/src/rtp_transceiver/rtp_codec.rs b/webrtc/src/rtp_transceiver/rtp_codec.rs index 3bb3aac23..c0fb9e732 100644 --- a/webrtc/src/rtp_transceiver/rtp_codec.rs +++ b/webrtc/src/rtp_transceiver/rtp_codec.rs @@ -168,3 +168,31 @@ pub(crate) fn codec_parameters_fuzzy_search( (RTCRtpCodecParameters::default(), CodecMatch::None) } + +pub(crate) fn codec_rtx_search( + original_codec: &RTCRtpCodecParameters, + available_codecs: &[RTCRtpCodecParameters], +) -> Option { + // find the rtx codec as defined in RFC4588 + + let (mime_kind, _) = original_codec.capability.mime_type.split_once("/")?; + let rtx_mime = format!("{mime_kind}/rtx"); + + for codec in available_codecs { + if codec.capability.mime_type != rtx_mime { + continue; + } + + let params = fmtp::parse(&codec.capability.mime_type, &codec.capability.sdp_fmtp_line); + + if params + .parameter("apt") + .and_then(|v| v.parse::().ok()) + .is_some_and(|apt| apt == original_codec.payload_type) + { + return Some(codec.clone()); + } + } + + None +} diff --git a/webrtc/src/rtp_transceiver/rtp_receiver/mod.rs b/webrtc/src/rtp_transceiver/rtp_receiver/mod.rs index 475c1fd38..55ba74f45 100644 --- a/webrtc/src/rtp_transceiver/rtp_receiver/mod.rs +++ b/webrtc/src/rtp_transceiver/rtp_receiver/mod.rs @@ -5,7 +5,7 @@ use std::fmt; use std::sync::Arc; use arc_swap::ArcSwapOption; -use interceptor::stream_info::RTPHeaderExtension; +use interceptor::stream_info::{AssociatedStreamInfo, RTPHeaderExtension}; use interceptor::{Attributes, Interceptor}; use log::trace; use smol_str::SmolStr; @@ -16,12 +16,12 @@ use crate::dtls_transport::RTCDtlsTransport; use crate::error::{flatten_errs, Error, Result}; use crate::peer_connection::sdp::TrackDetails; use crate::rtp_transceiver::rtp_codec::{ - codec_parameters_fuzzy_search, CodecMatch, RTCRtpCodecCapability, RTCRtpCodecParameters, - RTCRtpParameters, RTPCodecType, + codec_parameters_fuzzy_search, CodecMatch, RTCRtpCodecParameters, RTCRtpParameters, + RTPCodecType, }; use crate::rtp_transceiver::rtp_transceiver_direction::RTCRtpTransceiverDirection; use crate::rtp_transceiver::{ - create_stream_info, RTCRtpDecodingParameters, RTCRtpReceiveParameters, SSRC, + codec_rtx_search, create_stream_info, RTCRtpDecodingParameters, RTCRtpReceiveParameters, SSRC, }; use crate::track::track_remote::TrackRemote; use crate::track::{TrackStream, TrackStreams}; @@ -519,9 +519,9 @@ impl RTCRtpReceiver { }; let codec = if let Some(codec) = global_params.codecs.first() { - codec.capability.clone() + codec.clone() } else { - RTCRtpCodecCapability::default() + RTCRtpCodecParameters::default() }; for encoding in ¶meters.encodings { @@ -531,8 +531,9 @@ impl RTCRtpReceiver { "".to_owned(), encoding.ssrc, 0, - codec.clone(), + codec.capability.clone(), &global_params.header_extensions, + None, ); let (rtp_read_stream, rtp_interceptor, rtcp_read_stream, rtcp_interceptor) = self.internal @@ -585,12 +586,21 @@ impl RTCRtpReceiver { let rtx_ssrc = encoding.rtx.ssrc; if rtx_ssrc != 0 { + let rtx_info = AssociatedStreamInfo { + ssrc: encoding.ssrc, + payload_type: 0, + }; + + let rtx_codec = + codec_rtx_search(&codec, &global_params.codecs).unwrap_or(codec.clone()); + let stream_info = create_stream_info( "".to_owned(), rtx_ssrc, 0, - codec.clone(), + rtx_codec.capability, &global_params.header_extensions, + Some(rtx_info), ); let (rtp_read_stream, rtp_interceptor, rtcp_read_stream, rtcp_interceptor) = self .internal diff --git a/webrtc/src/rtp_transceiver/rtp_receiver/rtp_receiver_test.rs b/webrtc/src/rtp_transceiver/rtp_receiver/rtp_receiver_test.rs index 7520667db..451b6e2cf 100644 --- a/webrtc/src/rtp_transceiver/rtp_receiver/rtp_receiver_test.rs +++ b/webrtc/src/rtp_transceiver/rtp_receiver/rtp_receiver_test.rs @@ -12,7 +12,7 @@ use crate::peer_connection::peer_connection_test::{ close_pair_now, create_vnet_pair, signal_pair, until_connection_state, }; use crate::rtp_transceiver::rtp_codec::RTCRtpHeaderExtensionParameters; -use crate::rtp_transceiver::RTCPFeedback; +use crate::rtp_transceiver::{RTCPFeedback, RTCRtpCodecCapability}; use crate::track::track_local::track_local_static_sample::TrackLocalStaticSample; use crate::track::track_local::TrackLocal; diff --git a/webrtc/src/rtp_transceiver/rtp_sender/mod.rs b/webrtc/src/rtp_transceiver/rtp_sender/mod.rs index 4de0a0bb9..9b8585bfe 100644 --- a/webrtc/src/rtp_transceiver/rtp_sender/mod.rs +++ b/webrtc/src/rtp_transceiver/rtp_sender/mod.rs @@ -5,17 +5,20 @@ use std::sync::atomic::Ordering; use std::sync::{Arc, Weak}; use ice::rand::generate_crypto_random_string; -use interceptor::stream_info::StreamInfo; +use interceptor::stream_info::{AssociatedStreamInfo, StreamInfo}; use interceptor::{Attributes, Interceptor, RTCPReader, RTPWriter}; use portable_atomic::AtomicBool; +use tokio::select; use tokio::sync::{watch, Mutex, Notify}; use util::sync::Mutex as SyncMutex; use super::srtp_writer_future::SequenceTransformer; +use super::RTCRtpRtxParameters; use crate::api::media_engine::MediaEngine; +use crate::api::setting_engine::SettingEngine; use crate::dtls_transport::RTCDtlsTransport; use crate::error::{Error, Result}; -use crate::rtp_transceiver::rtp_codec::RTPCodecType; +use crate::rtp_transceiver::rtp_codec::{codec_rtx_search, RTPCodecType}; use crate::rtp_transceiver::rtp_transceiver_direction::RTCRtpTransceiverDirection; use crate::rtp_transceiver::srtp_writer_future::SrtpWriterFuture; use crate::rtp_transceiver::{ @@ -39,6 +42,16 @@ pub(crate) struct TrackEncoding { pub(crate) context: Mutex, pub(crate) ssrc: SSRC, + + pub(crate) rtx: Option, +} + +pub(crate) struct RtxEncoding { + pub(crate) srtp_stream: Arc, + pub(crate) rtcp_interceptor: Arc, + pub(crate) stream_info: Mutex, + + pub(crate) ssrc: SSRC, } /// RTPSender allows an application to control how a given Track is encoded and transmitted to a remote peer @@ -54,12 +67,14 @@ pub struct RTCRtpSender { pub(crate) track_encodings: Mutex>, seq_trans: Arc, + rtx_seq_trans: Arc, pub(crate) transport: Arc, pub(crate) kind: RTPCodecType, pub(crate) payload_type: PayloadType, receive_mtu: usize, + enable_rtx: bool, /// a transceiver sender since we can just check the /// transceiver negotiation status @@ -97,11 +112,11 @@ impl std::fmt::Debug for RTCRtpSender { impl RTCRtpSender { pub async fn new( - receive_mtu: usize, track: Option>, kind: RTPCodecType, transport: Arc, media_engine: Arc, + setting_engine: Arc, interceptor: Arc, start_paused: bool, ) -> Self { @@ -120,6 +135,7 @@ impl RTCRtpSender { }); let seq_trans = Arc::new(SequenceTransformer::new()); + let rtx_seq_trans = Arc::new(SequenceTransformer::new()); let stream_ids = track .as_ref() @@ -129,12 +145,14 @@ impl RTCRtpSender { track_encodings: Mutex::new(vec![]), seq_trans, + rtx_seq_trans, transport, kind, payload_type: 0, - receive_mtu, + receive_mtu: setting_engine.get_receive_mtu(), + enable_rtx: setting_engine.enable_sender_rtx, negotiated: AtomicBool::new(false), @@ -222,6 +240,41 @@ impl RTCRtpSender { let srtp_rtcp_reader = Arc::clone(&srtp_stream) as Arc; let rtcp_interceptor = self.interceptor.bind_rtcp_reader(srtp_rtcp_reader).await; + let create_rtx_stream = self.enable_rtx + && self + .media_engine + .get_codecs_by_kind(track.kind()) + .iter() + .any(|codec| { + matches!(codec.capability.mime_type.split_once("/"), Some((_, "rtx"))) + }); + + let rtx = if create_rtx_stream { + let ssrc = rand::random::(); + + let srtp_stream = Arc::new(SrtpWriterFuture { + closed: AtomicBool::new(false), + ssrc, + rtp_sender: Arc::downgrade(&self.internal), + rtp_transport: Arc::clone(&self.transport), + rtcp_read_stream: Mutex::new(None), + rtp_write_session: Mutex::new(None), + seq_trans: Arc::clone(&self.rtx_seq_trans), + }); + + let srtp_rtcp_reader = Arc::clone(&srtp_stream) as Arc; + let rtcp_interceptor = self.interceptor.bind_rtcp_reader(srtp_rtcp_reader).await; + + Some(RtxEncoding { + srtp_stream, + rtcp_interceptor, + stream_info: Mutex::new(StreamInfo::default()), + ssrc, + }) + } else { + None + }; + let encoding = TrackEncoding { track, srtp_stream, @@ -229,6 +282,7 @@ impl RTCRtpSender { stream_info: Mutex::new(StreamInfo::default()), context: Mutex::new(TrackLocalContext::default()), ssrc, + rtx, }; track_encodings.push(encoding); @@ -273,7 +327,9 @@ impl RTCRtpSender { rid: e.track.rid().unwrap_or_default().into(), ssrc: e.ssrc, payload_type: self.payload_type, - ..Default::default() + rtx: RTCRtpRtxParameters { + ssrc: e.rtx.as_ref().map(|e| e.ssrc).unwrap_or_default(), + }, }); } @@ -340,6 +396,7 @@ impl RTCRtpSender { } self.seq_trans.reset_offset(); + self.rtx_seq_trans.reset_offset(); let mid = self .rtp_transceiver @@ -427,8 +484,9 @@ impl RTCRtpSender { codec.payload_type, codec.capability.clone(), ¶meters.rtp_parameters.header_extensions, + None, ); - context.params.codecs = vec![codec]; + context.params.codecs = vec![codec.clone()]; let srtp_writer = Arc::clone(&encoding.srtp_stream) as Arc; let rtp_writer = self @@ -439,12 +497,66 @@ impl RTCRtpSender { *encoding.context.lock().await = context; *encoding.stream_info.lock().await = stream_info; *write_stream.interceptor_rtp_writer.lock().await = Some(rtp_writer); + + if let (Some(rtx), Some(rtx_codec)) = ( + &encoding.rtx, + codec_rtx_search(&codec, ¶meters.rtp_parameters.codecs), + ) { + let rtx_info = AssociatedStreamInfo { + ssrc: parameters.encodings[idx].ssrc, + payload_type: codec.payload_type, + }; + + let rtx_stream_info = create_stream_info( + self.id.clone(), + parameters.encodings[idx].rtx.ssrc, + rtx_codec.payload_type, + rtx_codec.capability.clone(), + ¶meters.rtp_parameters.header_extensions, + Some(rtx_info), + ); + + let rtx_srtp_writer = + Arc::clone(&rtx.srtp_stream) as Arc; + // ignore the rtp writer, only interceptors can write to the stream + self.interceptor + .bind_local_stream(&rtx_stream_info, rtx_srtp_writer) + .await; + + *rtx.stream_info.lock().await = rtx_stream_info; + + self.receive_rtcp_for_rtx(rtx.rtcp_interceptor.clone()); + } } self.send_called.send_replace(true); Ok(()) } + /// starts a routine that reads the rtx rtcp stream + /// These packets aren't exposed to the user, but we need to process them + /// for TWCC + fn receive_rtcp_for_rtx(&self, rtcp_reader: Arc) { + let receive_mtu = self.receive_mtu; + let stop_called_signal = self.internal.stop_called_signal.clone(); + let stop_called_rx = self.internal.stop_called_rx.clone(); + + tokio::spawn(async move { + let attrs = Attributes::new(); + let mut b = vec![0u8; receive_mtu]; + while !stop_called_signal.load(Ordering::SeqCst) { + select! { + r = rtcp_reader.read(&mut b, &attrs) => { + if r.is_err() { + break + } + }, + _ = stop_called_rx.notified() => break, + } + } + }); + } + /// stop irreversibly stops the RTPSender pub async fn stop(&self) -> Result<()> { if self.stop_called_signal.load(Ordering::SeqCst) { @@ -465,6 +577,13 @@ impl RTCRtpSender { self.interceptor.unbind_local_stream(&stream_info).await; encoding.srtp_stream.close().await?; + + if let Some(rtx) = &encoding.rtx { + let rtx_stream_info = rtx.stream_info.lock().await; + self.interceptor.unbind_local_stream(&rtx_stream_info).await; + + rtx.srtp_stream.close().await?; + } } Ok(()) @@ -543,7 +662,8 @@ impl RTCRtpSender { /// Errors if this [`RTCRtpSender`] has started to send data or sequence /// transforming has been already enabled. pub fn enable_seq_transformer(&self) -> Result<()> { - self.seq_trans.enable() + self.seq_trans.enable()?; + self.rtx_seq_trans.enable() } /// Will asynchronously block/wait until send() has been called diff --git a/webrtc/src/rtp_transceiver/rtp_sender/rtp_sender_test.rs b/webrtc/src/rtp_transceiver/rtp_sender/rtp_sender_test.rs index cdae29c4f..a947e888c 100644 --- a/webrtc/src/rtp_transceiver/rtp_sender/rtp_sender_test.rs +++ b/webrtc/src/rtp_transceiver/rtp_sender/rtp_sender_test.rs @@ -1,4 +1,7 @@ +use async_trait::async_trait; use bytes::Bytes; +use interceptor::registry::Registry; +use interceptor::InterceptorBuilder; use portable_atomic::AtomicU64; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -623,3 +626,173 @@ async fn test_rtp_sender_add_encoding() -> Result<()> { close_pair_now(&sender, &receiver).await; Ok(()) } + +#[derive(Debug)] +enum TestInterceptorEvent { + BindLocal(StreamInfo), + BindRemote(StreamInfo), + UnbindLocal(StreamInfo), + UnbindRemote(StreamInfo), +} + +#[derive(Clone)] +struct TestInterceptor(mpsc::UnboundedSender); + +#[async_trait] +impl Interceptor for TestInterceptor { + async fn bind_rtcp_reader( + &self, + reader: Arc, + ) -> Arc { + reader + } + + async fn bind_rtcp_writer( + &self, + writer: Arc, + ) -> Arc { + writer + } + + async fn bind_local_stream( + &self, + info: &StreamInfo, + writer: Arc, + ) -> Arc { + let _ = self.0.send(TestInterceptorEvent::BindLocal(info.clone())); + writer + } + + async fn unbind_local_stream(&self, info: &StreamInfo) { + let _ = self.0.send(TestInterceptorEvent::UnbindLocal(info.clone())); + } + + async fn bind_remote_stream( + &self, + info: &StreamInfo, + reader: Arc, + ) -> Arc { + let _ = self.0.send(TestInterceptorEvent::BindRemote(info.clone())); + reader + } + + async fn unbind_remote_stream(&self, info: &StreamInfo) { + let _ = self + .0 + .send(TestInterceptorEvent::UnbindRemote(info.clone())); + } + + async fn close(&self) -> std::result::Result<(), interceptor::Error> { + Ok(()) + } +} + +impl InterceptorBuilder for TestInterceptor { + fn build( + &self, + _id: &str, + ) -> std::result::Result, interceptor::Error> { + Ok(Arc::new(self.clone())) + } +} + +#[tokio::test] +async fn test_rtp_sender_rtx() -> Result<()> { + let mut s = SettingEngine::default(); + s.enable_sender_rtx(true); + + let (interceptor_tx, mut interceptor_rx) = mpsc::unbounded_channel(); + + let mut registry = Registry::new(); + registry.add(Box::new(TestInterceptor(interceptor_tx))); + + let mut m = MediaEngine::default(); + m.register_default_codecs()?; + // only register rtx for VP8 + m.register_codec( + RTCRtpCodecParameters { + capability: RTCRtpCodecCapability { + mime_type: "video/rtx".to_owned(), + clock_rate: 90000, + channels: 0, + sdp_fmtp_line: "apt=96".to_string(), + rtcp_feedback: vec![], + }, + payload_type: 97, + ..Default::default() + }, + RTPCodecType::Video, + )?; + + let api = APIBuilder::new() + .with_setting_engine(s) + .with_media_engine(m) + .with_interceptor_registry(registry) + .build(); + + let (mut offerer, mut answerer) = new_pair(&api).await?; + + let track_a = Arc::new(TrackLocalStaticSample::new( + RTCRtpCodecCapability { + mime_type: MIME_TYPE_VP8.to_owned(), + ..Default::default() + }, + "video".to_owned(), + "webrtc-rs".to_owned(), + )); + + let track_b = Arc::new(TrackLocalStaticSample::new( + RTCRtpCodecCapability { + mime_type: MIME_TYPE_H264.to_owned(), + ..Default::default() + }, + "video".to_owned(), + "webrtc-rs".to_owned(), + )); + + let rtp_sender_a = offerer + .add_track(Arc::clone(&track_a) as Arc) + .await?; + + let rtp_sender_b = offerer + .add_track(Arc::clone(&track_b) as Arc) + .await?; + + signal_pair(&mut offerer, &mut answerer).await?; + + // rtx enabled for vp8 + assert!(rtp_sender_a.track().await.is_some()); + assert!(rtp_sender_a.track_encodings.lock().await[0].rtx.is_some()); + + // no rtx for h264 + assert!(rtp_sender_b.track().await.is_some()); + assert!(rtp_sender_b.track_encodings.lock().await[0].rtx.is_some()); + + close_pair_now(&offerer, &answerer).await; + + let mut vp8_ssrcs = Vec::new(); + let mut h264_ssrcs = Vec::new(); + let mut rtx_associated_ssrcs = Vec::new(); + + // pair closed, all interceptor events should be buffered + while let Ok(event) = interceptor_rx.try_recv() { + if let TestInterceptorEvent::BindLocal(info) = event { + match info.mime_type.as_str() { + MIME_TYPE_VP8 => vp8_ssrcs.push(info.ssrc), + MIME_TYPE_H264 => h264_ssrcs.push(info.ssrc), + "video/rtx" => rtx_associated_ssrcs.push( + info.associated_stream + .expect("rtx without asscoiated stream") + .ssrc, + ), + mime => panic!("unexpected mime: {mime}"), + } + } + } + + assert_eq!(vp8_ssrcs.len(), 1); + assert_eq!(h264_ssrcs.len(), 1); + assert_eq!(rtx_associated_ssrcs, vp8_ssrcs); + + Ok(()) +}