From 291cc55c4df1ea6573c626df8f6e5ec91a253d5e Mon Sep 17 00:00:00 2001 From: Denis Nutiu Date: Sat, 28 Dec 2024 10:22:18 +0200 Subject: [PATCH] refactor read_stream --- infrastructure/src/redis.rs | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/infrastructure/src/redis.rs b/infrastructure/src/redis.rs index f4b5a33..7e2735f 100644 --- a/infrastructure/src/redis.rs +++ b/infrastructure/src/redis.rs @@ -3,7 +3,7 @@ use log::error; use redis::aio::MultiplexedConnection; use redis::streams::StreamReadReply; use redis::Value::BulkString; -use redis::{AsyncCommands, RedisError, RedisResult}; +use redis::{AsyncCommands, RedisError, RedisResult, Value}; use serde::{Deserialize, Serialize}; pub struct RedisService { @@ -111,23 +111,22 @@ impl RedisService { match result { Ok(data) => { - if data.keys.is_empty() { - return Err(anyhow!("read stream entry with empty keys")); - } - if data.keys[0].ids.is_empty() { - return Err(anyhow!("read stream entry with empty ids")); - } - let stream = data.keys[0].ids[0].map.get("data"); - if let Some(BulkString(data)) = stream { + let stream_data: Option<&Value> = data + .keys + .first() + .and_then(|f| f.ids.first().and_then(|i| i.map.get("data"))); + + if let Some(BulkString(data)) = stream_data { let string_data = std::str::from_utf8(data); - return match string_data { + match string_data { Ok(string_data) => Ok(serde_json::from_str(string_data)?), Err(err) => Err(anyhow!("can't convert data to string: {err}")), - }; + } + } else { + Err(anyhow!( + "invalid type read from streams, expected BulkString" + )) } - Err(anyhow!( - "invalid type read from streams, expected BulkString" - )) } Err(err) => Err(err.into()), }