common_meta/heartbeat/
utils.rs1use api::v1::meta::heartbeat_request::NodeWorkloads;
16use api::v1::meta::mailbox_message::Payload;
17use api::v1::meta::{DatanodeWorkloads, MailboxMessage};
18use common_telemetry::tracing_context::TracingContext;
19use common_telemetry::warn;
20use common_time::util::current_time_millis;
21use common_workload::DatanodeWorkloadType;
22use snafu::{OptionExt, ResultExt};
23
24use crate::error::{self, Result};
25use crate::heartbeat::mailbox::{IncomingMessage, MessageMeta, OutgoingMessage};
26use crate::instruction::Instruction;
27
28pub fn mailbox_message_to_incoming_message(m: MailboxMessage) -> Result<IncomingMessage> {
29 m.payload
30 .map(|payload| match payload {
31 Payload::Json(json) => {
32 let instruction: Instruction = serde_json::from_str(&json)?;
33 Ok((
34 MessageMeta {
35 id: m.id,
36 subject: m.subject,
37 to: m.to,
38 from: m.from,
39 },
40 m.header
41 .map(|h| TracingContext::from_w3c(&h.tracing_context))
42 .unwrap_or_default(),
43 instruction,
44 ))
45 }
46 })
47 .transpose()
48 .context(error::DecodeJsonSnafu)?
49 .context(error::PayloadNotExistSnafu)
50}
51
52pub fn outgoing_message_to_mailbox_message(
53 (meta, reply): OutgoingMessage,
54) -> Result<MailboxMessage> {
55 Ok(MailboxMessage {
56 header: None,
60 id: meta.id,
61 subject: meta.subject,
62 from: meta.to,
63 to: meta.from,
64 timestamp_millis: current_time_millis(),
65 payload: Some(Payload::Json(
66 serde_json::to_string(&reply).context(error::EncodeJsonSnafu)?,
67 )),
68 })
69}
70
71pub fn get_datanode_workloads(node_workloads: Option<&NodeWorkloads>) -> DatanodeWorkloads {
75 match node_workloads {
76 Some(NodeWorkloads::Datanode(datanode_workloads)) => {
77 let mut datanode_workloads = datanode_workloads.clone();
78 let unexpected_workloads = datanode_workloads
79 .types
80 .extract_if(.., |t| DatanodeWorkloadType::from_i32(*t).is_none())
81 .collect::<Vec<_>>();
82 if !unexpected_workloads.is_empty() {
83 warn!("Unexpected datanode workloads: {:?}", unexpected_workloads);
84 }
85 datanode_workloads
86 }
87 _ => DatanodeWorkloads {
88 types: vec![DatanodeWorkloadType::Hybrid.to_i32()],
89 },
90 }
91}
92
93#[cfg(test)]
94mod tests {
95 use super::*;
96
97 #[test]
98 fn test_get_datanode_workloads() {
99 let node_workloads = Some(NodeWorkloads::Datanode(DatanodeWorkloads {
100 types: vec![DatanodeWorkloadType::Hybrid.to_i32(), 100],
101 }));
102 let workloads = get_datanode_workloads(node_workloads.as_ref());
103 assert_eq!(workloads.types, vec![DatanodeWorkloadType::Hybrid.to_i32()]);
104 }
105}