common_meta/heartbeat/
utils.rs1use api::v1::meta::heartbeat_request::NodeWorkloads;
16use api::v1::meta::mailbox_message::Payload;
17use api::v1::meta::{DatanodeWorkloads, MailboxMessage};
18use common_telemetry::warn;
19use common_time::util::current_time_millis;
20use common_workload::DatanodeWorkloadType;
21use snafu::{OptionExt, ResultExt};
22
23use crate::error::{self, Result};
24use crate::heartbeat::mailbox::{IncomingMessage, MessageMeta, OutgoingMessage};
25use crate::instruction::Instruction;
26
27pub fn mailbox_message_to_incoming_message(m: MailboxMessage) -> Result<IncomingMessage> {
28 m.payload
29 .map(|payload| match payload {
30 Payload::Json(json) => {
31 let instruction: Instruction = serde_json::from_str(&json)?;
32 Ok((
33 MessageMeta {
34 id: m.id,
35 subject: m.subject,
36 to: m.to,
37 from: m.from,
38 },
39 instruction,
40 ))
41 }
42 })
43 .transpose()
44 .context(error::DecodeJsonSnafu)?
45 .context(error::PayloadNotExistSnafu)
46}
47
48pub fn outgoing_message_to_mailbox_message(
49 (meta, reply): OutgoingMessage,
50) -> Result<MailboxMessage> {
51 Ok(MailboxMessage {
52 id: meta.id,
53 subject: meta.subject,
54 from: meta.to,
55 to: meta.from,
56 timestamp_millis: current_time_millis(),
57 payload: Some(Payload::Json(
58 serde_json::to_string(&reply).context(error::EncodeJsonSnafu)?,
59 )),
60 })
61}
62
63pub fn get_datanode_workloads(node_workloads: Option<&NodeWorkloads>) -> DatanodeWorkloads {
67 match node_workloads {
68 Some(NodeWorkloads::Datanode(datanode_workloads)) => {
69 let mut datanode_workloads = datanode_workloads.clone();
70 let unexpected_workloads = datanode_workloads
71 .types
72 .extract_if(.., |t| DatanodeWorkloadType::from_i32(*t).is_none())
73 .collect::<Vec<_>>();
74 if !unexpected_workloads.is_empty() {
75 warn!("Unexpected datanode workloads: {:?}", unexpected_workloads);
76 }
77 datanode_workloads
78 }
79 _ => DatanodeWorkloads {
80 types: vec![DatanodeWorkloadType::Hybrid.to_i32()],
81 },
82 }
83}
84
#[cfg(test)]
mod tests {
    use super::*;

    /// A reported workload list containing `Hybrid` and the value `100` is
    /// expected to come back with only `Hybrid` left.
    #[test]
    fn test_get_datanode_workloads() {
        let hybrid = DatanodeWorkloadType::Hybrid.to_i32();
        let reported = NodeWorkloads::Datanode(DatanodeWorkloads {
            types: vec![hybrid, 100],
        });

        let filtered = get_datanode_workloads(Some(&reported));

        assert_eq!(filtered.types, vec![hybrid]);
    }
}