Skip to content

Commit

Permalink
Merge pull request #77 from nttcom/refactor/log_levels
Browse files Browse the repository at this point in the history
serverのLog level修正
  • Loading branch information
yuki-uchida committed Sep 10, 2024
2 parents 242d6c7 + 85d938c commit 0fe58b5
Show file tree
Hide file tree
Showing 30 changed files with 213 additions and 159 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ openssl req -newkey rsa:2048 -nodes -keyout key.pem -x509 -out cert.pem -subj '/

#### Specify the log level

- `cargo run -p moqt-server-sample -- -log <Log Level>`
- `cargo run -p moqt-server-sample -- --log <Log Level>`
- Default setting is `DEBUG`

### Run moqt-client-sample
Expand Down
15 changes: 7 additions & 8 deletions moqt-core/src/modules/handlers/announce_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,8 @@ pub(crate) async fn announce_handler(
client: &mut MOQTClient,
track_namespace_manager_repository: &mut dyn TrackNamespaceManagerRepository,
) -> Result<AnnounceResponse> {
tracing::info!("announce_handler!");

tracing::info!(
"announce_handler: track_namespace: \"{}\" is announced by client: {}",
announce_message.track_namespace(),
client.id
);
tracing::trace!("announce_handler start.");
tracing::debug!("announce_message: {:#?}", announce_message);

// Record the announced Track Namespace
let set_result = track_namespace_manager_repository
Expand All @@ -34,12 +29,16 @@ pub(crate) async fn announce_handler(
match set_result {
Ok(_) => {
let track_namespace = announce_message.track_namespace().to_string();

tracing::info!("announced track_namespace: {:#?}", track_namespace);
tracing::trace!("announce_handler complete.");

Ok(AnnounceResponse::Success(AnnounceOk::new(
track_namespace.to_string(),
)))
}
Err(err) => {
tracing::info!("announce_handler: err: {:?}", err.to_string());
tracing::error!("announce_handler: err: {:?}", err.to_string());

Ok(AnnounceResponse::Failure(AnnounceError::new(
announce_message.track_namespace().to_string(),
Expand Down
22 changes: 12 additions & 10 deletions moqt-core/src/modules/handlers/object_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ pub(crate) async fn object_with_payload_length_handler(
track_namespace_manager_repository: &mut dyn TrackNamespaceManagerRepository,
send_stream_dispatcher_repository: &mut dyn SendStreamDispatcherRepository,
) -> Result<()> {
tracing::info!("object_with_payload_length_handler!");
tracing::trace!("object_with_payload_length_handler start.");

tracing::info!(
"object_with_payload_length_handler: track_id: \"{}\"",
object_with_payload_length_message.track_id()
tracing::debug!(
"object_with_payload_length_message: {:#?}",
object_with_payload_length_message
);

// Use track_id to determine the subscriber
Expand All @@ -35,7 +35,7 @@ pub(crate) async fn object_with_payload_length_handler(
for session_id in session_ids.iter() {
let message: Box<dyn MOQTPayload> =
Box::new(object_with_payload_length_message.clone());
tracing::info!(
tracing::debug!(
"message: {:#?} is sent to relay handler for client {:?}",
object_with_payload_length_message,
session_id
Expand All @@ -55,6 +55,7 @@ pub(crate) async fn object_with_payload_length_handler(
}
}
}
tracing::trace!("object_with_payload_length_handler complete.");
result
}
None => {
Expand All @@ -69,11 +70,11 @@ pub(crate) async fn object_without_payload_length_handler(
track_namespace_manager_repository: &mut dyn TrackNamespaceManagerRepository,
send_stream_dispatcher_repository: &mut dyn SendStreamDispatcherRepository,
) -> Result<()> {
tracing::info!("object_without_payload_length_handler!");
tracing::trace!("object_without_payload_length_handler start.");

tracing::info!(
"object_without_payload_length_handler: track_id: \"{}\"",
object_without_payload_length_message.track_id()
tracing::debug!(
"object_without_payload_length_message: {:#?}",
object_without_payload_length_message
);

// Use track_id to determine the subscriber
Expand All @@ -88,7 +89,7 @@ pub(crate) async fn object_without_payload_length_handler(
for session_id in session_ids.iter() {
let message: Box<dyn MOQTPayload> =
Box::new(object_without_payload_length_message.clone());
tracing::info!(
tracing::debug!(
"message: {:#?} is sent to relay handler for client {:?}",
object_without_payload_length_message,
session_id
Expand All @@ -108,6 +109,7 @@ pub(crate) async fn object_without_payload_length_handler(
}
}
}
tracing::trace!("object_without_payload_length_handler complete.");
result
}
None => {
Expand Down
10 changes: 6 additions & 4 deletions moqt-core/src/modules/handlers/server_setup_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ pub(crate) fn setup_handler(
underlay_type: UnderlayType,
client: &mut MOQTClient,
) -> Result<ServerSetup> {
tracing::info!("setup_handler");
tracing::trace!("setup_handler start.");

tracing::info!(
tracing::debug!("client_setup_message: {:#?}", client_setup_message);

tracing::debug!(
"supported_versions: {:#x?}",
client_setup_message.supported_versions
);
Expand All @@ -44,7 +46,7 @@ pub(crate) fn setup_handler(
}
}
SetupParameter::Unknown(v) => {
tracing::info!("Ignore unknown SETUP parameter {}", v);
tracing::warn!("Ignore unknown SETUP parameter {}", v);
}
}
}
Expand All @@ -61,7 +63,7 @@ pub(crate) fn setup_handler(
// State: Connected -> Setup
client.update_status(MOQTClientStatus::SetUp);

tracing::info!("setup_handler completed. {:#?}", client);
tracing::trace!("setup_handler complete.");

Ok(server_setup_message)
}
Expand Down
27 changes: 16 additions & 11 deletions moqt-core/src/modules/handlers/subscribe_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,10 @@ pub(crate) async fn subscribe_handler(
track_namespace_manager_repository: &mut dyn TrackNamespaceManagerRepository,
send_stream_dispatcher_repository: &mut dyn SendStreamDispatcherRepository,
) -> Result<()> {
tracing::info!("subscribe_handler!");
tracing::trace!("subscribe_handler start.");

tracing::debug!("subscribe_message: {:#?}", subscribe_message);

tracing::info!(
"subscribe_handler: track_namespace: \"{}\"",
subscribe_message.track_namespace()
);
tracing::info!(
"subscribe_handler: track_name: \"{}\"",
subscribe_message.track_name()
);
// Since only the track_namespace is recorded in ANNOUNCE, use track_namespace to determine the publisher
let publisher_session_id = track_namespace_manager_repository
.get_publisher_session_id_by_track_namespace(subscribe_message.track_namespace())
Expand All @@ -48,7 +42,7 @@ pub(crate) async fn subscribe_handler(
}
// Notify the publisher about the SUBSCRIBE message
let message: Box<dyn MOQTPayload> = Box::new(subscribe_message.clone());
tracing::info!(
tracing::debug!(
"message: {:#?} is sent to relay handler for client {:?}",
subscribe_message,
session_id
Expand All @@ -58,7 +52,18 @@ pub(crate) async fn subscribe_handler(
.send_message_to_send_stream_thread(session_id, message, StreamType::Bi)
.await
{
Ok(_) => Ok(()),
Ok(_) => {
tracing::info!(
"subscribed track_namespace: {:?}",
subscribe_message.track_namespace(),
);
tracing::info!(
"subscribed track_name: {:?}",
subscribe_message.track_name()
);
tracing::trace!("subscribe_handler complete.");
Ok(())
}
Err(e) => Err(anyhow::anyhow!("relay subscribe failed: {:?}", e)),
}
}
Expand Down
19 changes: 5 additions & 14 deletions moqt-core/src/modules/handlers/subscribe_ok_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,10 @@ pub(crate) async fn subscribe_ok_handler(
track_namespace_manager_repository: &mut dyn TrackNamespaceManagerRepository,
send_stream_dispatcher_repository: &mut dyn SendStreamDispatcherRepository,
) -> Result<()> {
tracing::info!("subscribe_ok_handler!");
tracing::trace!("subscribe_ok_handler start.");

tracing::debug!("subscribe_ok_message: {:#?}", subscribe_ok_message);

tracing::info!(
"subscribe_ok_handler: track_namespace: \"{}\"",
subscribe_ok_message.track_namespace()
);
tracing::info!(
"subscribe_ok_handler: track_name: \"{}\"",
subscribe_ok_message.track_name()
);
tracing::info!(
"subscribe_ok_handler: track_id: \"{}\"",
subscribe_ok_message.track_id()
);
// Determine the SUBSCRIBER who sent the SUBSCRIBE using the track_namespace and track_name
let subscriber_session_ids = track_namespace_manager_repository
.get_subscriber_session_ids_by_track_namespace_and_track_name(
Expand All @@ -43,7 +33,7 @@ pub(crate) async fn subscribe_ok_handler(
// Notify all waiting subscribers with the SUBSCRIBE_OK message
for session_id in session_ids.iter() {
let message: Box<dyn MOQTPayload> = Box::new(subscribe_ok_message.clone());
tracing::info!(
tracing::debug!(
"message: {:#?} is sent to relay handler for client {:?}",
subscribe_ok_message,
session_id
Expand Down Expand Up @@ -79,6 +69,7 @@ pub(crate) async fn subscribe_ok_handler(
}
}
}
tracing::trace!("subscribe_ok_handler complete.");
result
}
None => Err(anyhow::anyhow!("waiting subscriber session ids not found")),
Expand Down
14 changes: 7 additions & 7 deletions moqt-core/src/modules/handlers/unannounce_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,9 @@ pub(crate) async fn unannounce_handler(
_client: &mut MOQTClient, // TODO: Not implemented yet
track_namespace_manager_repository: &mut dyn TrackNamespaceManagerRepository,
) -> Result<()> {
tracing::info!("unannounce_handler!");
tracing::trace!("unannounce_handler start.");

tracing::info!(
"unannounce_handler: track_namespace: \"{}\"",
unannounce_message.track_namespace()
);
tracing::debug!("unannounce_message: {:#?}", unannounce_message);

// Remove the announced Track Namespace
let delete_result = track_namespace_manager_repository
Expand All @@ -27,9 +24,12 @@ pub(crate) async fn unannounce_handler(

match delete_result {
// TODO: Notify connected clients that unannouncing has occurred
Ok(_) => Ok(()),
Ok(_) => {
tracing::trace!("unannounce_handler complete.");
Ok(())
}
Err(err) => {
tracing::info!("unannounce_handler: err: {:?}", err.to_string());
tracing::error!("unannounce_handler: err: {:?}", err.to_string());

Ok(())
}
Expand Down
13 changes: 3 additions & 10 deletions moqt-core/src/modules/handlers/unsubscribe_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,12 @@ pub(crate) async fn _unsubscribe_handler(
_client: &mut MOQTClient, // TODO: Not implemented yet
_track_namespace_manager_repository: &mut dyn TrackNamespaceManagerRepository, // TODO: Not implemented yet
) -> Result<UnSubscribeResponse> {
tracing::info!("unsubscribe_handler!");

tracing::info!(
"unsubscribe_handler: track_namespace: \"{}\"",
unsubscribe_message.track_namespace()
);
tracing::info!(
"unsubscribe_handler: track_name: \"{}\"",
unsubscribe_message.track_name()
);
tracing::trace!("unsubscribe_handler start.");

tracing::debug!("unsubscribe_message: {:#?}", unsubscribe_message);
// TODO: Remove unsubscribe information

// FIXME: tmp
tracing::trace!("unsubscribe_handler complete.");
Ok(UnSubscribeResponse::Success)
}
24 changes: 12 additions & 12 deletions moqt-core/src/modules/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub enum StreamType {
Bi,
}

#[derive(Debug)]
pub enum MessageProcessResult {
Success(BytesMut),
SuccessWithoutResponse,
Expand All @@ -52,7 +53,6 @@ fn read_message_type(read_cur: &mut std::io::Cursor<&[u8]>) -> Result<MessageTyp
Ok(message_type)
}

#[tracing::instrument(name="StableID",skip_all,fields(id=client.id()))]
pub async fn message_handler(
read_buf: &mut BytesMut,
stream_type: StreamType,
Expand All @@ -61,32 +61,32 @@ pub async fn message_handler(
track_namespace_manager_repository: &mut dyn TrackNamespaceManagerRepository,
send_stream_dispatcher_repository: &mut dyn SendStreamDispatcherRepository,
) -> MessageProcessResult {
tracing::info!("message_handler! {}", read_buf.len());
tracing::trace!("message_handler! {}", read_buf.len());

let mut read_cur = Cursor::new(&read_buf[..]);
tracing::info!("read_cur! {:?}", read_cur);
tracing::debug!("read_cur! {:?}", read_cur);

// Read the message type
let message_type = match read_message_type(&mut read_cur) {
Ok(v) => v,
Err(err) => {
read_buf.advance(read_cur.position() as usize);

tracing::info!("message_type is wrong {:?}", err);
tracing::error!("message_type is wrong {:?}", err);
return MessageProcessResult::Failure(
TerminationErrorCode::GenericError,
err.to_string(),
);
}
};
tracing::info!("Message Type: {:?}", message_type);
tracing::info!("Received Message Type: {:?}", message_type);
if message_type.is_setup_message() {
// Setup message must be sent on bidirectional stream
if stream_type == StreamType::Uni {
read_buf.advance(read_cur.position() as usize);

let message = String::from("Setup message must be sent on bidirectional stream");
tracing::info!(message);
tracing::debug!(message);
return MessageProcessResult::Failure(TerminationErrorCode::GenericError, message);
}
} else if message_type.is_control_message() {
Expand All @@ -97,14 +97,14 @@ pub async fn message_handler(
read_buf.advance(read_cur.position() as usize);

let message = String::from("Object message must be sent on unidirectional stream");
tracing::info!(message);
tracing::debug!(message);
return MessageProcessResult::Failure(TerminationErrorCode::ProtocolViolation, message);
}
}

if read_cur.remaining() == 0 {
// The length is insufficient, so do nothing. Do not synchronize with the cursor.
tracing::info!("fragmented {}", read_buf.len());
tracing::error!("fragmented {}", read_buf.len());
return MessageProcessResult::Fragment;
}

Expand Down Expand Up @@ -226,8 +226,7 @@ pub async fn message_handler(
let unsubscribe_message = UnAnnounce::depacketize(&mut payload_buf);

if let Err(err) = unsubscribe_message {
// fix
tracing::info!("{:#?}", err);
tracing::error!("{:#?}", err);
return MessageProcessResult::Failure(
TerminationErrorCode::GenericError,
err.to_string(),
Expand Down Expand Up @@ -284,7 +283,7 @@ pub async fn message_handler(
let unannounce_message = UnAnnounce::depacketize(&mut payload_buf);

if let Err(err) = unannounce_message {
tracing::info!("{:#?}", err);
tracing::error!("{:#?}", err);
return MessageProcessResult::Failure(
TerminationErrorCode::GenericError,
err.to_string(),
Expand Down Expand Up @@ -312,13 +311,14 @@ pub async fn message_handler(
}
};

tracing::info!("Return Message Type: {:?}", return_message_type.clone());
let mut message_buf = BytesMut::with_capacity(write_buf.len() + 8);
// Add type
message_buf.extend(write_variable_integer(u8::from(return_message_type) as u64));
// Add payload
message_buf.extend(write_buf);

tracing::info!("message_buf: {:#x?}", message_buf);
tracing::debug!("message_buf: {:#x?}", message_buf);

MessageProcessResult::Success(message_buf)
}
2 changes: 1 addition & 1 deletion moqt-core/src/modules/message_type.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use num_enum::{IntoPrimitive, TryFromPrimitive};

#[derive(Debug, PartialEq, Eq, TryFromPrimitive, IntoPrimitive)]
#[derive(Debug, PartialEq, Eq, TryFromPrimitive, IntoPrimitive, Clone)]
#[repr(u8)]
pub enum MessageType {
ObjectWithPayloadLength = 0x00,
Expand Down
Loading

0 comments on commit 0fe58b5

Please sign in to comment.