From c3440a66c0e50b63bcb9cfdbbc74eb8b7888a16e Mon Sep 17 00:00:00 2001
From: Denis Nutiu
Date: Sat, 21 Dec 2024 17:10:08 +0200
Subject: [PATCH] add graceful shutdown and refactor code

---
 Cargo.toml      |  2 ++
 src/main.rs     | 87 ++++++++++++++++++++++++++++++++++++++++---------
 src/scrapper.rs |  2 +-
 3 files changed, 75 insertions(+), 16 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 4e1150f..78b2718 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,3 +10,5 @@ anyhow = "1.0"
 reqwest = "0.12.9"
 scraper = "0.22.0"
 clokwerk = "0.4.0"
+log = "0.4.22"
+ctrlc = "3.4.5"
diff --git a/src/main.rs b/src/main.rs
index 56dc51d..d127805 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,27 +1,84 @@
 use crate::scrapper::gfourmedia::G4Media;
-use crate::scrapper::WebScrapperEngine;
-use clokwerk::{AsyncScheduler, TimeUnits};
+use crate::scrapper::{NewsPost, WebScrapperEngine};
+use clokwerk::{AsyncScheduler, Interval, TimeUnits};
+use log::{debug, error, info};
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::mpsc::{Receiver, Sender};
+use std::sync::{mpsc, Arc};
 use std::time::Duration;
+use tokio::task::JoinHandle;
+
 mod scrapper;
 
 #[tokio::main]
 async fn main() -> Result<(), anyhow::Error> {
     env_logger::init();
+    info!("Starting the program");
+
+    // Scheduler setup
     let mut scheduler = AsyncScheduler::new();
-    scheduler.every(60.seconds()).run(|| async {
-        let posts = WebScrapperEngine::get_posts(G4Media::default())
-            .await
-            .expect("failed to get posts");
-        posts
-            .iter()
-            .filter(|p| p.is_complete())
-            .for_each(|p| println!("{:?}", p));
-    });
-    // Manually run the scheduler forever
-    loop {
-        scheduler.run_pending().await;
-        tokio::time::sleep(Duration::from_millis(10)).await;
+
+    // Channel for synchronizing the scrapper and the bot
+    let (tx, rx): (Sender<NewsPost>, Receiver<NewsPost>) = mpsc::channel();
+
+    // Graceful shutdown.
+    let running = Arc::new(AtomicBool::new(true));
+    let r = running.clone();
+    ctrlc::set_handler(move || {
+        r.store(false, Ordering::SeqCst);
+    })
+    .expect("Error setting Ctrl-C handler");
+
+    run_scrapping_job(&mut scheduler, tx, 60.seconds());
+
+    // Run the scheduler in a separate thread.
+    let handle = run_scheduler(scheduler, running.clone());
+
+    for news_post in rx.iter() {
+        if !running.load(Ordering::SeqCst) {
+            debug!("User requested shutdown.");
+            break;
+        }
+        info!("Received post {:?}", news_post)
     }
+    info!("Stopped the program");
+
+    handle.await?;
+
     Ok(())
 }
+
+/// Runs the scheduler in a separate thread.
+///
+/// If CTRL+C is pressed it will set `running` to `false`.
+fn run_scheduler(mut scheduler: AsyncScheduler, running: Arc<AtomicBool>) -> JoinHandle<()> {
+    tokio::spawn(async move {
+        loop {
+            if !running.load(Ordering::SeqCst) {
+                debug!("User requested shutdown.");
+                break;
+            }
+            scheduler.run_pending().await;
+            tokio::time::sleep(Duration::from_millis(100)).await;
+        }
+    })
+}
+
+/// Runs the scraping job at the specified interval.
+fn run_scrapping_job(scheduler: &mut AsyncScheduler, tx: Sender<NewsPost>, interval: Interval) {
+    scheduler.every(interval).run(move || {
+        let tx = tx.clone();
+        async move {
+            let posts = WebScrapperEngine::get_posts(G4Media::default())
+                .await
+                .expect("failed to get posts");
+            posts.iter().filter(|p| p.is_complete()).for_each(|p| {
+                let result = tx.send(p.clone());
+                if result.is_err() {
+                    error!("Failed to send post {:?} to the channel", p)
+                }
+            });
+        }
+    });
+}
diff --git a/src/scrapper.rs b/src/scrapper.rs
index b65d6c8..6e1a451 100644
--- a/src/scrapper.rs
+++ b/src/scrapper.rs
@@ -1,7 +1,7 @@
 pub(crate) mod gfourmedia;
 
 /// NewsPost represents a news post.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct NewsPost {
     /// A URL containing the image of the post.
     pub image: Option<String>,
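
The shutdown pattern introduced by this patch can be exercised on its own. Below is a minimal standalone sketch (not part of the patch), assuming only the `ctrlc` crate added in Cargo.toml plus std threads and channels; the scheduler and scraper are replaced by a dummy producer, and all names here are illustrative.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::Duration;

fn main() {
    let running = Arc::new(AtomicBool::new(true));
    let r = running.clone();
    // The Ctrl-C handler only flips the flag; worker loops observe it and stop,
    // mirroring how main() and run_scheduler() exit in the patch above.
    ctrlc::set_handler(move || r.store(false, Ordering::SeqCst))
        .expect("Error setting Ctrl-C handler");

    // Dummy producer standing in for the scraping job.
    let (tx, rx) = mpsc::channel::<String>();
    let producer = thread::spawn(move || {
        while tx.send("tick".into()).is_ok() {
            thread::sleep(Duration::from_secs(1));
        }
    });

    // Consumer drains the channel until the user requests shutdown.
    for msg in rx.iter() {
        if !running.load(Ordering::SeqCst) {
            break;
        }
        println!("{msg}");
    }
    drop(rx); // closing the channel makes the producer's send fail, so it exits
    producer.join().expect("producer thread panicked");
}

As in the patch, the consumer only notices the flag when a new message arrives, so shutdown completes on the next received item rather than immediately.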