Skip to content

Commit

Permalink
fix(streamer): fix the bug of redis publishing, we should first start…
Browse files Browse the repository at this point in the history
… consuming then publish data into the channel until atleast 1 subscriber receives it
  • Loading branch information
wildonion committed Sep 19, 2024
1 parent db73cdd commit f6afb6f
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 54 deletions.
8 changes: 4 additions & 4 deletions infra/api.http.json
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,12 @@
"header": [
{
"key": "token_time",
"value": "124kUmD39YUwJRKxaQqTkTRZGDr9L6fszxLz6xKvXrqJSJQNo9vAMjYs5tbbiRPskTGUgRegd"
"value": "124kUmD39YUwJRKxaQqTkFidRW3uAzhcFniiky7b2HjFPME8R5ZLdFgs9z6WncoLbPh72EPNX"
}
],
"body": {
"mode": "raw",
"raw": "{\n \"producer_info\": {\n \"Redis\":{\n \"local_spawn\": true, // spawn the task into the actor thread itself\n \"notif_data\": { \n \"receiver_info\": \"1\",\n \"id\": \"unqie-id0\",\n \"action_data\": {\n \"pid\": 200.4\n }, \n \"actioner_info\": \"2\", \n \"action_type\": \"ProductPurchased\", \n \"fired_at\": 1714316645, \n \"is_seen\": false\n }, \n \"channel\": \"channel-to-produce-msg-to\",\n \"encryptionConfig\": { // this can be null\n \"secret\": \"wildonion@1234\",\n \"passphrase\": \"wildonion@12345\"\n }\n },\n \"Rmq\": null,\n \"Kafka\": null\n },\n \"consumer_info\": null\n}",
"raw": "{\n \"producer_info\": {\n \"Redis\":{\n \"local_spawn\": true, // spawn the task into the actor thread itself\n \"notif_data\": { \n \"receiver_info\": \"1\",\n \"id\": \"unqie-id0\",\n \"action_data\": {\n \"pid\": 200.4\n }, \n \"actioner_info\": \"2\", \n \"action_type\": \"ProductPurchased\", \n \"fired_at\": 1714316645, \n \"is_seen\": false\n }, \n \"channel\": \"savege channel\",\n \"encryptionConfig\": { // this can be null\n \"secret\": \"wildonion@1234\",\n \"passphrase\": \"wildonion@12345\"\n }\n },\n \"Rmq\": null,\n \"Kafka\": null\n },\n \"consumer_info\": null\n}",
"options": {
"raw": {
"language": "json"
Expand Down Expand Up @@ -246,12 +246,12 @@
"header": [
{
"key": "token_time",
"value": "124kUmD39YUwJRKxaQqTkRUAJQtFUBmfjrpL99YjTeaJ1jBEDgt7sCVdKKdvoUi4n6Ennr8e2"
"value": "124kUmD39YUwJRKxaQqTkFidRW3uAzhcFniiky7b2HjFPME8R5ZLdFgs9z6WncoLbPh72EPNX"
}
],
"body": {
"mode": "raw",
"raw": "{\n \"producer_info\": null,\n \"consumer_info\": {\n \"Redis\": {\n \"channel\": \"channel-to-consume-msg-from\",\n \"redis_cache_exp\": 300, // this can be 0 to not to store data on redis\n \"decryptionConfig\": { // this can be null\n \"secret\": \"wildonion@1234\",\n \"passphrase\": \"wildonion@12345\"\n }\n },\n \"Rmq\": null,\n \"Kafka\": null\n }\n}",
"raw": "{\n \"producer_info\": null,\n \"consumer_info\": {\n \"Redis\": {\n \"channel\": \"savege channel\",\n \"redis_cache_exp\": 300, // this can be 0 to not to store data on redis\n \"decryptionConfig\": { // this can be null\n \"secret\": \"wildonion@1234\",\n \"passphrase\": \"wildonion@12345\"\n }\n },\n \"Rmq\": null,\n \"Kafka\": null\n }\n}",
"options": {
"raw": {
"language": "json"
Expand Down
2 changes: 2 additions & 0 deletions logs/error-kind/zerlog.log
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,5 @@ code: 65535 | message: Message production error: UnknownPartition (Local: Unknow
| time: 1726767152352 | method name: NotifBrokerActor.publishToKafka.deliveryStatus
code: 65531 | message: invalid number at line 1 column 2 | due to: Serde Error
| time: 1726771961937 | method name: NotifBrokerActor.consumeFromKafka.decode_serde
code: 65531 | message: invalid number at line 1 column 2 | due to: Serde Error
| time: 1726773899166 | method name: NotifBrokerActor.consumeFromRedis.decode_serde
90 changes: 40 additions & 50 deletions src/workers/notif/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,26 @@
/* ========================================================================================
REALTIME NOTIF EVENT STREAMING DESIGN PATTERN (README files inside docs folder)
========================================================================================
MAKE SURE YOU'VE STARTED CONSUMERS BEFORE PRODUCING
THEY MUST BE READY FOR CONSUMING WHILE PRODUCERS ARE
SENDING MESSAGES TO THE BROKER.
concurrency tools & notes :
→ an eventloop is a thread safe receiver queue of the mpsc channel which receives tasks and execute them in free background thread
→ actor with Box::pin(async{}), tokio::select, tokio::spawn, tokio::sync::{Mutex, mpsc, RwLock}, std::sync::{Condvar, Arc, Mutex}
→ cpu tasks are graph and geo calculations as well as cryptography algorithms which are resource intensive
→ async io tasks are io and networking calls which must be handled simultaneously in order to scale resources
→ async io task execution inside light threadpool: wait on the task but don't block the thread, continue with executing other tasks
→ cpu task execution inside os threadpool: suspend the thread of execution by blocking it, avoid executing other tasks
→ use await on the async io task to not to block the thread and let the thread execute other tasks meanwhile the task is waiting to be solved
→ await pauses and suspends the function execution not the thread and tells the eventloop to pop out another task while the function is awaited
→ use join on the cpu task to block the thread to suspend the thread execution with all tasks and wait for the result of the thread
→ use Condvar to wait and block the thread until some data changes then notify the blocked thread
→ don't use os threadpool in the context of light threadpool, they block the execution thread as well as the entire async runtime
→ std mutex block the caller thread if the lock is busy it stops the thread from executing all tasks until it acquires the lock
→ tokio mutex suspend the async task if the lock is busy it suspend the io task instead of blocking the executor thread
→ std stuffs block and suspend the thread and stop it from executing other tasks while it doing some heavy operations inside the thread like mutex logics
→ tokio stuffs suspend the async io task process instead of blocking the thread and allows the thread executing other tasks simultaneously
→ use channels for atomic syncing between threads instead of using mutex in both async and none async context, send the mutated/updated data to channel instead of using mutex or condvar
→ if we want some result of an either async io or cpu task we have the options of either using of mutex, channels or joining on the thread (would block cpu threads)
→ as soon as the future or async io task is ready to yeild a value the runtime meanwhile of handling other tasks would notify the caller about the result
→ as soon as the the result of the task is ready to be returned from the os thread the os thread will be stopped blocking and continue with executing other tasks
→ actors have their own os or ligh thread of execution which uses to spawn tasks they've received via message passing channels or mailbox
→ actors receive messages asyncly using their receiver eventloop of their jobq mpsc mailbox, they execute them one at a time to ensure the internal state remains consistent cause there is no mutex
→ to share a data between threads it must be Send Sync and live valid
→ initialize storage and actors data structures once and pack them in AppContext struct then share this between threads
→ what are objects: are an isolated thread objects contains light thread for executing tasks, cron scheudling and jobq mailbox
→ talk between two objects using job/task/msg queue with mpsc and rpc based channels like rmq, redis, kafka
→ receive tasks from the channel by streaming over eventloop with while let Some() = rx.recv().await{}
→ what eventloop does: executing received tasks inside a light thread of execution
→ stream is an eventloop receiver channel of some jobq that can be iterated over to get data as they're coming from the channel
NotifBrokerActor is the worker of handling the process of publishing and consuming
messages through rmq, redis and kafka, talking to the NotifBrokerActor can be done
by sending it a message contains the setup either to publish or consume something
to and from an specific broker, so generally it's a sexy actor to produce/consume
messages from different type of brokers it uses RMQ, Redis and Kafka to produce and
consume massive messages in realtime, kindly it supports data AES256 encryption
through producing messages to the broker. we can send either producing or consuming
message to this actor to start producing or consuming in the background.
************************************************************************************
it's notable that for realtime push notif streaming we MUST start consuming from
the specified broker passed in to the message structure when talking with actor, in
a place where the application logic which is likely a server is being started.
************************************************************************************
========================================================================================
*/

Expand All @@ -50,6 +40,7 @@ use rdkafka::ClientConfig;
use rdkafka::Message;
use redis_async::resp::FromResp;
use tokio::spawn;
use workers::scheduler::CronScheduler;
use crate::*;
use deadpool_lapin::lapin::protocol::channel;
use deadpool_redis::redis::AsyncCommands;
Expand Down Expand Up @@ -83,22 +74,6 @@ use crate::interfaces::crypter::Crypter;


/* ========================================================================================
NotifBrokerActor is the worker of handling the process of publishing and consuming
messages through rmq, redis and kafka, talking to the NotifBrokerActor can be done
by sending it a message contains the setup either to publish or consume something
to and from an specific broker, so generally it's a sexy actor to produce/consume
messages from different type of brokers it uses RMQ, Redis and Kafka to produce and
consume massive messages in realtime, kindly it supports data AES256 encryption
through producing messages to the broker. we can send either producing or consuming
message to this actor to start producing or consuming in the background.
************************************************************************************
it's notable that for realtime push notif streaming we MUST start consuming from
the specified broker passed in to the message structure when talking with actor, in
a place where the application logic which is likely a server is being started.
MAKE SURE YOU'VE STARTED CONSUMING BEFORE PRODUCING
************************************************************************************
brokering is all about queueing, sending and receiving messages way more faster,
safer and reliable than a simple eventloop or a tcp based channel.
all brokers contains message/task/job queue to handle communication between services
Expand Down Expand Up @@ -219,7 +194,7 @@ pub struct PublishNotifToRedis{
pub struct ConsumeNotifFromRedis{
pub channel: String,
pub redis_cache_exp: u64,
pub decryption_config: Option<CryptoConfig>
pub decryptionConfig: Option<CryptoConfig>
}

#[derive(Message, Clone, Serialize, Deserialize, Debug, Default, ToSchema)]
Expand Down Expand Up @@ -390,7 +365,20 @@ impl NotifBrokerActor{
dataString
};

let _: () = redis_conn.publish(channel.clone(), finalData).await.unwrap();
// we should keep sending until a consumer receive the data!
tokio::spawn(async move{
let mut int = tokio::time::interval(tokio::time::Duration::from_secs(1));
loop{
int.tick().await;
let getSubs: RedisResult<u64> = redis_conn.publish(channel.clone(), finalData.clone()).await;
let subs = getSubs.unwrap();
if subs >= 1{
log::info!("Message has been published to Redis PubSub Channel");
break;
}
}
});

});

},
Expand Down Expand Up @@ -472,7 +460,9 @@ impl NotifBrokerActor{
while let Some(message) = pubsubstream.next().await{
let resp_val = message.unwrap();
let mut channelData = String::from_resp(resp_val).unwrap(); // this is the expired key


log::info!("Message has been Received from Redis PubSub Channel: {}", channelData);

// ===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>
// ===>>>===>>>===>>>===>>>===>>> data decryption logic ===>>>===>>>===>>>===>>>===>>>
// ===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>
Expand Down Expand Up @@ -2096,7 +2086,7 @@ impl ActixMessageHandler<ConsumeNotifFromRedis> for NotifBrokerActor{

fn handle(&mut self, msg: ConsumeNotifFromRedis, ctx: &mut Self::Context) -> Self::Result {

let ConsumeNotifFromRedis { channel, decryption_config, redis_cache_exp } = msg.clone();
let ConsumeNotifFromRedis { channel, decryptionConfig, redis_cache_exp } = msg.clone();
let this = self.clone();

let task = async move{
Expand All @@ -2106,7 +2096,7 @@ impl ActixMessageHandler<ConsumeNotifFromRedis> for NotifBrokerActor{
// the caller otherwise suspend the this.publishToRedis() function
// until the task is ready to be polled, meanwhile it executes other
// tasks (won't block the thread)
this.consumeFromRedis(&channel, decryption_config, redis_cache_exp).await;
this.consumeFromRedis(&channel, decryptionConfig, redis_cache_exp).await;

};

Expand Down

0 comments on commit f6afb6f

Please sign in to comment.