implement redis streams publishing

This commit is contained in:
Denis-Cosmin Nutiu 2024-12-23 14:56:58 +02:00
parent 1f386fd002
commit 02ab5ecaa6
5 changed files with 137 additions and 40 deletions

View file

@ -12,3 +12,8 @@ scraper = "0.22.0"
clokwerk = "0.4.0"
log = "0.4.22"
ctrlc = "3.4.5"
clap = { version = "4.5.23", features = ["derive"] }
redis = { version = "0.27.6", features = ["tokio-comp"] }
md5 = "0.7.0"
serde = { version = "1.0.216", features = ["derive"] }
serde_json = "1.0.134"

17
scrapper/src/cli.rs Normal file
View file

@ -0,0 +1,17 @@
use clap::Parser;
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
pub struct CliArgs {
/// Redis host
#[arg(short, long)]
pub redis_connection_string: String,
/// Redis stream name
#[arg(short = 't', long)]
pub redis_stream_name: String,
/// The scraping interval in minutes
#[arg(short, long, default_value_t = 60)]
pub scrape_interval_minutes: u32,
}

View file

@ -1,5 +1,8 @@
use crate::cli::CliArgs;
use crate::redis::RedisService;
use crate::scrapper::gfourmedia::G4Media;
use crate::scrapper::{NewsPost, WebScrapperEngine};
use clap::Parser;
use clokwerk::{AsyncScheduler, Interval, TimeUnits};
use log::{debug, error, info};
use std::sync::atomic::{AtomicBool, Ordering};
@ -8,47 +11,10 @@ use std::sync::{mpsc, Arc};
use std::time::Duration;
use tokio::task::JoinHandle;
mod cli;
mod redis;
mod scrapper;
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
env_logger::init();
info!("Starting the program");
// Scheduler setup
let mut scheduler = AsyncScheduler::new();
// Channel for synchronizing the scrapper and the bot
let (tx, rx): (Sender<NewsPost>, Receiver<NewsPost>) = mpsc::channel();
// Graceful shutdown.
let running = Arc::new(AtomicBool::new(true));
let r = running.clone();
ctrlc::set_handler(move || {
r.store(false, Ordering::SeqCst);
})
.expect("Error setting Ctrl-C handler");
run_scrapping_job(&mut scheduler, tx, 60.minutes());
// Run the scheduler in a separate thread.
let handle = run_scheduler(scheduler, running.clone());
for news_post in rx.iter() {
if !running.load(Ordering::SeqCst) {
debug!("Used requested shutdown.");
break;
}
info!("Received post {:?}", news_post)
}
info!("Stopped the program");
handle.await?;
Ok(())
}
/// Runs the scheduler in a separated thread.
///
/// If CTRL+C is pressed it will set `running` to `true`.
@ -82,3 +48,54 @@ fn run_scrapping_job(scheduler: &mut AsyncScheduler, tx: Sender<NewsPost>, inter
}
});
}
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
env_logger::init();
let args = CliArgs::parse();
info!("Starting the program");
// Redis setup
let mut redis_service =
RedisService::new(args.redis_connection_string, args.redis_stream_name).await;
// Scheduler setup
let mut scheduler = AsyncScheduler::new();
// Channel for synchronizing the scrapper and the bot
let (tx, rx): (Sender<NewsPost>, Receiver<NewsPost>) = mpsc::channel();
// Graceful shutdown.
let running = Arc::new(AtomicBool::new(true));
let r = running.clone();
ctrlc::set_handler(move || {
r.store(false, Ordering::SeqCst);
})
.expect("Error setting Ctrl-C handler");
run_scrapping_job(&mut scheduler, tx, args.scrape_interval_minutes.minutes());
// Run the scheduler in a separate thread.
let handle = run_scheduler(scheduler, running.clone());
for news_post in rx.iter() {
if !running.load(Ordering::SeqCst) {
debug!("Used requested shutdown.");
break;
}
info!("Received post {:?}", news_post);
if news_post.is_complete() {
let title = news_post.title.clone().unwrap();
if redis_service.is_post_seen(&title).await {
redis_service.publish(news_post).await;
redis_service.mark_post_seen(&title, 60 * 60 * 24 * 3).await;
};
}
}
info!("Stopped the program");
handle.await?;
Ok(())
}

56
scrapper/src/redis.rs Normal file
View file

@ -0,0 +1,56 @@
use crate::scrapper::NewsPost;
use log::error;
use redis::aio::MultiplexedConnection;
use redis::{AsyncCommands, RedisError};
pub struct RedisService {
multiplexed_connection: MultiplexedConnection,
stream_name: String,
}
impl RedisService {
/// Creates a new RedisService instance.
pub async fn new(connection_string: String, stream_name: String) -> Self {
let client = redis::Client::open(connection_string).unwrap();
let con = client.get_multiplexed_async_connection().await.unwrap();
RedisService {
multiplexed_connection: con,
stream_name,
}
}
//noinspection RsSelfConvention
/// Returns true if the key exists in Redis, false otherwise.
pub async fn is_post_seen(&mut self, title: &str) -> bool {
let digest = md5::compute(title);
let result: Result<bool, RedisError> = self
.multiplexed_connection
.get(format!("{:x}", digest))
.await;
result.unwrap_or(false)
}
/// Marks the post as seen
pub async fn mark_post_seen(&mut self, title: &str, ttl: u64) {
let digest = md5::compute(title);
let _ = self
.multiplexed_connection
.set_ex::<String, bool, bool>(format!("{:x}", digest), true, ttl)
.await;
}
/// Publishes the post to the redis stream.
pub async fn publish(&mut self, post: NewsPost) {
let serialized_post = serde_json::to_string(&post).unwrap();
let result = redis::cmd("XADD")
.arg(format!("posts:{}", self.stream_name))
.arg("*")
.arg(serialized_post)
.exec_async(&mut self.multiplexed_connection)
.await;
if result.is_err() {
error!("Failed to publish {:?} to stream", post);
}
}
}

View file

@ -1,7 +1,9 @@
use serde::{Deserialize, Serialize};
pub(crate) mod gfourmedia;
/// NewsPost represents a news post.
#[derive(Debug, Clone)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct NewsPost {
/// A URL containing the image of the post.
pub image: Option<String>,