add graceful shutdown and refactor code

This commit is contained in:
Denis-Cosmin Nutiu 2024-12-21 17:10:08 +02:00
parent 99d91ba303
commit c3440a66c0
3 changed files with 75 additions and 16 deletions

View file

@ -10,3 +10,5 @@ anyhow = "1.0"
reqwest = "0.12.9" reqwest = "0.12.9"
scraper = "0.22.0" scraper = "0.22.0"
clokwerk = "0.4.0" clokwerk = "0.4.0"
log = "0.4.22"
ctrlc = "3.4.5"

View file

@ -1,27 +1,84 @@
use crate::scrapper::gfourmedia::G4Media; use crate::scrapper::gfourmedia::G4Media;
use crate::scrapper::WebScrapperEngine; use crate::scrapper::{NewsPost, WebScrapperEngine};
use clokwerk::{AsyncScheduler, TimeUnits}; use clokwerk::{AsyncScheduler, Interval, TimeUnits};
use log::{debug, error, info};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{mpsc, Arc};
use std::time::Duration; use std::time::Duration;
use tokio::task::JoinHandle;
mod scrapper; mod scrapper;
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), anyhow::Error> { async fn main() -> Result<(), anyhow::Error> {
env_logger::init(); env_logger::init();
info!("Starting the program");
// Scheduler setup
let mut scheduler = AsyncScheduler::new(); let mut scheduler = AsyncScheduler::new();
scheduler.every(60.seconds()).run(|| async {
let posts = WebScrapperEngine::get_posts(G4Media::default()) // Channel for synchronizing the scrapper and the bot
.await let (tx, rx): (Sender<NewsPost>, Receiver<NewsPost>) = mpsc::channel();
.expect("failed to get posts");
posts // Graceful shutdown.
.iter() let running = Arc::new(AtomicBool::new(true));
.filter(|p| p.is_complete()) let r = running.clone();
.for_each(|p| println!("{:?}", p)); ctrlc::set_handler(move || {
}); r.store(false, Ordering::SeqCst);
// Manually run the scheduler forever })
loop { .expect("Error setting Ctrl-C handler");
scheduler.run_pending().await;
tokio::time::sleep(Duration::from_millis(10)).await; run_scrapping_job(&mut scheduler, tx, 60.seconds());
// Run the scheduler in a separate thread.
let handle = run_scheduler(scheduler, running.clone());
for news_post in rx.iter() {
if !running.load(Ordering::SeqCst) {
debug!("Used requested shutdown.");
break;
} }
info!("Received post {:?}", news_post)
}
info!("Stopped the program");
handle.await?;
Ok(()) Ok(())
} }
/// Runs the scheduler in a separated thread.
///
/// If CTRL+C is pressed it will set `running` to `true`.
fn run_scheduler(mut scheduler: AsyncScheduler, running: Arc<AtomicBool>) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
if !running.load(Ordering::SeqCst) {
debug!("Used requested shutdown.");
break;
}
scheduler.run_pending().await;
tokio::time::sleep(Duration::from_millis(100)).await;
}
})
}
/// Runs the scraping job at the specified interval.
fn run_scrapping_job(scheduler: &mut AsyncScheduler, tx: Sender<NewsPost>, interval: Interval) {
scheduler.every(interval).run(move || {
let tx = tx.clone();
async move {
let posts = WebScrapperEngine::get_posts(G4Media::default())
.await
.expect("failed to get posts");
posts.iter().filter(|p| p.is_complete()).for_each(|p| {
let result = tx.send(p.clone());
if result.is_err() {
error!("Failed to send post {:?} to the channel", p)
}
});
}
});
}

View file

@ -1,7 +1,7 @@
pub(crate) mod gfourmedia; pub(crate) mod gfourmedia;
/// NewsPost represents a news post. /// NewsPost represents a news post.
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct NewsPost { pub struct NewsPost {
/// A URL containing the image of the post. /// A URL containing the image of the post.
pub image: Option<String>, pub image: Option<String>,