refactor RedisService for generic use
This commit is contained in:
parent
eaca4769a0
commit
351f0ae08e
2 changed files with 35 additions and 39 deletions
|
@ -1,54 +1,50 @@
|
||||||
use log::error;
|
use log::error;
|
||||||
use post::NewsPost;
|
|
||||||
use redis::aio::MultiplexedConnection;
|
use redis::aio::MultiplexedConnection;
|
||||||
use redis::{AsyncCommands, RedisError};
|
use redis::{AsyncCommands, RedisError};
|
||||||
|
use serde::Serialize;
|
||||||
|
|
||||||
pub struct RedisService {
|
pub struct RedisService {
|
||||||
multiplexed_connection: MultiplexedConnection,
|
multiplexed_connection: MultiplexedConnection,
|
||||||
stream_name: String,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RedisService {
|
impl RedisService {
|
||||||
/// Creates a new RedisService instance.
|
/// Creates a new RedisService instance.
|
||||||
pub async fn new(connection_string: &str, stream_name: &str) -> Self {
|
pub async fn new(connection_string: &str) -> Self {
|
||||||
let client = redis::Client::open(connection_string).unwrap();
|
let client = redis::Client::open(connection_string).unwrap();
|
||||||
let con = client.get_multiplexed_async_connection().await.unwrap();
|
let con = client.get_multiplexed_async_connection().await.unwrap();
|
||||||
|
|
||||||
RedisService {
|
RedisService {
|
||||||
multiplexed_connection: con,
|
multiplexed_connection: con,
|
||||||
stream_name: stream_name.to_string(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//noinspection RsSelfConvention
|
//noinspection RsSelfConvention
|
||||||
/// Returns true if the key exists in Redis, false otherwise.
|
/// Returns true if the key exists in Redis, false otherwise.
|
||||||
pub async fn is_post_seen(&mut self, title: &str) -> bool {
|
pub async fn is_key_flagged(&mut self, key: &str) -> bool {
|
||||||
let digest = md5::compute(title);
|
let result: Result<bool, RedisError> = self.multiplexed_connection.get(key).await;
|
||||||
let result: Result<bool, RedisError> = self
|
|
||||||
.multiplexed_connection
|
|
||||||
.get(format!("{:x}", digest))
|
|
||||||
.await;
|
|
||||||
result.unwrap_or(false)
|
result.unwrap_or(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Marks the post as seen
|
/// Flags the key by setting it to true.
|
||||||
pub async fn mark_post_seen(&mut self, title: &str, ttl: u64) {
|
pub async fn flag_key(&mut self, key: &str, ttl: u64) {
|
||||||
let digest = md5::compute(title);
|
|
||||||
let _ = self
|
let _ = self
|
||||||
.multiplexed_connection
|
.multiplexed_connection
|
||||||
.set_ex::<String, bool, bool>(format!("{:x}", digest), true, ttl)
|
.set_ex::<String, bool, bool>(key.to_string(), true, ttl)
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Publishes the post to the redis stream.
|
/// Publishes the data to the redis stream.
|
||||||
/// Returns a `bool` that is true if the post was published and false otherwise.
|
/// Returns a `bool` that is true if the data was published and false otherwise.
|
||||||
pub async fn publish(&mut self, post: &NewsPost) -> bool {
|
pub async fn publish<ST>(&mut self, stream_name: &str, data: &ST) -> bool
|
||||||
let serialized_post = serde_json::to_string(&post).unwrap();
|
where
|
||||||
|
ST: Serialize,
|
||||||
|
{
|
||||||
|
let serialized_data = serde_json::to_string(&data).unwrap();
|
||||||
let result = redis::cmd("XADD")
|
let result = redis::cmd("XADD")
|
||||||
.arg(format!("posts:{}", self.stream_name))
|
.arg(stream_name)
|
||||||
.arg("*")
|
.arg("*")
|
||||||
.arg("post_data")
|
.arg("data")
|
||||||
.arg(serialized_post)
|
.arg(serialized_data)
|
||||||
.exec_async(&mut self.multiplexed_connection)
|
.exec_async(&mut self.multiplexed_connection)
|
||||||
.await;
|
.await;
|
||||||
if result.is_err() {
|
if result.is_err() {
|
||||||
|
@ -62,6 +58,7 @@ impl RedisService {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use post::NewsPost;
|
||||||
use rand::distributions::{Alphanumeric, DistString};
|
use rand::distributions::{Alphanumeric, DistString};
|
||||||
use redis::RedisResult;
|
use redis::RedisResult;
|
||||||
use serial_test::serial;
|
use serial_test::serial;
|
||||||
|
@ -79,20 +76,19 @@ mod tests {
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
#[serial]
|
#[serial]
|
||||||
async fn test_redis_service_new() {
|
async fn test_redis_service_new() {
|
||||||
let _ = RedisService::new(REDIS_CONNECTION_STRING, "a").await;
|
let _ = RedisService::new(REDIS_CONNECTION_STRING).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
#[serial]
|
#[serial]
|
||||||
async fn test_redis_service_is_post_seen_false() {
|
async fn test_redis_service_key_exists_false() {
|
||||||
// Setup
|
// Setup
|
||||||
let random_stream_name = Alphanumeric.sample_string(&mut rand::thread_rng(), 6);
|
|
||||||
let random_post = Alphanumeric.sample_string(&mut rand::thread_rng(), 6);
|
let random_post = Alphanumeric.sample_string(&mut rand::thread_rng(), 6);
|
||||||
|
|
||||||
let mut service = RedisService::new(REDIS_CONNECTION_STRING, &random_stream_name).await;
|
let mut service = RedisService::new(REDIS_CONNECTION_STRING).await;
|
||||||
|
|
||||||
// Test
|
// Test
|
||||||
let result = service.is_post_seen(&random_post).await;
|
let result = service.is_key_flagged(&random_post).await;
|
||||||
|
|
||||||
// Assert
|
// Assert
|
||||||
assert_eq!(result, false);
|
assert_eq!(result, false);
|
||||||
|
@ -101,16 +97,15 @@ mod tests {
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
#[serial]
|
#[serial]
|
||||||
async fn test_redis_service_is_post_seen_true() {
|
async fn test_redis_service_key_exists_true() {
|
||||||
// Setup
|
// Setup
|
||||||
let random_stream_name = Alphanumeric.sample_string(&mut rand::thread_rng(), 6);
|
|
||||||
let random_post = Alphanumeric.sample_string(&mut rand::thread_rng(), 6);
|
let random_post = Alphanumeric.sample_string(&mut rand::thread_rng(), 6);
|
||||||
|
|
||||||
let mut service = RedisService::new(REDIS_CONNECTION_STRING, &random_stream_name).await;
|
let mut service = RedisService::new(REDIS_CONNECTION_STRING).await;
|
||||||
service.mark_post_seen(&random_post, 10).await;
|
service.flag_key(&random_post, 10).await;
|
||||||
|
|
||||||
// Test
|
// Test
|
||||||
let result = service.is_post_seen(&random_post).await;
|
let result = service.is_key_flagged(&random_post).await;
|
||||||
|
|
||||||
// Assert
|
// Assert
|
||||||
assert_eq!(result, true);
|
assert_eq!(result, true);
|
||||||
|
@ -123,7 +118,7 @@ mod tests {
|
||||||
// Setup
|
// Setup
|
||||||
let random_stream_name = Alphanumeric.sample_string(&mut rand::thread_rng(), 6);
|
let random_stream_name = Alphanumeric.sample_string(&mut rand::thread_rng(), 6);
|
||||||
|
|
||||||
let mut service = RedisService::new(REDIS_CONNECTION_STRING, &random_stream_name).await;
|
let mut service = RedisService::new(REDIS_CONNECTION_STRING).await;
|
||||||
|
|
||||||
// Test
|
// Test
|
||||||
let post = NewsPost {
|
let post = NewsPost {
|
||||||
|
@ -133,10 +128,10 @@ mod tests {
|
||||||
link: Some(String::from("l")),
|
link: Some(String::from("l")),
|
||||||
author: Some(String::from("a")),
|
author: Some(String::from("a")),
|
||||||
};
|
};
|
||||||
let result = service.publish(&post).await;
|
let result = service.publish(&random_stream_name, &post).await;
|
||||||
|
|
||||||
let stream_length: RedisResult<i32> = redis::cmd("XLEN")
|
let stream_length: RedisResult<i32> = redis::cmd("XLEN")
|
||||||
.arg(&format!("posts:{}", random_stream_name))
|
.arg(random_stream_name)
|
||||||
.query_async(&mut service.multiplexed_connection)
|
.query_async(&mut service.multiplexed_connection)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
|
|
@ -56,8 +56,7 @@ async fn main() -> Result<(), anyhow::Error> {
|
||||||
info!("Starting the program");
|
info!("Starting the program");
|
||||||
|
|
||||||
// Redis setup
|
// Redis setup
|
||||||
let mut redis_service =
|
let mut redis_service = RedisService::new(&args.redis_connection_string).await;
|
||||||
RedisService::new(&args.redis_connection_string, &args.redis_stream_name).await;
|
|
||||||
|
|
||||||
// Scheduler setup
|
// Scheduler setup
|
||||||
let mut scheduler = AsyncScheduler::new();
|
let mut scheduler = AsyncScheduler::new();
|
||||||
|
@ -86,11 +85,13 @@ async fn main() -> Result<(), anyhow::Error> {
|
||||||
info!("Received post {:?}", news_post);
|
info!("Received post {:?}", news_post);
|
||||||
if news_post.is_complete() {
|
if news_post.is_complete() {
|
||||||
let title = news_post.title.clone().unwrap();
|
let title = news_post.title.clone().unwrap();
|
||||||
if !redis_service.is_post_seen(&title).await {
|
if !redis_service.is_key_flagged(&title).await {
|
||||||
let published = redis_service.publish(&news_post).await;
|
let published = redis_service
|
||||||
|
.publish(&args.redis_stream_name, &news_post)
|
||||||
|
.await;
|
||||||
if published {
|
if published {
|
||||||
info!("Published {:?}", news_post);
|
info!("Published {:?}", news_post);
|
||||||
redis_service.mark_post_seen(&title, 60 * 60 * 24 * 3).await;
|
redis_service.flag_key(&title, 60 * 60 * 24 * 3).await;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue