From 02ab5ecaa6d487b30a194cf37920927e2375aad4 Mon Sep 17 00:00:00 2001
From: Denis Nutiu
Date: Mon, 23 Dec 2024 14:56:58 +0200
Subject: [PATCH] implement redis streams publishing

---
 scrapper/Cargo.toml      |  5 +++
 scrapper/src/cli.rs      | 17 +++++++
 scrapper/src/main.rs     | 95 +++++++++++++++++++++++-----------------
 scrapper/src/redis.rs    | 56 +++++++++++++++++++++++
 scrapper/src/scrapper.rs |  4 +-
 5 files changed, 137 insertions(+), 40 deletions(-)
 create mode 100644 scrapper/src/cli.rs
 create mode 100644 scrapper/src/redis.rs

diff --git a/scrapper/Cargo.toml b/scrapper/Cargo.toml
index e6c0878..3581bce 100644
--- a/scrapper/Cargo.toml
+++ b/scrapper/Cargo.toml
@@ -12,3 +12,8 @@ scraper = "0.22.0"
 clokwerk = "0.4.0"
 log = "0.4.22"
 ctrlc = "3.4.5"
+clap = { version = "4.5.23", features = ["derive"] }
+redis = { version = "0.27.6", features = ["tokio-comp"] }
+md5 = "0.7.0"
+serde = { version = "1.0.216", features = ["derive"] }
+serde_json = "1.0.134"
diff --git a/scrapper/src/cli.rs b/scrapper/src/cli.rs
new file mode 100644
index 0000000..3a077d4
--- /dev/null
+++ b/scrapper/src/cli.rs
@@ -0,0 +1,17 @@
+use clap::Parser;
+
+#[derive(Parser, Debug)]
+#[command(version, about, long_about = None)]
+pub struct CliArgs {
+    /// Redis connection string
+    #[arg(short, long)]
+    pub redis_connection_string: String,
+
+    /// Redis stream name
+    #[arg(short = 't', long)]
+    pub redis_stream_name: String,
+
+    /// The scraping interval in minutes
+    #[arg(short, long, default_value_t = 60)]
+    pub scrape_interval_minutes: u32,
+}
diff --git a/scrapper/src/main.rs b/scrapper/src/main.rs
index 9ee2cb9..3696028 100644
--- a/scrapper/src/main.rs
+++ b/scrapper/src/main.rs
@@ -1,5 +1,8 @@
+use crate::cli::CliArgs;
+use crate::redis::RedisService;
 use crate::scrapper::gfourmedia::G4Media;
 use crate::scrapper::{NewsPost, WebScrapperEngine};
+use clap::Parser;
 use clokwerk::{AsyncScheduler, Interval, TimeUnits};
 use log::{debug, error, info};
 use std::sync::atomic::{AtomicBool, Ordering};
@@ -8,47 +11,10 @@
 use std::sync::{mpsc, Arc};
 use std::time::Duration;
 use tokio::task::JoinHandle;
 
+mod cli;
+mod redis;
 mod scrapper;
-#[tokio::main]
-async fn main() -> Result<(), anyhow::Error> {
-    env_logger::init();
-    info!("Starting the program");
-
-    // Scheduler setup
-    let mut scheduler = AsyncScheduler::new();
-
-    // Channel for synchronizing the scrapper and the bot
-    let (tx, rx): (Sender<NewsPost>, Receiver<NewsPost>) = mpsc::channel();
-
-    // Graceful shutdown.
-    let running = Arc::new(AtomicBool::new(true));
-    let r = running.clone();
-    ctrlc::set_handler(move || {
-        r.store(false, Ordering::SeqCst);
-    })
-    .expect("Error setting Ctrl-C handler");
-
-    run_scrapping_job(&mut scheduler, tx, 60.minutes());
-
-    // Run the scheduler in a separate thread.
-    let handle = run_scheduler(scheduler, running.clone());
-
-    for news_post in rx.iter() {
-        if !running.load(Ordering::SeqCst) {
-            debug!("Used requested shutdown.");
-            break;
-        }
-        info!("Received post {:?}", news_post)
-    }
-
-    info!("Stopped the program");
-
-    handle.await?;
-
-    Ok(())
-}
-
 /// Runs the scheduler in a separated thread.
 ///
 /// If CTRL+C is pressed it will set `running` to `true`.
@@ -82,3 +48,54 @@ fn run_scrapping_job(scheduler: &mut AsyncScheduler, tx: Sender<NewsPost>, inter
         }
     });
 }
