fix redis streams publishing

This commit is contained in:
Denis-Cosmin Nutiu 2024-12-23 15:15:11 +02:00
parent 48bc873881
commit 9a3d65b26d
2 changed files with 12 additions and 6 deletions

View file

@ -73,7 +73,7 @@ async fn main() -> Result<(), anyhow::Error> {
}) })
.expect("Error setting Ctrl-C handler"); .expect("Error setting Ctrl-C handler");
run_scrapping_job(&mut scheduler, tx, 60.seconds()); run_scrapping_job(&mut scheduler, tx, args.scrape_interval_minutes.minutes());
// Run the scheduler in a separate thread. // Run the scheduler in a separate thread.
let handle = run_scheduler(scheduler, running.clone()); let handle = run_scheduler(scheduler, running.clone());
@ -87,9 +87,11 @@ async fn main() -> Result<(), anyhow::Error> {
if news_post.is_complete() { if news_post.is_complete() {
let title = news_post.title.clone().unwrap(); let title = news_post.title.clone().unwrap();
if !redis_service.is_post_seen(&title).await { if !redis_service.is_post_seen(&title).await {
redis_service.publish(&news_post).await; let published = redis_service.publish(&news_post).await;
if published {
info!("Published {:?}", news_post); info!("Published {:?}", news_post);
redis_service.mark_post_seen(&title, 60 * 60 * 24 * 3).await; redis_service.mark_post_seen(&title, 60 * 60 * 24 * 3).await;
}
}; };
} }
} }

View file

@ -41,16 +41,20 @@ impl RedisService {
} }
/// Publishes the post to the redis stream. /// Publishes the post to the redis stream.
pub async fn publish(&mut self, post: &NewsPost) { /// Returns a `bool` that is true if the post was published and false otherwise.
pub async fn publish(&mut self, post: &NewsPost) -> bool {
let serialized_post = serde_json::to_string(&post).unwrap(); let serialized_post = serde_json::to_string(&post).unwrap();
let result = redis::cmd("XADD") let result = redis::cmd("XADD")
.arg(format!("posts:{}", self.stream_name)) .arg(format!("posts:{}", self.stream_name))
.arg("*") .arg("*")
.arg("post_data")
.arg(serialized_post) .arg(serialized_post)
.exec_async(&mut self.multiplexed_connection) .exec_async(&mut self.multiplexed_connection)
.await; .await;
if result.is_err() { if result.is_err() {
error!("Failed to publish {:?} to stream", result); error!("Failed to publish {:?} to stream", result);
} return false;
};
true
} }
} }