Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

serverのLog level修正 #77

Merged
merged 5 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ openssl req -newkey rsa:2048 -nodes -keyout key.pem -x509 -out cert.pem -subj '/

#### Specify the log level

- `cargo run -p moqt-server-sample -- -log <Log Level>`
- `cargo run -p moqt-server-sample -- --log <Log Level>`
- Default setting is `DEBUG`

### Run moqt-client-sample
Expand Down
15 changes: 7 additions & 8 deletions moqt-core/src/modules/handlers/announce_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,8 @@ pub(crate) async fn announce_handler(
client: &mut MOQTClient,
track_namespace_manager_repository: &mut dyn TrackNamespaceManagerRepository,
) -> Result<AnnounceResponse> {
tracing::info!("announce_handler!");

tracing::info!(
"announce_handler: track_namespace: \"{}\" is announced by client: {}",
announce_message.track_namespace(),
client.id
);
tracing::trace!("announce_handler start.");
tracing::debug!("announce_message: {:#?}", announce_message);

// Record the announced Track Namespace
let set_result = track_namespace_manager_repository
Expand All @@ -34,12 +29,16 @@ pub(crate) async fn announce_handler(
match set_result {
Ok(_) => {
let track_namespace = announce_message.track_namespace().to_string();

tracing::info!("announced track_namespace: {:#?}", track_namespace);
tracing::trace!("announce_handler complete.");

Ok(AnnounceResponse::Success(AnnounceOk::new(
track_namespace.to_string(),
)))
}
Err(err) => {
tracing::info!("announce_handler: err: {:?}", err.to_string());
tracing::error!("announce_handler: err: {:?}", err.to_string());

Ok(AnnounceResponse::Failure(AnnounceError::new(
announce_message.track_namespace().to_string(),
Expand Down
22 changes: 12 additions & 10 deletions moqt-core/src/modules/handlers/object_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ pub(crate) async fn object_with_payload_length_handler(
track_namespace_manager_repository: &mut dyn TrackNamespaceManagerRepository,
send_stream_dispatcher_repository: &mut dyn SendStreamDispatcherRepository,
) -> Result<()> {
tracing::info!("object_with_payload_length_handler!");
tracing::trace!("object_with_payload_length_handler start.");

tracing::info!(
"object_with_payload_length_handler: track_id: \"{}\"",
object_with_payload_length_message.track_id()
tracing::debug!(
"object_with_payload_length_message: {:#?}",
object_with_payload_length_message
);

// Use track_id to determine the subscriber
Expand All @@ -35,7 +35,7 @@ pub(crate) async fn object_with_payload_length_handler(
for session_id in session_ids.iter() {
let message: Box<dyn MOQTPayload> =
Box::new(object_with_payload_length_message.clone());
tracing::info!(
tracing::debug!(
"message: {:#?} is sent to relay handler for client {:?}",
object_with_payload_length_message,
session_id
Expand All @@ -55,6 +55,7 @@ pub(crate) async fn object_with_payload_length_handler(
}
}
}
tracing::trace!("object_with_payload_length_handler complete.");
result
}
None => {
Expand All @@ -69,11 +70,11 @@ pub(crate) async fn object_without_payload_length_handler(
track_namespace_manager_repository: &mut dyn TrackNamespaceManagerRepository,
send_stream_dispatcher_repository: &mut dyn SendStreamDispatcherRepository,
) -> Result<()> {
tracing::info!("object_without_payload_length_handler!");
tracing::trace!("object_without_payload_length_handler start.");

tracing::info!(
"object_without_payload_length_handler: track_id: \"{}\"",
object_without_payload_length_message.track_id()
tracing::debug!(
"object_without_payload_length_message: {:#?}",
object_without_payload_length_message
);

// Use track_id to determine the subscriber
Expand All @@ -88,7 +89,7 @@ pub(crate) async fn object_without_payload_length_handler(
for session_id in session_ids.iter() {
let message: Box<dyn MOQTPayload> =
Box::new(object_without_payload_length_message.clone());
tracing::info!(
tracing::debug!(
"message: {:#?} is sent to relay handler for client {:?}",
object_without_payload_length_message,
session_id
Expand All @@ -108,6 +109,7 @@ pub(crate) async fn object_without_payload_length_handler(
}
}
}
tracing::trace!("object_without_payload_length_handler complete.");
result
}
None => {
Expand Down
10 changes: 6 additions & 4 deletions moqt-core/src/modules/handlers/server_setup_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ pub(crate) fn setup_handler(
underlay_type: UnderlayType,
client: &mut MOQTClient,
) -> Result<ServerSetup> {
tracing::info!("setup_handler");
tracing::trace!("setup_handler start.");

tracing::info!(
tracing::debug!("client_setup_message: {:#?}", client_setup_message);

tracing::debug!(
"supported_versions: {:#x?}",
client_setup_message.supported_versions
);
Expand All @@ -44,7 +46,7 @@ pub(crate) fn setup_handler(
}
}
SetupParameter::Unknown(v) => {
tracing::info!("Ignore unknown SETUP parameter {}", v);
tracing::warn!("Ignore unknown SETUP parameter {}", v);
}
}
}
Expand All @@ -61,7 +63,7 @@ pub(crate) fn setup_handler(
// State: Connected -> Setup
client.update_status(MOQTClientStatus::SetUp);

tracing::info!("setup_handler completed. {:#?}", client);
tracing::trace!("setup_handler complete.");

Ok(server_setup_message)
}
Expand Down
27 changes: 16 additions & 11 deletions moqt-core/src/modules/handlers/subscribe_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,10 @@ pub(crate) async fn subscribe_handler(
track_namespace_manager_repository: &mut dyn TrackNamespaceManagerRepository,
send_stream_dispatcher_repository: &mut dyn SendStreamDispatcherRepository,
) -> Result<()> {
tracing::info!("subscribe_handler!");
tracing::trace!("subscribe_handler start.");

tracing::debug!("subscribe_message: {:#?}", subscribe_message);

tracing::info!(
"subscribe_handler: track_namespace: \"{}\"",
subscribe_message.track_namespace()
);
tracing::info!(
"subscribe_handler: track_name: \"{}\"",
subscribe_message.track_name()
);
// Since only the track_namespace is recorded in ANNOUNCE, use track_namespace to determine the publisher
let publisher_session_id = track_namespace_manager_repository
.get_publisher_session_id_by_track_namespace(subscribe_message.track_namespace())
Expand All @@ -48,7 +42,7 @@ pub(crate) async fn subscribe_handler(
}
// Notify the publisher about the SUBSCRIBE message
let message: Box<dyn MOQTPayload> = Box::new(subscribe_message.clone());
tracing::info!(
tracing::debug!(
"message: {:#?} is sent to relay handler for client {:?}",
subscribe_message,
session_id
Expand All @@ -58,7 +52,18 @@ pub(crate) async fn subscribe_handler(
.send_message_to_send_stream_thread(session_id, message, StreamType::Bi)
.await
{
Ok(_) => Ok(()),
Ok(_) => {
tracing::info!(
"subscribed track_namespace: {:?}",
subscribe_message.track_namespace(),
);
tracing::info!(
"subscribed track_name: {:?}",
subscribe_message.track_name()
);
tracing::trace!("subscribe_handler complete.");
Ok(())
}
Err(e) => Err(anyhow::anyhow!("relay subscribe failed: {:?}", e)),
}
}
Expand Down
19 changes: 5 additions & 14 deletions moqt-core/src/modules/handlers/subscribe_ok_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,10 @@ pub(crate) async fn subscribe_ok_handler(
track_namespace_manager_repository: &mut dyn TrackNamespaceManagerRepository,
send_stream_dispatcher_repository: &mut dyn SendStreamDispatcherRepository,
) -> Result<()> {
tracing::info!("subscribe_ok_handler!");
tracing::trace!("subscribe_ok_handler start.");

tracing::debug!("subscribe_ok_message: {:#?}", subscribe_ok_message);

tracing::info!(
"subscribe_ok_handler: track_namespace: \"{}\"",
subscribe_ok_message.track_namespace()
);
tracing::info!(
"subscribe_ok_handler: track_name: \"{}\"",
subscribe_ok_message.track_name()
);
tracing::info!(
"subscribe_ok_handler: track_id: \"{}\"",
subscribe_ok_message.track_id()
);
// Determine the SUBSCRIBER who sent the SUBSCRIBE using the track_namespace and track_name
let subscriber_session_ids = track_namespace_manager_repository
.get_subscriber_session_ids_by_track_namespace_and_track_name(
Expand All @@ -43,7 +33,7 @@ pub(crate) async fn subscribe_ok_handler(
// Notify all waiting subscribers with the SUBSCRIBE_OK message
for session_id in session_ids.iter() {
let message: Box<dyn MOQTPayload> = Box::new(subscribe_ok_message.clone());
tracing::info!(
tracing::debug!(
"message: {:#?} is sent to relay handler for client {:?}",
subscribe_ok_message,
session_id
Expand Down Expand Up @@ -79,6 +69,7 @@ pub(crate) async fn subscribe_ok_handler(
}
}
}
tracing::trace!("subscribe_ok_handler complete.");
result
}
None => Err(anyhow::anyhow!("waiting subscriber session ids not found")),
Expand Down
14 changes: 7 additions & 7 deletions moqt-core/src/modules/handlers/unannounce_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,9 @@ pub(crate) async fn unannounce_handler(
_client: &mut MOQTClient, // TODO: Not implemented yet
track_namespace_manager_repository: &mut dyn TrackNamespaceManagerRepository,
) -> Result<()> {
tracing::info!("unannounce_handler!");
tracing::trace!("unannounce_handler start.");

tracing::info!(
"unannounce_handler: track_namespace: \"{}\"",
unannounce_message.track_namespace()
);
tracing::debug!("unannounce_message: {:#?}", unannounce_message);

// Remove the announced Track Namespace
let delete_result = track_namespace_manager_repository
Expand All @@ -27,9 +24,12 @@ pub(crate) async fn unannounce_handler(

match delete_result {
// TODO: Notify connected clients that unannouncing has occurred
Ok(_) => Ok(()),
Ok(_) => {
tracing::trace!("unannounce_handler complete.");
Ok(())
}
Err(err) => {
tracing::info!("unannounce_handler: err: {:?}", err.to_string());
tracing::error!("unannounce_handler: err: {:?}", err.to_string());

Ok(())
}
Expand Down
13 changes: 3 additions & 10 deletions moqt-core/src/modules/handlers/unsubscribe_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,12 @@ pub(crate) async fn _unsubscribe_handler(
_client: &mut MOQTClient, // TODO: Not implemented yet
_track_namespace_manager_repository: &mut dyn TrackNamespaceManagerRepository, // TODO: Not implemented yet
) -> Result<UnSubscribeResponse> {
tracing::info!("unsubscribe_handler!");

tracing::info!(
"unsubscribe_handler: track_namespace: \"{}\"",
unsubscribe_message.track_namespace()
);
tracing::info!(
"unsubscribe_handler: track_name: \"{}\"",
unsubscribe_message.track_name()
);
tracing::trace!("unsubscribe_handler start.");

tracing::debug!("unsubscribe_message: {:#?}", unsubscribe_message);
// TODO: Remove unsubscribe information

// FIXME: tmp
tracing::trace!("unsubscribe_handler complete.");
Ok(UnSubscribeResponse::Success)
}
24 changes: 12 additions & 12 deletions moqt-core/src/modules/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub enum StreamType {
Bi,
}

#[derive(Debug)]
pub enum MessageProcessResult {
Success(BytesMut),
SuccessWithoutResponse,
Expand All @@ -52,7 +53,6 @@ fn read_message_type(read_cur: &mut std::io::Cursor<&[u8]>) -> Result<MessageTyp
Ok(message_type)
}

#[tracing::instrument(name="StableID",skip_all,fields(id=client.id()))]
pub async fn message_handler(
read_buf: &mut BytesMut,
stream_type: StreamType,
Expand All @@ -61,32 +61,32 @@ pub async fn message_handler(
track_namespace_manager_repository: &mut dyn TrackNamespaceManagerRepository,
send_stream_dispatcher_repository: &mut dyn SendStreamDispatcherRepository,
) -> MessageProcessResult {
tracing::info!("message_handler! {}", read_buf.len());
tracing::trace!("message_handler! {}", read_buf.len());

let mut read_cur = Cursor::new(&read_buf[..]);
tracing::info!("read_cur! {:?}", read_cur);
tracing::debug!("read_cur! {:?}", read_cur);

// Read the message type
let message_type = match read_message_type(&mut read_cur) {
Ok(v) => v,
Err(err) => {
read_buf.advance(read_cur.position() as usize);

tracing::info!("message_type is wrong {:?}", err);
tracing::error!("message_type is wrong {:?}", err);
return MessageProcessResult::Failure(
TerminationErrorCode::GenericError,
err.to_string(),
);
}
};
tracing::info!("Message Type: {:?}", message_type);
tracing::info!("Received Message Type: {:?}", message_type);
if message_type.is_setup_message() {
// Setup message must be sent on bidirectional stream
if stream_type == StreamType::Uni {
read_buf.advance(read_cur.position() as usize);

let message = String::from("Setup message must be sent on bidirectional stream");
tracing::info!(message);
tracing::debug!(message);
return MessageProcessResult::Failure(TerminationErrorCode::GenericError, message);
}
} else if message_type.is_control_message() {
Expand All @@ -97,14 +97,14 @@ pub async fn message_handler(
read_buf.advance(read_cur.position() as usize);

let message = String::from("Object message must be sent on unidirectional stream");
tracing::info!(message);
tracing::debug!(message);
return MessageProcessResult::Failure(TerminationErrorCode::ProtocolViolation, message);
}
}

if read_cur.remaining() == 0 {
// The length is insufficient, so do nothing. Do not synchronize with the cursor.
tracing::info!("fragmented {}", read_buf.len());
tracing::error!("fragmented {}", read_buf.len());
return MessageProcessResult::Fragment;
}

Expand Down Expand Up @@ -226,8 +226,7 @@ pub async fn message_handler(
let unsubscribe_message = UnAnnounce::depacketize(&mut payload_buf);

if let Err(err) = unsubscribe_message {
// fix
tracing::info!("{:#?}", err);
tracing::error!("{:#?}", err);
return MessageProcessResult::Failure(
TerminationErrorCode::GenericError,
err.to_string(),
Expand Down Expand Up @@ -284,7 +283,7 @@ pub async fn message_handler(
let unannounce_message = UnAnnounce::depacketize(&mut payload_buf);

if let Err(err) = unannounce_message {
tracing::info!("{:#?}", err);
tracing::error!("{:#?}", err);
return MessageProcessResult::Failure(
TerminationErrorCode::GenericError,
err.to_string(),
Expand Down Expand Up @@ -312,13 +311,14 @@ pub async fn message_handler(
}
};

tracing::info!("Return Message Type: {:?}", return_message_type.clone());
let mut message_buf = BytesMut::with_capacity(write_buf.len() + 8);
// Add type
message_buf.extend(write_variable_integer(u8::from(return_message_type) as u64));
// Add payload
message_buf.extend(write_buf);

tracing::info!("message_buf: {:#x?}", message_buf);
tracing::debug!("message_buf: {:#x?}", message_buf);

MessageProcessResult::Success(message_buf)
}
2 changes: 1 addition & 1 deletion moqt-core/src/modules/message_type.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use num_enum::{IntoPrimitive, TryFromPrimitive};

#[derive(Debug, PartialEq, Eq, TryFromPrimitive, IntoPrimitive)]
#[derive(Debug, PartialEq, Eq, TryFromPrimitive, IntoPrimitive, Clone)]
#[repr(u8)]
pub enum MessageType {
ObjectWithPayloadLength = 0x00,
Expand Down
Loading