+
+#[tokio::main]
+async fn main() -> Result<(), anyhow::Error> {
+    env_logger::init();
+    let args = CliArgs::parse();
+    info!("Starting the program");
+
+    // Redis setup
+    let mut redis_service =
+        RedisService::new(args.redis_connection_string, args.redis_stream_name).await;
+
+    // Scheduler setup
+    let mut scheduler = AsyncScheduler::new();
+
+    // Channel for synchronizing the scrapper and the bot
+    let (tx, rx): (Sender<NewsPost>, Receiver<NewsPost>) = mpsc::channel();
+
+    // Graceful shutdown.
+    let running = Arc::new(AtomicBool::new(true));
+    let r = running.clone();
+    ctrlc::set_handler(move || {
+        r.store(false, Ordering::SeqCst);
+    })
+    .expect("Error setting Ctrl-C handler");
+
+    run_scrapping_job(&mut scheduler, tx, args.scrape_interval_minutes.minutes());
+
+    // Run the scheduler in a separate thread.
+    let handle = run_scheduler(scheduler, running.clone());
+
+    for news_post in rx.iter() {
+        if !running.load(Ordering::SeqCst) {
+            debug!("User requested shutdown.");
+            break;
+        }
+        info!("Received post {:?}", news_post);
+        if news_post.is_complete() {
+            let title = news_post.title.clone().unwrap();
+            if !redis_service.is_post_seen(&title).await {
+                redis_service.publish(news_post).await;
+                redis_service.mark_post_seen(&title, 60 * 60 * 24 * 3).await;
+            };
+        }
+    }
+
+    info!("Stopped the program");
+
+    handle.await?;
+
+    Ok(())
+}
diff --git a/scrapper/src/redis.rs b/scrapper/src/redis.rs
new file mode 100644
index 0000000..aaf53aa
--- /dev/null
+++ b/scrapper/src/redis.rs
@@ -0,0 +1,56 @@
+use crate::scrapper::NewsPost;
+use log::error;
+use redis::aio::MultiplexedConnection;
+use redis::{AsyncCommands, RedisError};
+
+pub struct RedisService {
+    multiplexed_connection: MultiplexedConnection,
+    stream_name: String,
+}
+
+impl RedisService {
+    /// Creates a new RedisService instance.
+    pub async fn new(connection_string: String, stream_name: String) -> Self {
+        let client = redis::Client::open(connection_string).unwrap();
+        let con = client.get_multiplexed_async_connection().await.unwrap();
+
+        RedisService {
+            multiplexed_connection: con,
+            stream_name,
+        }
+    }
+
+    //noinspection RsSelfConvention
+    /// Returns true if the key exists in Redis, false otherwise.
+    pub async fn is_post_seen(&mut self, title: &str) -> bool {
+        let digest = md5::compute(title);
+        let result: Result<bool, RedisError> = self
+            .multiplexed_connection
+            .get(format!("{:x}", digest))
+            .await;
+        result.unwrap_or(false)
+    }
+
+    /// Marks the post as seen
+    pub async fn mark_post_seen(&mut self, title: &str, ttl: u64) {
+        let digest = md5::compute(title);
+        let _ = self
+            .multiplexed_connection
+            .set_ex::<String, bool, ()>(format!("{:x}", digest), true, ttl)
+            .await;
+    }
+
+    /// Publishes the post to the redis stream.
+    pub async fn publish(&mut self, post: NewsPost) {
+        let serialized_post = serde_json::to_string(&post).unwrap();
+        let result = redis::cmd("XADD")
+            .arg(format!("posts:{}", self.stream_name))
+            .arg("*")
+            .arg(serialized_post)
+            .exec_async(&mut self.multiplexed_connection)
+            .await;
+        if result.is_err() {
+            error!("Failed to publish {:?} to stream", post);
+        }
+    }
+}
diff --git a/scrapper/src/scrapper.rs b/scrapper/src/scrapper.rs
index 6e1a451..0c6cbf6 100644
--- a/scrapper/src/scrapper.rs
+++ b/scrapper/src/scrapper.rs
@@ -1,7 +1,9 @@
+use serde::{Deserialize, Serialize};
+
 pub(crate) mod gfourmedia;
 
 /// NewsPost represents a news post.
-#[derive(Debug, Clone)]
+#[derive(Serialize, Deserialize, Debug, Clone)]
 pub struct NewsPost {
     /// A URL containing the image of the post.
     pub image: Option<String>,