refactor read_stream
This commit is contained in:
parent
9a2304c8b0
commit
291cc55c4d
1 changed files with 13 additions and 14 deletions
|
@ -3,7 +3,7 @@ use log::error;
|
||||||
use redis::aio::MultiplexedConnection;
|
use redis::aio::MultiplexedConnection;
|
||||||
use redis::streams::StreamReadReply;
|
use redis::streams::StreamReadReply;
|
||||||
use redis::Value::BulkString;
|
use redis::Value::BulkString;
|
||||||
use redis::{AsyncCommands, RedisError, RedisResult};
|
use redis::{AsyncCommands, RedisError, RedisResult, Value};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
pub struct RedisService {
|
pub struct RedisService {
|
||||||
|
@ -111,23 +111,22 @@ impl RedisService {
|
||||||
|
|
||||||
match result {
|
match result {
|
||||||
Ok(data) => {
|
Ok(data) => {
|
||||||
if data.keys.is_empty() {
|
let stream_data: Option<&Value> = data
|
||||||
return Err(anyhow!("read stream entry with empty keys"));
|
.keys
|
||||||
}
|
.first()
|
||||||
if data.keys[0].ids.is_empty() {
|
.and_then(|f| f.ids.first().and_then(|i| i.map.get("data")));
|
||||||
return Err(anyhow!("read stream entry with empty ids"));
|
|
||||||
}
|
if let Some(BulkString(data)) = stream_data {
|
||||||
let stream = data.keys[0].ids[0].map.get("data");
|
|
||||||
if let Some(BulkString(data)) = stream {
|
|
||||||
let string_data = std::str::from_utf8(data);
|
let string_data = std::str::from_utf8(data);
|
||||||
return match string_data {
|
match string_data {
|
||||||
Ok(string_data) => Ok(serde_json::from_str(string_data)?),
|
Ok(string_data) => Ok(serde_json::from_str(string_data)?),
|
||||||
Err(err) => Err(anyhow!("can't convert data to string: {err}")),
|
Err(err) => Err(anyhow!("can't convert data to string: {err}")),
|
||||||
};
|
}
|
||||||
|
} else {
|
||||||
|
Err(anyhow!(
|
||||||
|
"invalid type read from streams, expected BulkString"
|
||||||
|
))
|
||||||
}
|
}
|
||||||
Err(anyhow!(
|
|
||||||
"invalid type read from streams, expected BulkString"
|
|
||||||
))
|
|
||||||
}
|
}
|
||||||
Err(err) => Err(err.into()),
|
Err(err) => Err(err.into()),
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue