From 7b521953b10093bcfc1bd2fa618be3029b52b700 Mon Sep 17 00:00:00 2001 From: tetta maeda Date: Mon, 9 Sep 2024 17:59:49 +0900 Subject: [PATCH 1/4] fix: typo in log level cmd --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index b2fe054..346a498 100644 --- a/README.md +++ b/README.md @@ -54,7 +54,7 @@ openssl req -newkey rsa:2048 -nodes -keyout key.pem -x509 -out cert.pem -subj '/ #### Specify the log level -- `cargo run -p moqt-server-sample -- -log ` +- `cargo run -p moqt-server-sample -- --log ` - Default setting is `DEBUG` ### Run moqt-client-sample From de2f1c42216632a0b207f26cb9df857ab7fb1a48 Mon Sep 17 00:00:00 2001 From: tetta maeda Date: Mon, 9 Sep 2024 19:25:54 +0900 Subject: [PATCH 2/4] refactor: log level --- .../src/modules/handlers/announce_handler.rs | 15 ++- .../src/modules/handlers/object_handler.rs | 22 ++-- .../modules/handlers/server_setup_handler.rs | 10 +- .../src/modules/handlers/subscribe_handler.rs | 27 +++-- .../modules/handlers/subscribe_ok_handler.rs | 19 +--- .../modules/handlers/unannounce_handler.rs | 14 +-- .../modules/handlers/unsubscribe_handler.rs | 13 +-- moqt-core/src/modules/message_handler.rs | 24 ++-- moqt-core/src/modules/message_type.rs | 2 +- .../src/modules/messages/announce_message.rs | 3 +- .../modules/messages/client_setup_message.rs | 8 +- .../src/modules/messages/setup_parameters.rs | 2 +- .../messages/subscribe_request_message.rs | 2 +- .../messages/version_specific_parameters.rs | 2 +- .../server_processes/object_message.rs | 4 +- .../server_processes/subscribe_ok_message.rs | 2 +- .../subscribe_request_message.rs | 2 +- moqt-server/src/lib.rs | 103 +++++++++++------- moqt-server/src/modules/buffer_manager.rs | 10 +- .../src/modules/send_stream_dispatcher.rs | 10 +- .../src/modules/track_namespace_manager.rs | 18 +-- 21 files changed, 167 insertions(+), 145 deletions(-) diff --git a/moqt-core/src/modules/handlers/announce_handler.rs b/moqt-core/src/modules/handlers/announce_handler.rs index d559932..0d75238 100644 --- a/moqt-core/src/modules/handlers/announce_handler.rs +++ b/moqt-core/src/modules/handlers/announce_handler.rs @@ -21,13 +21,8 @@ pub(crate) async fn announce_handler( client: &mut MOQTClient, track_namespace_manager_repository: &mut dyn TrackNamespaceManagerRepository, ) -> Result { - tracing::info!("announce_handler!"); - - tracing::info!( - "announce_handler: track_namespace: \"{}\" is announced by client: {}", - announce_message.track_namespace(), - client.id - ); + tracing::trace!("announce_handler start."); + tracing::debug!("announce_message: {:#?}", announce_message); // Record the announced Track Namespace let set_result = track_namespace_manager_repository @@ -37,12 +32,16 @@ pub(crate) async fn announce_handler( match set_result { Ok(_) => { let track_namespace = announce_message.track_namespace().to_string(); + + tracing::info!("announced track_namespace: {:#?}", track_namespace); + tracing::trace!("announce_handler complete."); + Ok(AnnounceResponse::Success(AnnounceOk::new( track_namespace.to_string(), ))) } Err(err) => { - tracing::info!("announce_handler: err: {:?}", err.to_string()); + tracing::warn!("announce_handler: err: {:?}", err.to_string()); Ok(AnnounceResponse::Failure(AnnounceError::new( announce_message.track_namespace().to_string(), diff --git a/moqt-core/src/modules/handlers/object_handler.rs b/moqt-core/src/modules/handlers/object_handler.rs index 5bb0b7a..c346381 100644 --- a/moqt-core/src/modules/handlers/object_handler.rs +++ b/moqt-core/src/modules/handlers/object_handler.rs @@ -15,11 +15,11 @@ pub(crate) async fn object_with_payload_length_handler( track_namespace_manager_repository: &mut dyn TrackNamespaceManagerRepository, send_stream_dispatcher_repository: &mut dyn SendStreamDispatcherRepository, ) -> Result<()> { - tracing::info!("object_with_payload_length_handler!"); + tracing::trace!("object_with_payload_length_handler start."); - tracing::info!( - "object_with_payload_length_handler: track_id: \"{}\"", - object_with_payload_length_message.track_id() + tracing::debug!( + "object_with_payload_length_message: {:#?}", + object_with_payload_length_message ); // Use track_id to determine the subscriber @@ -35,7 +35,7 @@ pub(crate) async fn object_with_payload_length_handler( for session_id in session_ids.iter() { let message: Box = Box::new(object_with_payload_length_message.clone()); - tracing::info!( + tracing::debug!( "message: {:#?} is sent to relay handler for client {:?}", object_with_payload_length_message, session_id @@ -55,6 +55,7 @@ pub(crate) async fn object_with_payload_length_handler( } } } + tracing::trace!("object_with_payload_length_handler complete."); result } None => { @@ -69,11 +70,11 @@ pub(crate) async fn object_without_payload_length_handler( track_namespace_manager_repository: &mut dyn TrackNamespaceManagerRepository, send_stream_dispatcher_repository: &mut dyn SendStreamDispatcherRepository, ) -> Result<()> { - tracing::info!("object_without_payload_length_handler!"); + tracing::trace!("object_without_payload_length_handler start."); - tracing::info!( - "object_without_payload_length_handler: track_id: \"{}\"", - object_without_payload_length_message.track_id() + tracing::debug!( + "object_without_payload_length_message: {:#?}", + object_without_payload_length_message ); // Use track_id to determine the subscriber @@ -88,7 +89,7 @@ pub(crate) async fn object_without_payload_length_handler( for session_id in session_ids.iter() { let message: Box = Box::new(object_without_payload_length_message.clone()); - tracing::info!( + tracing::debug!( "message: {:#?} is sent to relay handler for client {:?}", object_without_payload_length_message, session_id @@ -108,6 +109,7 @@ pub(crate) async fn object_without_payload_length_handler( } } } + tracing::trace!("object_without_payload_length_handler complete."); result } None => { diff --git a/moqt-core/src/modules/handlers/server_setup_handler.rs b/moqt-core/src/modules/handlers/server_setup_handler.rs index 0552c33..7cd461d 100644 --- a/moqt-core/src/modules/handlers/server_setup_handler.rs +++ b/moqt-core/src/modules/handlers/server_setup_handler.rs @@ -18,9 +18,11 @@ pub(crate) fn setup_handler( underlay_type: UnderlayType, client: &mut MOQTClient, ) -> Result { - tracing::info!("setup_handler"); + tracing::trace!("setup_handler start."); - tracing::info!( + tracing::debug!("client_setup_message: {:#?}", client_setup_message); + + tracing::debug!( "supported_versions: {:#x?}", client_setup_message.supported_versions ); @@ -44,7 +46,7 @@ pub(crate) fn setup_handler( } } SetupParameter::Unknown(v) => { - tracing::info!("Ignore unknown SETUP parameter {}", v); + tracing::warn!("Ignore unknown SETUP parameter {}", v); } } } @@ -61,7 +63,7 @@ pub(crate) fn setup_handler( // State: Connected -> Setup client.update_status(MOQTClientStatus::SetUp); - tracing::info!("setup_handler completed. {:#?}", client); + tracing::trace!("setup_handler complete."); Ok(server_setup_message) } diff --git a/moqt-core/src/modules/handlers/subscribe_handler.rs b/moqt-core/src/modules/handlers/subscribe_handler.rs index 5bc682a..9fc5713 100644 --- a/moqt-core/src/modules/handlers/subscribe_handler.rs +++ b/moqt-core/src/modules/handlers/subscribe_handler.rs @@ -16,16 +16,10 @@ pub(crate) async fn subscribe_handler( track_namespace_manager_repository: &mut dyn TrackNamespaceManagerRepository, send_stream_dispatcher_repository: &mut dyn SendStreamDispatcherRepository, ) -> Result<()> { - tracing::info!("subscribe_handler!"); + tracing::trace!("subscribe_handler start."); + + tracing::debug!("subscribe_message: {:#?}", subscribe_message); - tracing::info!( - "subscribe_handler: track_namespace: \"{}\"", - subscribe_message.track_namespace() - ); - tracing::info!( - "subscribe_handler: track_name: \"{}\"", - subscribe_message.track_name() - ); // Since only the track_namespace is recorded in ANNOUNCE, use track_namespace to determine the publisher let publisher_session_id = track_namespace_manager_repository .get_publisher_session_id_by_track_namespace(subscribe_message.track_namespace()) @@ -48,7 +42,7 @@ pub(crate) async fn subscribe_handler( } // Notify the publisher about the SUBSCRIBE message let message: Box = Box::new(subscribe_message.clone()); - tracing::info!( + tracing::debug!( "message: {:#?} is sent to relay handler for client {:?}", subscribe_message, session_id @@ -58,7 +52,18 @@ pub(crate) async fn subscribe_handler( .send_message_to_send_stream_thread(session_id, message, StreamType::Bi) .await { - Ok(_) => Ok(()), + Ok(_) => { + tracing::info!( + "subscribed track_namespace: {:?}", + subscribe_message.track_namespace(), + ); + tracing::info!( + "subscribed track_name: {:?}", + subscribe_message.track_name() + ); + tracing::trace!("subscribe_handler complete."); + Ok(()) + } Err(e) => Err(anyhow::anyhow!("relay subscribe failed: {:?}", e)), } } diff --git a/moqt-core/src/modules/handlers/subscribe_ok_handler.rs b/moqt-core/src/modules/handlers/subscribe_ok_handler.rs index a81cc01..dee2997 100644 --- a/moqt-core/src/modules/handlers/subscribe_ok_handler.rs +++ b/moqt-core/src/modules/handlers/subscribe_ok_handler.rs @@ -15,20 +15,10 @@ pub(crate) async fn subscribe_ok_handler( track_namespace_manager_repository: &mut dyn TrackNamespaceManagerRepository, send_stream_dispatcher_repository: &mut dyn SendStreamDispatcherRepository, ) -> Result<()> { - tracing::info!("subscribe_ok_handler!"); + tracing::trace!("subscribe_ok_handler start."); + + tracing::debug!("subscribe_ok_message: {:#?}", subscribe_ok_message); - tracing::info!( - "subscribe_ok_handler: track_namespace: \"{}\"", - subscribe_ok_message.track_namespace() - ); - tracing::info!( - "subscribe_ok_handler: track_name: \"{}\"", - subscribe_ok_message.track_name() - ); - tracing::info!( - "subscribe_ok_handler: track_id: \"{}\"", - subscribe_ok_message.track_id() - ); // Determine the SUBSCRIBER who sent the SUBSCRIBE using the track_namespace and track_name let subscriber_session_ids = track_namespace_manager_repository .get_subscriber_session_ids_by_track_namespace_and_track_name( @@ -43,7 +33,7 @@ pub(crate) async fn subscribe_ok_handler( // Notify all waiting subscribers with the SUBSCRIBE_OK message for session_id in session_ids.iter() { let message: Box = Box::new(subscribe_ok_message.clone()); - tracing::info!( + tracing::debug!( "message: {:#?} is sent to relay handler for client {:?}", subscribe_ok_message, session_id @@ -79,6 +69,7 @@ pub(crate) async fn subscribe_ok_handler( } } } + tracing::trace!("subscribe_ok_handler complete."); result } None => Err(anyhow::anyhow!("waiting subscriber session ids not found")), diff --git a/moqt-core/src/modules/handlers/unannounce_handler.rs b/moqt-core/src/modules/handlers/unannounce_handler.rs index 8aa77b1..995371e 100644 --- a/moqt-core/src/modules/handlers/unannounce_handler.rs +++ b/moqt-core/src/modules/handlers/unannounce_handler.rs @@ -13,12 +13,9 @@ pub(crate) async fn unannounce_handler( _client: &mut MOQTClient, // TODO: Not implemented yet track_namespace_manager_repository: &mut dyn TrackNamespaceManagerRepository, ) -> Result<()> { - tracing::info!("unannounce_handler!"); + tracing::trace!("unannounce_handler start."); - tracing::info!( - "unannounce_handler: track_namespace: \"{}\"", - unannounce_message.track_namespace() - ); + tracing::debug!("unannounce_message: {:#?}", unannounce_message); // Remove the announced Track Namespace let delete_result = track_namespace_manager_repository @@ -27,9 +24,12 @@ pub(crate) async fn unannounce_handler( match delete_result { // TODO: Notify connected clients that unannouncing has occurred - Ok(_) => Ok(()), + Ok(_) => { + tracing::trace!("unannounce_handler complete."); + Ok(()) + } Err(err) => { - tracing::info!("unannounce_handler: err: {:?}", err.to_string()); + tracing::error!("unannounce_handler: err: {:?}", err.to_string()); Ok(()) } diff --git a/moqt-core/src/modules/handlers/unsubscribe_handler.rs b/moqt-core/src/modules/handlers/unsubscribe_handler.rs index bcb8cc1..998329e 100644 --- a/moqt-core/src/modules/handlers/unsubscribe_handler.rs +++ b/moqt-core/src/modules/handlers/unsubscribe_handler.rs @@ -18,19 +18,12 @@ pub(crate) async fn _unsubscribe_handler( _client: &mut MOQTClient, // TODO: Not implemented yet _track_namespace_manager_repository: &mut dyn TrackNamespaceManagerRepository, // TODO: Not implemented yet ) -> Result { - tracing::info!("unsubscribe_handler!"); - - tracing::info!( - "unsubscribe_handler: track_namespace: \"{}\"", - unsubscribe_message.track_namespace() - ); - tracing::info!( - "unsubscribe_handler: track_name: \"{}\"", - unsubscribe_message.track_name() - ); + tracing::trace!("unsubscribe_handler start."); + tracing::debug!("unsubscribe_message: {:#?}", unsubscribe_message); // TODO: Remove unsubscribe information // FIXME: tmp + tracing::trace!("unsubscribe_handler complete."); Ok(UnSubscribeResponse::Success) } diff --git a/moqt-core/src/modules/message_handler.rs b/moqt-core/src/modules/message_handler.rs index c65a0a1..588e632 100644 --- a/moqt-core/src/modules/message_handler.rs +++ b/moqt-core/src/modules/message_handler.rs @@ -28,6 +28,7 @@ pub enum StreamType { Bi, } +#[derive(Debug)] pub enum MessageProcessResult { Success(BytesMut), SuccessWithoutResponse, @@ -52,7 +53,6 @@ fn read_message_type(read_cur: &mut std::io::Cursor<&[u8]>) -> Result MessageProcessResult { - tracing::info!("message_handler! {}", read_buf.len()); + tracing::trace!("message_handler! {}", read_buf.len()); let mut read_cur = Cursor::new(&read_buf[..]); - tracing::info!("read_cur! {:?}", read_cur); + tracing::debug!("read_cur! {:?}", read_cur); // Read the message type let message_type = match read_message_type(&mut read_cur) { @@ -72,21 +72,21 @@ pub async fn message_handler( Err(err) => { read_buf.advance(read_cur.position() as usize); - tracing::info!("message_type is wrong {:?}", err); + tracing::error!("message_type is wrong {:?}", err); return MessageProcessResult::Failure( TerminationErrorCode::GenericError, err.to_string(), ); } }; - tracing::info!("Message Type: {:?}", message_type); + tracing::info!("Received Message Type: {:?}", message_type); if message_type.is_setup_message() { // Setup message must be sent on bidirectional stream if stream_type == StreamType::Uni { read_buf.advance(read_cur.position() as usize); let message = String::from("Setup message must be sent on bidirectional stream"); - tracing::info!(message); + tracing::debug!(message); return MessageProcessResult::Failure(TerminationErrorCode::GenericError, message); } } else if message_type.is_control_message() { @@ -97,14 +97,14 @@ pub async fn message_handler( read_buf.advance(read_cur.position() as usize); let message = String::from("Object message must be sent on unidirectional stream"); - tracing::info!(message); + tracing::debug!(message); return MessageProcessResult::Failure(TerminationErrorCode::ProtocolViolation, message); } } if read_cur.remaining() == 0 { // The length is insufficient, so do nothing. Do not synchronize with the cursor. - tracing::info!("fragmented {}", read_buf.len()); + tracing::error!("fragmented {}", read_buf.len()); return MessageProcessResult::Fragment; } @@ -226,8 +226,7 @@ pub async fn message_handler( let unsubscribe_message = UnAnnounce::depacketize(&mut payload_buf); if let Err(err) = unsubscribe_message { - // fix - tracing::info!("{:#?}", err); + tracing::error!("{:#?}", err); return MessageProcessResult::Failure( TerminationErrorCode::GenericError, err.to_string(), @@ -284,7 +283,7 @@ pub async fn message_handler( let unannounce_message = UnAnnounce::depacketize(&mut payload_buf); if let Err(err) = unannounce_message { - tracing::info!("{:#?}", err); + tracing::error!("{:#?}", err); return MessageProcessResult::Failure( TerminationErrorCode::GenericError, err.to_string(), @@ -312,13 +311,14 @@ pub async fn message_handler( } }; + tracing::info!("Return Message Type: {:?}", return_message_type.clone()); let mut message_buf = BytesMut::with_capacity(write_buf.len() + 8); // Add type message_buf.extend(write_variable_integer(u8::from(return_message_type) as u64)); // Add payload message_buf.extend(write_buf); - tracing::info!("message_buf: {:#x?}", message_buf); + tracing::debug!("message_buf: {:#x?}", message_buf); MessageProcessResult::Success(message_buf) } diff --git a/moqt-core/src/modules/message_type.rs b/moqt-core/src/modules/message_type.rs index 874229f..7fe1244 100644 --- a/moqt-core/src/modules/message_type.rs +++ b/moqt-core/src/modules/message_type.rs @@ -1,6 +1,6 @@ use num_enum::{IntoPrimitive, TryFromPrimitive}; -#[derive(Debug, PartialEq, Eq, TryFromPrimitive, IntoPrimitive)] +#[derive(Debug, PartialEq, Eq, TryFromPrimitive, IntoPrimitive, Clone)] #[repr(u8)] pub enum MessageType { ObjectWithPayloadLength = 0x00, diff --git a/moqt-core/src/modules/messages/announce_message.rs b/moqt-core/src/modules/messages/announce_message.rs index f2f21e5..2bd9dd1 100644 --- a/moqt-core/src/modules/messages/announce_message.rs +++ b/moqt-core/src/modules/messages/announce_message.rs @@ -40,7 +40,7 @@ impl Announce { impl MOQTPayload for Announce { fn depacketize(buf: &mut bytes::BytesMut) -> Result { let read_cur = Cursor::new(&buf[..]); - tracing::info!("read_cur! {:?}", read_cur); + tracing::debug!("read_cur! {:?}", read_cur); let track_namespace = String::from_utf8(read_variable_bytes_from_buffer(buf)?).context("track namespace")?; let number_of_parameters = u8::try_from(read_variable_integer_from_buffer(buf)?) @@ -56,7 +56,6 @@ impl MOQTPayload for Announce { number_of_parameters, parameters, }; - tracing::info!("announce_message! {:?}", announce_message); Ok(announce_message) } diff --git a/moqt-core/src/modules/messages/client_setup_message.rs b/moqt-core/src/modules/messages/client_setup_message.rs index 0ca46c5..a3d7bf1 100644 --- a/moqt-core/src/modules/messages/client_setup_message.rs +++ b/moqt-core/src/modules/messages/client_setup_message.rs @@ -26,7 +26,7 @@ impl MOQTPayload for ClientSetup { fn depacketize(buf: &mut bytes::BytesMut) -> Result { let number_of_supported_versions = u8::try_from(read_variable_integer_from_buffer(buf)?) .context("number of supported versions")?; - tracing::debug!( + tracing::trace!( "Depacketized client setup message number_of_supported_versions: {:#?}", number_of_supported_versions ); @@ -37,14 +37,14 @@ impl MOQTPayload for ClientSetup { .context("supported version")?; supported_versions.push(supported_version); } - tracing::debug!( + tracing::trace!( "Depacketized client setup message supported_versions: {:#?}", supported_versions ); let number_of_parameters = u8::try_from(read_variable_integer_from_buffer(buf)?) .context("number of parameters")?; - tracing::debug!( + tracing::trace!( "Depacketized client setup message number_of_parameters: {:#?}", number_of_parameters ); @@ -53,7 +53,7 @@ impl MOQTPayload for ClientSetup { for _ in 0..number_of_parameters { setup_parameters.push(SetupParameter::depacketize(buf)?); } - tracing::debug!( + tracing::trace!( "Depacketized client setup message setup_parameters: {:#?}", setup_parameters ); diff --git a/moqt-core/src/modules/messages/setup_parameters.rs b/moqt-core/src/modules/messages/setup_parameters.rs index ba375a6..144fe00 100644 --- a/moqt-core/src/modules/messages/setup_parameters.rs +++ b/moqt-core/src/modules/messages/setup_parameters.rs @@ -20,7 +20,7 @@ impl MOQTPayload for SetupParameter { read_variable_integer_from_buffer(buf).context("key")?, )?); if let Err(err) = key { - tracing::info!("Unknown SETUP parameter {:#04x}", err.number); + tracing::warn!("Unknown SETUP parameter {:#04x}", err.number); return Ok(SetupParameter::Unknown(err.number)); } diff --git a/moqt-core/src/modules/messages/subscribe_request_message.rs b/moqt-core/src/modules/messages/subscribe_request_message.rs index 8ed6a56..aec4e74 100644 --- a/moqt-core/src/modules/messages/subscribe_request_message.rs +++ b/moqt-core/src/modules/messages/subscribe_request_message.rs @@ -73,7 +73,7 @@ impl MOQTPayload for SubscribeRequest { for _ in 0..number_of_parameters { let version_specific_parameter = VersionSpecificParameter::depacketize(buf)?; if let VersionSpecificParameter::Unknown(code) = version_specific_parameter { - tracing::info!("unknown track request parameter {}", code); + tracing::warn!("unknown track request parameter {}", code); } else { // NOTE: // According to "6.1.1. Version Specific Parameters", the parameters used diff --git a/moqt-core/src/modules/messages/version_specific_parameters.rs b/moqt-core/src/modules/messages/version_specific_parameters.rs index 7d63f86..1f1eba1 100644 --- a/moqt-core/src/modules/messages/version_specific_parameters.rs +++ b/moqt-core/src/modules/messages/version_specific_parameters.rs @@ -32,7 +32,7 @@ impl MOQTPayload for VersionSpecificParameter { if let Err(err) = parameter_type { // If it appears in some other type of message, it MUST be ignored. - tracing::info!("Unknown version specific parameter {:#04x}", err.number); + tracing::warn!("Unknown version specific parameter {:#04x}", err.number); return Ok(VersionSpecificParameter::Unknown(err.number)); } diff --git a/moqt-core/src/modules/server_processes/object_message.rs b/moqt-core/src/modules/server_processes/object_message.rs index 0b270fe..4d4ad49 100644 --- a/moqt-core/src/modules/server_processes/object_message.rs +++ b/moqt-core/src/modules/server_processes/object_message.rs @@ -27,7 +27,7 @@ pub(crate) async fn process_object_with_payload_length( let object_message = match ObjectWithPayloadLength::depacketize(payload_buf) { Ok(object_message) => object_message, Err(err) => { - tracing::info!("{:#?}", err); + tracing::error!("{:#?}", err); bail!(err.to_string()); } }; @@ -55,7 +55,7 @@ pub(crate) async fn process_object_without_payload_length( let object_message = match ObjectWithoutPayloadLength::depacketize(payload_buf) { Ok(object_message) => object_message, Err(err) => { - tracing::info!("{:#?}", err); + tracing::error!("{:#?}", err); bail!(err.to_string()); } }; diff --git a/moqt-core/src/modules/server_processes/subscribe_ok_message.rs b/moqt-core/src/modules/server_processes/subscribe_ok_message.rs index 9a9c152..9bfd132 100644 --- a/moqt-core/src/modules/server_processes/subscribe_ok_message.rs +++ b/moqt-core/src/modules/server_processes/subscribe_ok_message.rs @@ -22,7 +22,7 @@ pub(crate) async fn process_subscribe_ok_message( let subscribe_ok_message = match SubscribeOk::depacketize(payload_buf) { Ok(subscribe_ok_message) => subscribe_ok_message, Err(err) => { - tracing::info!("{:#?}", err); + tracing::error!("{:#?}", err); bail!(err.to_string()); } }; diff --git a/moqt-core/src/modules/server_processes/subscribe_request_message.rs b/moqt-core/src/modules/server_processes/subscribe_request_message.rs index 04eff33..7ef6f95 100644 --- a/moqt-core/src/modules/server_processes/subscribe_request_message.rs +++ b/moqt-core/src/modules/server_processes/subscribe_request_message.rs @@ -22,7 +22,7 @@ pub(crate) async fn process_subscribe_message( let subscribe_request_message = match SubscribeRequest::depacketize(payload_buf) { Ok(subscribe_request_message) => subscribe_request_message, Err(err) => { - tracing::info!("{:#?}", err); + tracing::error!("{:#?}", err); bail!(err.to_string()); } }; diff --git a/moqt-server/src/lib.rs b/moqt-server/src/lib.rs index 94a2a66..6625a50 100644 --- a/moqt-server/src/lib.rs +++ b/moqt-server/src/lib.rs @@ -136,6 +136,8 @@ impl MOQT { let track_namespace_tx = track_namespace_tx.clone(); let send_stream_tx = send_stream_tx.clone(); let incoming_session = server.accept().await; + let connection_span = tracing::info_span!("Connection", id); + // Create a thread for each session tokio::spawn( handle_connection( @@ -144,7 +146,7 @@ impl MOQT { send_stream_tx, incoming_session, ) - .instrument(tracing::info_span!("Connection", id)), + .instrument(connection_span), ); } @@ -174,7 +176,7 @@ async fn handle_connection_impl( send_stream_tx: mpsc::Sender, incoming_session: IncomingSession, ) -> Result<()> { - tracing::info!("Waiting for session request..."); + tracing::trace!("Waiting for session request..."); let session_request = incoming_session.await?; @@ -187,12 +189,12 @@ async fn handle_connection_impl( let connection = session_request.accept().await?; let stable_id = connection.stable_id(); - let span = tracing::info_span!("sid", stable_id); - let _guard = span.enter(); - let client = Arc::new(Mutex::new(MOQTClient::new(stable_id))); - tracing::info!("Waiting for data from client..."); + let session_span = tracing::info_span!("Session", stable_id); + session_span.in_scope(|| { + tracing::info!("Waiting for data from client..."); + }); let (close_tx, mut close_rx) = mpsc::channel::<(u64, String)>(32); @@ -214,20 +216,19 @@ async fn handle_connection_impl( stream = connection.accept_bi() => { if is_control_stream_opened { // Only 1 control stream is allowed - tracing::info!("Control stream already opened"); + tracing::error!("Control stream already opened"); close_tx.send((u8::from(constants::TerminationErrorCode::ProtocolViolation) as u64, "Control stream already opened".to_string())).await?; break; } is_control_stream_opened = true; - let span = tracing::info_span!("sid", stable_id); - - let stream = stream?; - - span.in_scope(|| { + let session_span = tracing::info_span!("Session", stable_id); + session_span.in_scope(|| { tracing::info!("Accepted BI stream"); }); + let stream = stream?; + // The send_stream is wrapped with a Mutex to make it thread-safe since it can be called from multiple threads for returning and relaying messages. let (send_stream, recv_stream) = stream; let shread_send_stream = Arc::new(Mutex::new(send_stream)); @@ -249,6 +250,7 @@ async fn handle_connection_impl( // Thread that listens for WebTransport messages let send_stream = Arc::clone(&shread_send_stream); + let session_span_clone = session_span.clone(); tokio::spawn(async move { let mut stream = BiStream { stable_id, @@ -256,23 +258,28 @@ async fn handle_connection_impl( recv_stream, shread_send_stream: send_stream, }; - handle_incoming_bi_stream(&mut stream, client, buffer_tx, track_namespace_tx, close_tx, send_stream_tx).await - }); + handle_incoming_bi_stream(&mut stream, client, buffer_tx, track_namespace_tx, close_tx, send_stream_tx).instrument(session_span_clone).await + + // Propagate the current span (Connection) + }.in_current_span()); let send_stream = Arc::clone(&shread_send_stream); // Thread to relay messages (ANNOUNCE SUBSCRIBE) from the server tokio::spawn(async move { - wait_and_relay_control_message(send_stream, message_rx).await; - }); + let session_span = tracing::info_span!("Session", stable_id); + wait_and_relay_control_message(send_stream, message_rx).instrument(session_span).await; + + // Propagate the current span (Connection) + }.in_current_span()); }, // Waiting for a uni-directional recv stream and processing the received message stream = connection.accept_uni() => { let recv_stream = stream.unwrap(); - let span = tracing::info_span!("sid", stable_id); // TODO: Not implemented yet - span.in_scope(|| { - tracing::info!("Accepted UNI stream"); + let session_span = tracing::info_span!("Session", stable_id); // TODO: Not implemented yet + session_span.in_scope(|| { + tracing::info!("Accepted UNI Recv stream"); }); let stream_id = recv_stream.id().into_u64(); @@ -287,16 +294,23 @@ async fn handle_connection_impl( stream_id, recv_stream, }; - let _ = handle_incoming_uni_stream(&mut stream, client, buffer_tx, track_namespace_tx, close_tx, send_stream_tx).await; + let _ = handle_incoming_uni_stream(&mut stream, client, buffer_tx, track_namespace_tx, close_tx, send_stream_tx) + .instrument(session_span) + .await; }, // Waiting for a uni-directional relay request and relaying the message Some(message) = uni_relay_rx.recv() => { + + let session_span = tracing::info_span!("Session", stable_id); + session_span.in_scope(|| { + tracing::info!("Open UNI Send stream"); + }); // A sender MUST send each object over a dedicated stream. let send_stream = connection.open_uni().await?.await?; // Send relayed messages (OBJECT) from the server - relay_object_message(send_stream, message).await; + relay_object_message(send_stream, message).instrument(session_span).await; }, _ = connection.closed() => { tracing::info!("Connection closed, rtt={:?}", connection.rtt()); @@ -304,7 +318,7 @@ async fn handle_connection_impl( }, // TODO: Not implemented yet Some((_code, _reason)) = close_rx.recv() => { - tracing::info!("close channel received"); + tracing::error!("Close channel received"); // FIXME: I want to close the connection, but VarInt is not exported, so I'll leave it as is // Maybe it's in wtransport-proto? // connection.close(VarInt) @@ -348,13 +362,12 @@ async fn handle_incoming_uni_stream( let mut send_stream_dispatcher = modules::send_stream_dispatcher::RelayHandlerManager::new(send_stream_tx.clone()); - let span = tracing::info_span!("sid", stable_id); - let bytes_read = match recv_stream.read(&mut buffer).instrument(span).await? { + let bytes_read = match recv_stream.read(&mut buffer).await? { Some(bytes_read) => bytes_read, None => return Err(anyhow::anyhow!("Failed to read from stream")), }; - tracing::info!("bytes_read: {}", bytes_read); + tracing::debug!("bytes_read: {}", bytes_read); let read_buf = BytesMut::from(&buffer[..bytes_read]); let buf = buffer_manager::request_buffer(buffer_tx.clone(), stable_id, stream_id).await; @@ -373,10 +386,10 @@ async fn handle_incoming_uni_stream( ) .await; + tracing::debug!("message_result: {:?}", message_result); + match message_result { - MessageProcessResult::SuccessWithoutResponse => { - tracing::info!("SuccessWithoutResponse"); - } + MessageProcessResult::SuccessWithoutResponse => {} MessageProcessResult::Failure(code, message) => { close_tx .send((u8::from(code) as u64, message.clone())) @@ -434,13 +447,12 @@ async fn handle_incoming_bi_stream( modules::send_stream_dispatcher::RelayHandlerManager::new(send_stream_tx.clone()); loop { - let span = tracing::info_span!("sid", stable_id); - let bytes_read = match recv_stream.read(&mut buffer).instrument(span).await? { + let bytes_read = match recv_stream.read(&mut buffer).await? { Some(bytes_read) => bytes_read, None => break, }; - tracing::info!("bytes_read: {}", bytes_read); + tracing::debug!("bytes_read: {}", bytes_read); let read_buf = BytesMut::from(&buffer[..bytes_read]); let buf = buffer_manager::request_buffer(buffer_tx.clone(), stable_id, stream_id).await; @@ -459,15 +471,17 @@ async fn handle_incoming_bi_stream( ) .await; + tracing::debug!("message_result: {:?}", message_result); + match message_result { MessageProcessResult::Success(buf) => { let mut shread_send_stream = shread_send_stream.lock().await; shread_send_stream.write_all(&buf).await?; - tracing::info!("sent {:x?}", buf.to_vec()); - } - MessageProcessResult::SuccessWithoutResponse => { - tracing::info!("SuccessWithoutResponse"); + + tracing::info!("Message is sent."); + tracing::debug!("sent message: {:x?}", buf.to_vec()); } + MessageProcessResult::SuccessWithoutResponse => {} MessageProcessResult::Failure(code, message) => { close_tx.send((u8::from(code) as u64, message)).await?; break; @@ -499,6 +513,10 @@ async fn relay_object_message(mut send_stream: SendStream, message: Arc() @@ -507,6 +525,10 @@ async fn relay_object_message(mut send_stream: SendStream, message: Arc().is_some() { message_buf.extend(write_variable_integer( u8::from(MessageType::SubscribeOk) as u64 )); + tracing::info!("Relayed Message Type: {:?}", MessageType::SubscribeOk); } else if message.as_any().downcast_ref::().is_some() { message_buf.extend(write_variable_integer( u8::from(MessageType::SubscribeError) as u64, )); + tracing::info!("Relayed Message Type: {:?}", MessageType::SubscribeError); } else { tracing::error!("Unsupported message type for bi-directional stream"); continue; } message_buf.extend(write_buf); - tracing::info!("message relayed: {:?}", message_buf.to_vec()); let mut shread_send_stream = send_stream.lock().await; if let Err(e) = shread_send_stream.write_all(&message_buf).await { tracing::error!("Failed to write to stream: {:?}", e); break; } + + tracing::info!("Control message is relayed."); + tracing::debug!("relayed message: {:?}", message_buf.to_vec()); } } diff --git a/moqt-server/src/modules/buffer_manager.rs b/moqt-server/src/modules/buffer_manager.rs index 2143cd3..708130a 100644 --- a/moqt-server/src/modules/buffer_manager.rs +++ b/moqt-server/src/modules/buffer_manager.rs @@ -6,7 +6,7 @@ use BufferCommand::*; // Called as a separate thread pub(crate) async fn buffer_manager(rx: &mut mpsc::Receiver) { - tracing::info!("buffer_manager start"); + tracing::trace!("buffer_manager start"); // { // "${session_id}" : { @@ -16,7 +16,7 @@ pub(crate) async fn buffer_manager(rx: &mut mpsc::Receiver) { let mut buffers = HashMap::>::new(); while let Some(cmd) = rx.recv().await { - tracing::info!("command received"); + tracing::debug!("command received: {:#?}", cmd); match cmd { Get { session_id, @@ -58,7 +58,7 @@ pub(crate) async fn buffer_manager(rx: &mut mpsc::Receiver) { } } - tracing::info!("buffer_manager end"); + tracing::trace!("buffer_manager end"); } #[derive(Debug)] @@ -85,7 +85,7 @@ pub(crate) async fn request_buffer( session_id: usize, stream_id: u64, ) -> BufferType { - tracing::info!("request_buffer start"); + tracing::trace!("request_buffer start"); let (resp_tx, resp_rx) = oneshot::channel::(); @@ -98,7 +98,7 @@ pub(crate) async fn request_buffer( let buf = resp_rx.await.unwrap(); - tracing::info!("request_buffer end"); + tracing::trace!("request_buffer end"); buf } diff --git a/moqt-server/src/modules/send_stream_dispatcher.rs b/moqt-server/src/modules/send_stream_dispatcher.rs index 7f13ef6..f4f7f79 100644 --- a/moqt-server/src/modules/send_stream_dispatcher.rs +++ b/moqt-server/src/modules/send_stream_dispatcher.rs @@ -12,7 +12,7 @@ type SenderToSendStreamThread = mpsc::Sender>>; use SendStreamDispatchCommand::*; // Called as a separate thread pub(crate) async fn send_stream_dispatcher(rx: &mut mpsc::Receiver) { - tracing::info!("send_stream_dispatcher start"); + tracing::trace!("send_stream_dispatcher start"); // { // "${session_id}" : { // "unidirectional_stream" : tx, @@ -22,7 +22,7 @@ pub(crate) async fn send_stream_dispatcher(rx: &mut mpsc::Receiver>::new(); while let Some(cmd) = rx.recv().await { - tracing::info!("command received"); + tracing::debug!("command received: {:#?}", cmd); match cmd { Set { session_id, @@ -31,7 +31,7 @@ pub(crate) async fn send_stream_dispatcher(rx: &mut mpsc::Receiver { let inner_map = relay_senders.entry(session_id).or_default(); inner_map.insert(stream_type.to_string(), sender); - tracing::info!("set: {:?}", relay_senders); + tracing::debug!("set: {:?}", relay_senders); } List { stream_type, @@ -60,11 +60,13 @@ pub(crate) async fn send_stream_dispatcher(rx: &mut mpsc::Receiver) { - tracing::info!("track_namespace_manager start"); + tracing::trace!("track_namespace_manager start"); // TrackNamespaces // { @@ -332,7 +332,7 @@ pub(crate) async fn track_namespace_manager(rx: &mut mpsc::Receiver match namespaces.set_publisher(track_namespace, publisher_session_id) { Ok(_) => resp.send(true).unwrap(), Err(err) => { - tracing::info!("set_publisher: err: {:?}", err.to_string()); + tracing::error!("set_publisher: err: {:?}", err.to_string()); resp.send(false).unwrap(); } }, @@ -351,7 +351,7 @@ pub(crate) async fn track_namespace_manager(rx: &mut mpsc::Receiver match namespaces.delete_publisher(track_namespace) { Ok(_) => resp.send(true).unwrap(), Err(err) => { - tracing::info!("set_publisher: err: {:?}", err.to_string()); + tracing::error!("set_publisher: err: {:?}", err.to_string()); resp.send(false).unwrap(); } }, @@ -380,7 +380,7 @@ pub(crate) async fn track_namespace_manager(rx: &mut mpsc::Receiver resp.send(true).unwrap(), Err(err) => { - tracing::info!("set_subscriber: err: {:?}", err.to_string()); + tracing::error!("set_subscriber: err: {:?}", err.to_string()); resp.send(false).unwrap(); } } @@ -397,7 +397,7 @@ pub(crate) async fn track_namespace_manager(rx: &mut mpsc::Receiver resp.send(true).unwrap(), Err(err) => { - tracing::info!("delete_subscriber: err: {:?}", err.to_string()); + tracing::error!("delete_subscriber: err: {:?}", err.to_string()); resp.send(false).unwrap(); } }, @@ -409,7 +409,7 @@ pub(crate) async fn track_namespace_manager(rx: &mut mpsc::Receiver match namespaces.set_track_id(track_namespace, track_name, track_id) { Ok(_) => resp.send(true).unwrap(), Err(err) => { - tracing::info!("set_track_id: err: {:?}", err.to_string()); + tracing::error!("set_track_id: err: {:?}", err.to_string()); resp.send(false).unwrap(); } }, @@ -427,7 +427,7 @@ pub(crate) async fn track_namespace_manager(rx: &mut mpsc::Receiver resp.send(true).unwrap(), Err(err) => { - tracing::info!("set_status: err: {:?}", err.to_string()); + tracing::error!("set_status: err: {:?}", err.to_string()); resp.send(false).unwrap(); } }, @@ -450,7 +450,7 @@ pub(crate) async fn track_namespace_manager(rx: &mut mpsc::Receiver Date: Mon, 9 Sep 2024 19:54:33 +0900 Subject: [PATCH 3/4] refactor: add trace log --- .../messages/announce_error_message.rs | 4 ++++ .../src/modules/messages/announce_message.rs | 4 ++++ .../modules/messages/announce_ok_message.rs | 4 ++++ .../modules/messages/client_setup_message.rs | 20 ++++--------------- .../src/modules/messages/object_message.rs | 8 ++++++++ .../modules/messages/server_setup_message.rs | 4 ++++ .../messages/subscribe_error_message.rs | 4 ++++ .../modules/messages/subscribe_ok_message.rs | 4 ++++ .../messages/subscribe_request_message.rs | 4 ++++ .../modules/messages/unannounce_message.rs | 6 +++++- .../modules/messages/unsubscribe_message.rs | 4 ++++ 11 files changed, 49 insertions(+), 17 deletions(-) diff --git a/moqt-core/src/modules/messages/announce_error_message.rs b/moqt-core/src/modules/messages/announce_error_message.rs index 12f090c..2932880 100644 --- a/moqt-core/src/modules/messages/announce_error_message.rs +++ b/moqt-core/src/modules/messages/announce_error_message.rs @@ -41,6 +41,8 @@ impl MOQTPayload for AnnounceError { let reason_phrase = String::from_utf8(read_variable_bytes_from_buffer(buf)?).context("reason phrase")?; + tracing::trace!("Depacketized Announce Error message."); + Ok(AnnounceError { track_namespace, error_code, @@ -66,6 +68,8 @@ impl MOQTPayload for AnnounceError { buf.extend(write_variable_bytes( &self.reason_phrase.as_bytes().to_vec(), )); + + tracing::trace!("Packetized Announce Error message."); } /// Method to enable downcasting from MOQTPayload to AnnounceError fn as_any(&self) -> &dyn Any { diff --git a/moqt-core/src/modules/messages/announce_message.rs b/moqt-core/src/modules/messages/announce_message.rs index 2bd9dd1..facf54d 100644 --- a/moqt-core/src/modules/messages/announce_message.rs +++ b/moqt-core/src/modules/messages/announce_message.rs @@ -57,6 +57,8 @@ impl MOQTPayload for Announce { parameters, }; + tracing::trace!("Depacketized Announce message."); + Ok(announce_message) } @@ -79,6 +81,8 @@ impl MOQTPayload for Announce { for param in &self.parameters { param.packetize(buf); } + + tracing::trace!("Packetized Announce message."); } /// Method to enable downcasting from MOQTPayload to Announce fn as_any(&self) -> &dyn Any { diff --git a/moqt-core/src/modules/messages/announce_ok_message.rs b/moqt-core/src/modules/messages/announce_ok_message.rs index 7220f24..446e17b 100644 --- a/moqt-core/src/modules/messages/announce_ok_message.rs +++ b/moqt-core/src/modules/messages/announce_ok_message.rs @@ -24,6 +24,8 @@ impl MOQTPayload for AnnounceOk { let track_namespace = String::from_utf8(read_variable_bytes_from_buffer(buf)?).context("track namespace")?; + tracing::trace!("Depacketized Announce OK message."); + Ok(AnnounceOk { track_namespace }) } @@ -32,6 +34,8 @@ impl MOQTPayload for AnnounceOk { buf.extend(write_variable_bytes( &self.track_namespace.as_bytes().to_vec(), )); + + tracing::trace!("Packetized Announce OK message."); } /// Method to enable downcasting from MOQTPayload to AnnounceOk fn as_any(&self) -> &dyn Any { diff --git a/moqt-core/src/modules/messages/client_setup_message.rs b/moqt-core/src/modules/messages/client_setup_message.rs index a3d7bf1..77d6507 100644 --- a/moqt-core/src/modules/messages/client_setup_message.rs +++ b/moqt-core/src/modules/messages/client_setup_message.rs @@ -26,10 +26,6 @@ impl MOQTPayload for ClientSetup { fn depacketize(buf: &mut bytes::BytesMut) -> Result { let number_of_supported_versions = u8::try_from(read_variable_integer_from_buffer(buf)?) .context("number of supported versions")?; - tracing::trace!( - "Depacketized client setup message number_of_supported_versions: {:#?}", - number_of_supported_versions - ); let mut supported_versions = Vec::with_capacity(number_of_supported_versions as usize); for _ in 0..number_of_supported_versions { @@ -37,26 +33,14 @@ impl MOQTPayload for ClientSetup { .context("supported version")?; supported_versions.push(supported_version); } - tracing::trace!( - "Depacketized client setup message supported_versions: {:#?}", - supported_versions - ); let number_of_parameters = u8::try_from(read_variable_integer_from_buffer(buf)?) .context("number of parameters")?; - tracing::trace!( - "Depacketized client setup message number_of_parameters: {:#?}", - number_of_parameters - ); let mut setup_parameters = vec![]; for _ in 0..number_of_parameters { setup_parameters.push(SetupParameter::depacketize(buf)?); } - tracing::trace!( - "Depacketized client setup message setup_parameters: {:#?}", - setup_parameters - ); let client_setup_message = ClientSetup { number_of_supported_versions, @@ -65,6 +49,8 @@ impl MOQTPayload for ClientSetup { setup_parameters, }; + tracing::trace!("Depacketized Client Setup message."); + Ok(client_setup_message) } @@ -88,6 +74,8 @@ impl MOQTPayload for ClientSetup { for setup_parameter in &self.setup_parameters { setup_parameter.packetize(buf); } + + tracing::trace!("Packetized Client Setup message."); } /// Method to enable downcasting from MOQTPayload to ClientSetup fn as_any(&self) -> &dyn Any { diff --git a/moqt-core/src/modules/messages/object_message.rs b/moqt-core/src/modules/messages/object_message.rs index 996f699..7803f18 100644 --- a/moqt-core/src/modules/messages/object_message.rs +++ b/moqt-core/src/modules/messages/object_message.rs @@ -66,6 +66,8 @@ impl MOQTPayload for ObjectWithPayloadLength { read_fixed_length_bytes_from_buffer(buf, object_payload_length as usize) .context("object payload")?; + tracing::trace!("Depacketized Object With Payload Length message."); + Ok(ObjectWithPayloadLength { track_id, group_sequence, @@ -83,6 +85,8 @@ impl MOQTPayload for ObjectWithPayloadLength { buf.extend(write_variable_integer(self.object_send_order)); buf.extend(write_variable_integer(self.object_payload_length)); buf.extend(write_variable_bytes(&self.object_payload)); + + tracing::trace!("Packetized Object With Payload Length message."); } /// Method to enable downcasting from MOQTPayload to ObjectWithPayloadLength fn as_any(&self) -> &dyn Any { @@ -137,6 +141,8 @@ impl MOQTPayload for ObjectWithoutPayloadLength { let object_payload = read_variable_bytes_to_end_from_buffer(buf).context("object payload")?; + tracing::trace!("Depacketized Object Without Payload Length message."); + Ok(ObjectWithoutPayloadLength { track_id, group_sequence, @@ -152,6 +158,8 @@ impl MOQTPayload for ObjectWithoutPayloadLength { buf.extend(write_variable_integer(self.object_sequence)); buf.extend(write_variable_integer(self.object_send_order)); buf.extend(write_variable_bytes(&self.object_payload)); + + tracing::trace!("Packetized Object Without Payload Length message."); } /// Method to enable downcasting from MOQTPayload to ObjectWithoutPayloadLength fn as_any(&self) -> &dyn Any { diff --git a/moqt-core/src/modules/messages/server_setup_message.rs b/moqt-core/src/modules/messages/server_setup_message.rs index 420ef1c..31d8822 100644 --- a/moqt-core/src/modules/messages/server_setup_message.rs +++ b/moqt-core/src/modules/messages/server_setup_message.rs @@ -43,6 +43,8 @@ impl MOQTPayload for ServerSetup { setup_parameters, }; + tracing::trace!("Depacketized Server Setup message."); + Ok(server_setup_message) } @@ -56,6 +58,8 @@ impl MOQTPayload for ServerSetup { for setup_parameter in self.setup_parameters.iter() { setup_parameter.packetize(buf); } + + tracing::trace!("Packetized Server Setup message."); } /// Method to enable downcasting from MOQTPayload to ServerSetup fn as_any(&self) -> &dyn Any { diff --git a/moqt-core/src/modules/messages/subscribe_error_message.rs b/moqt-core/src/modules/messages/subscribe_error_message.rs index ea9eb49..9465180 100644 --- a/moqt-core/src/modules/messages/subscribe_error_message.rs +++ b/moqt-core/src/modules/messages/subscribe_error_message.rs @@ -60,6 +60,8 @@ impl MOQTPayload for SubscribeError { let reason_phrase = String::from_utf8(read_variable_bytes_from_buffer(buf)?).context("reason phrase")?; + tracing::trace!("Depacketized Subscribe Error message."); + Ok(SubscribeError { track_namespace, track_name, @@ -79,6 +81,8 @@ impl MOQTPayload for SubscribeError { buf.extend(write_variable_bytes( &self.reason_phrase.as_bytes().to_vec(), )); + + tracing::trace!("Packetized Subscribe Error message."); } /// Method to enable downcasting from MOQTPayload to SubscribeError fn as_any(&self) -> &dyn Any { diff --git a/moqt-core/src/modules/messages/subscribe_ok_message.rs b/moqt-core/src/modules/messages/subscribe_ok_message.rs index 425e7bf..a6e8b90 100644 --- a/moqt-core/src/modules/messages/subscribe_ok_message.rs +++ b/moqt-core/src/modules/messages/subscribe_ok_message.rs @@ -55,6 +55,8 @@ impl MOQTPayload for SubscribeOk { let track_id = read_variable_integer_from_buffer(buf).context("track id")?; let expires = read_variable_integer_from_buffer(buf).context("expires")?; + tracing::trace!("Depacketized Subscribe OK message."); + Ok(SubscribeOk { track_namespace: full_track_namespace, track_name: full_track_name, @@ -70,6 +72,8 @@ impl MOQTPayload for SubscribeOk { buf.extend(write_variable_bytes(&self.track_name.as_bytes().to_vec())); buf.extend(write_variable_integer(self.track_id)); buf.extend(write_variable_integer(self.expires)); + + tracing::trace!("Packetized Subscribe OK message."); } /// Method to enable downcasting from MOQTPayload to SubscribeOk fn as_any(&self) -> &dyn Any { diff --git a/moqt-core/src/modules/messages/subscribe_request_message.rs b/moqt-core/src/modules/messages/subscribe_request_message.rs index aec4e74..f956244 100644 --- a/moqt-core/src/modules/messages/subscribe_request_message.rs +++ b/moqt-core/src/modules/messages/subscribe_request_message.rs @@ -86,6 +86,8 @@ impl MOQTPayload for SubscribeRequest { } } + tracing::trace!("Depacketized Subscribe message."); + Ok(SubscribeRequest { track_namespace, track_name, @@ -115,6 +117,8 @@ impl MOQTPayload for SubscribeRequest { for version_specific_parameter in &self.track_request_parameters { version_specific_parameter.packetize(buf); } + + tracing::trace!("Packetized Subscribe OK message."); } /// Method to enable downcasting from MOQTPayload to SubscribeRequest fn as_any(&self) -> &dyn Any { diff --git a/moqt-core/src/modules/messages/unannounce_message.rs b/moqt-core/src/modules/messages/unannounce_message.rs index ab3ba85..0201cdd 100644 --- a/moqt-core/src/modules/messages/unannounce_message.rs +++ b/moqt-core/src/modules/messages/unannounce_message.rs @@ -24,13 +24,17 @@ impl MOQTPayload for UnAnnounce { track_namespace: String::from_utf8(track_namespace)?, }; + tracing::trace!("Depacketized Unannounce message."); + Ok(unannounce_message) } fn packetize(&self, buf: &mut bytes::BytesMut) { buf.extend(write_variable_bytes( &self.track_namespace.as_bytes().to_vec(), - )) + )); + + tracing::trace!("Packetized Unannounce message."); } /// Method to enable downcasting from MOQTPayload to UnAnnounce fn as_any(&self) -> &dyn Any { diff --git a/moqt-core/src/modules/messages/unsubscribe_message.rs b/moqt-core/src/modules/messages/unsubscribe_message.rs index 73a8cb3..ff68103 100644 --- a/moqt-core/src/modules/messages/unsubscribe_message.rs +++ b/moqt-core/src/modules/messages/unsubscribe_message.rs @@ -42,6 +42,8 @@ impl MOQTPayload for Unsubscribe { track_name: String::from_utf8(track_name)?, }; + tracing::trace!("Depacketized Unsubscribe message."); + Ok(unsubscribe_message) } @@ -50,6 +52,8 @@ impl MOQTPayload for Unsubscribe { &self.track_namespace.as_bytes().to_vec(), )); buf.extend(write_variable_bytes(&self.track_name.as_bytes().to_vec())); + + tracing::trace!("Packetized Unsubscribe message."); } /// Method to enable downcasting from MOQTPayload to Unsubscribe fn as_any(&self) -> &dyn Any { From a5f422624f249115479b7d9f3e90d4441fe0bf58 Mon Sep 17 00:00:00 2001 From: tetta maeda Date: Mon, 9 Sep 2024 19:56:04 +0900 Subject: [PATCH 4/4] fix: wrong level --- moqt-core/src/modules/handlers/announce_handler.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/moqt-core/src/modules/handlers/announce_handler.rs b/moqt-core/src/modules/handlers/announce_handler.rs index 0d75238..de1929d 100644 --- a/moqt-core/src/modules/handlers/announce_handler.rs +++ b/moqt-core/src/modules/handlers/announce_handler.rs @@ -41,7 +41,7 @@ pub(crate) async fn announce_handler( ))) } Err(err) => { - tracing::warn!("announce_handler: err: {:?}", err.to_string()); + tracing::error!("announce_handler: err: {:?}", err.to_string()); Ok(AnnounceResponse::Failure(AnnounceError::new( announce_message.track_namespace().to_string(),