diff --git a/infrastructure/src/redis.rs b/infrastructure/src/redis.rs index 8dfca2a..4792828 100644 --- a/infrastructure/src/redis.rs +++ b/infrastructure/src/redis.rs @@ -1,54 +1,50 @@ use log::error; -use post::NewsPost; use redis::aio::MultiplexedConnection; use redis::{AsyncCommands, RedisError}; +use serde::Serialize; pub struct RedisService { multiplexed_connection: MultiplexedConnection, - stream_name: String, } impl RedisService { /// Creates a new RedisService instance. - pub async fn new(connection_string: &str, stream_name: &str) -> Self { + pub async fn new(connection_string: &str) -> Self { let client = redis::Client::open(connection_string).unwrap(); let con = client.get_multiplexed_async_connection().await.unwrap(); RedisService { multiplexed_connection: con, - stream_name: stream_name.to_string(), } } //noinspection RsSelfConvention /// Returns true if the key exists in Redis, false otherwise. - pub async fn is_post_seen(&mut self, title: &str) -> bool { - let digest = md5::compute(title); - let result: Result = self - .multiplexed_connection - .get(format!("{:x}", digest)) - .await; + pub async fn is_key_flagged(&mut self, key: &str) -> bool { + let result: Result = self.multiplexed_connection.get(key).await; result.unwrap_or(false) } - /// Marks the post as seen - pub async fn mark_post_seen(&mut self, title: &str, ttl: u64) { - let digest = md5::compute(title); + /// Flags the key by setting it to true. + pub async fn flag_key(&mut self, key: &str, ttl: u64) { let _ = self .multiplexed_connection - .set_ex::(format!("{:x}", digest), true, ttl) + .set_ex::(key.to_string(), true, ttl) .await; } - /// Publishes the post to the redis stream. - /// Returns a `bool` that is true if the post was published and false otherwise. - pub async fn publish(&mut self, post: &NewsPost) -> bool { - let serialized_post = serde_json::to_string(&post).unwrap(); + /// Publishes the data to the redis stream. + /// Returns a `bool` that is true if the data was published and false otherwise. + pub async fn publish(&mut self, stream_name: &str, data: &ST) -> bool + where + ST: Serialize, + { + let serialized_data = serde_json::to_string(&data).unwrap(); let result = redis::cmd("XADD") - .arg(format!("posts:{}", self.stream_name)) + .arg(stream_name) .arg("*") - .arg("post_data") - .arg(serialized_post) + .arg("data") + .arg(serialized_data) .exec_async(&mut self.multiplexed_connection) .await; if result.is_err() { @@ -62,6 +58,7 @@ impl RedisService { #[cfg(test)] mod tests { use super::*; + use post::NewsPost; use rand::distributions::{Alphanumeric, DistString}; use redis::RedisResult; use serial_test::serial; @@ -79,20 +76,19 @@ mod tests { #[tokio::test] #[serial] async fn test_redis_service_new() { - let _ = RedisService::new(REDIS_CONNECTION_STRING, "a").await; + let _ = RedisService::new(REDIS_CONNECTION_STRING).await; } #[tokio::test] #[serial] - async fn test_redis_service_is_post_seen_false() { + async fn test_redis_service_key_exists_false() { // Setup - let random_stream_name = Alphanumeric.sample_string(&mut rand::thread_rng(), 6); let random_post = Alphanumeric.sample_string(&mut rand::thread_rng(), 6); - let mut service = RedisService::new(REDIS_CONNECTION_STRING, &random_stream_name).await; + let mut service = RedisService::new(REDIS_CONNECTION_STRING).await; // Test - let result = service.is_post_seen(&random_post).await; + let result = service.is_key_flagged(&random_post).await; // Assert assert_eq!(result, false); @@ -101,16 +97,15 @@ mod tests { #[tokio::test] #[serial] - async fn test_redis_service_is_post_seen_true() { + async fn test_redis_service_key_exists_true() { // Setup - let random_stream_name = Alphanumeric.sample_string(&mut rand::thread_rng(), 6); let random_post = Alphanumeric.sample_string(&mut rand::thread_rng(), 6); - let mut service = RedisService::new(REDIS_CONNECTION_STRING, &random_stream_name).await; - service.mark_post_seen(&random_post, 10).await; + let mut service = RedisService::new(REDIS_CONNECTION_STRING).await; + service.flag_key(&random_post, 10).await; // Test - let result = service.is_post_seen(&random_post).await; + let result = service.is_key_flagged(&random_post).await; // Assert assert_eq!(result, true); @@ -123,7 +118,7 @@ mod tests { // Setup let random_stream_name = Alphanumeric.sample_string(&mut rand::thread_rng(), 6); - let mut service = RedisService::new(REDIS_CONNECTION_STRING, &random_stream_name).await; + let mut service = RedisService::new(REDIS_CONNECTION_STRING).await; // Test let post = NewsPost { @@ -133,10 +128,10 @@ mod tests { link: Some(String::from("l")), author: Some(String::from("a")), }; - let result = service.publish(&post).await; + let result = service.publish(&random_stream_name, &post).await; let stream_length: RedisResult = redis::cmd("XLEN") - .arg(&format!("posts:{}", random_stream_name)) + .arg(random_stream_name) .query_async(&mut service.multiplexed_connection) .await; diff --git a/scrapper/src/main.rs b/scrapper/src/main.rs index db1ce75..608f864 100644 --- a/scrapper/src/main.rs +++ b/scrapper/src/main.rs @@ -56,8 +56,7 @@ async fn main() -> Result<(), anyhow::Error> { info!("Starting the program"); // Redis setup - let mut redis_service = - RedisService::new(&args.redis_connection_string, &args.redis_stream_name).await; + let mut redis_service = RedisService::new(&args.redis_connection_string).await; // Scheduler setup let mut scheduler = AsyncScheduler::new(); @@ -86,11 +85,13 @@ async fn main() -> Result<(), anyhow::Error> { info!("Received post {:?}", news_post); if news_post.is_complete() { let title = news_post.title.clone().unwrap(); - if !redis_service.is_post_seen(&title).await { - let published = redis_service.publish(&news_post).await; + if !redis_service.is_key_flagged(&title).await { + let published = redis_service + .publish(&args.redis_stream_name, &news_post) + .await; if published { info!("Published {:?}", news_post); - redis_service.mark_post_seen(&title, 60 * 60 * 24 * 3).await; + redis_service.flag_key(&title, 60 * 60 * 24 * 3).await; } }; }