diff --git a/infrastructure/src/redis.rs b/infrastructure/src/redis.rs index f4b5a33..7e2735f 100644 --- a/infrastructure/src/redis.rs +++ b/infrastructure/src/redis.rs @@ -3,7 +3,7 @@ use log::error; use redis::aio::MultiplexedConnection; use redis::streams::StreamReadReply; use redis::Value::BulkString; -use redis::{AsyncCommands, RedisError, RedisResult}; +use redis::{AsyncCommands, RedisError, RedisResult, Value}; use serde::{Deserialize, Serialize}; pub struct RedisService { @@ -111,23 +111,22 @@ impl RedisService { match result { Ok(data) => { - if data.keys.is_empty() { - return Err(anyhow!("read stream entry with empty keys")); - } - if data.keys[0].ids.is_empty() { - return Err(anyhow!("read stream entry with empty ids")); - } - let stream = data.keys[0].ids[0].map.get("data"); - if let Some(BulkString(data)) = stream { + let stream_data: Option<&Value> = data + .keys + .first() + .and_then(|f| f.ids.first().and_then(|i| i.map.get("data"))); + + if let Some(BulkString(data)) = stream_data { let string_data = std::str::from_utf8(data); - return match string_data { + match string_data { Ok(string_data) => Ok(serde_json::from_str(string_data)?), Err(err) => Err(anyhow!("can't convert data to string: {err}")), - }; + } + } else { + Err(anyhow!( + "invalid type read from streams, expected BulkString" + )) } - Err(anyhow!( - "invalid type read from streams, expected BulkString" - )) } Err(err) => Err(err.into()), }