Skip to main content

common_meta/heartbeat/
utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::meta::heartbeat_request::NodeWorkloads;
16use api::v1::meta::mailbox_message::Payload;
17use api::v1::meta::{DatanodeWorkloads, FlownodeWorkloads, MailboxMessage};
18use common_telemetry::tracing_context::TracingContext;
19use common_telemetry::warn;
20use common_time::util::current_time_millis;
21use common_workload::DatanodeWorkloadType;
22use snafu::{OptionExt, ResultExt};
23
24use crate::error::{self, Result};
25use crate::heartbeat::mailbox::{IncomingMessage, MessageMeta, OutgoingMessage};
26use crate::instruction::Instruction;
27
28pub fn mailbox_message_to_incoming_message(m: MailboxMessage) -> Result<IncomingMessage> {
29    m.payload
30        .map(|payload| match payload {
31            Payload::Json(json) => {
32                let instruction: Instruction = serde_json::from_str(&json)?;
33                Ok((
34                    MessageMeta {
35                        id: m.id,
36                        subject: m.subject,
37                        to: m.to,
38                        from: m.from,
39                    },
40                    m.header
41                        .map(|h| TracingContext::from_w3c(&h.tracing_context))
42                        .unwrap_or_default(),
43                    instruction,
44                ))
45            }
46        })
47        .transpose()
48        .context(error::DecodeJsonSnafu)?
49        .context(error::PayloadNotExistSnafu)
50}
51
52pub fn outgoing_message_to_mailbox_message(
53    (meta, reply): OutgoingMessage,
54) -> Result<MailboxMessage> {
55    Ok(MailboxMessage {
56        // Design note: mailbox communication is one-way for tracing propagation.
57        // We only carry tracing headers on incoming command messages, and replies
58        // intentionally do not echo/propagate tracing context.
59        header: None,
60        id: meta.id,
61        subject: meta.subject,
62        from: meta.to,
63        to: meta.from,
64        timestamp_millis: current_time_millis(),
65        payload: Some(Payload::Json(
66            serde_json::to_string(&reply).context(error::EncodeJsonSnafu)?,
67        )),
68    })
69}
70
71/// Extracts datanode workloads from the provided optional `NodeWorkloads`.
72///
73/// Returns default datanode workloads if the input is `None`.
74pub fn get_datanode_workloads(node_workloads: Option<&NodeWorkloads>) -> DatanodeWorkloads {
75    match node_workloads {
76        Some(NodeWorkloads::Datanode(datanode_workloads)) => {
77            let mut datanode_workloads = datanode_workloads.clone();
78            let unexpected_workloads = datanode_workloads
79                .types
80                .extract_if(.., |t| DatanodeWorkloadType::from_i32(*t).is_none())
81                .collect::<Vec<_>>();
82            if !unexpected_workloads.is_empty() {
83                warn!("Unexpected datanode workloads: {:?}", unexpected_workloads);
84            }
85            datanode_workloads
86        }
87        _ => DatanodeWorkloads {
88            types: vec![DatanodeWorkloadType::Hybrid.to_i32()],
89        },
90    }
91}
92
93/// Extracts flownode workloads from the provided optional `NodeWorkloads`.
94///
95/// Returns empty flownode workloads if the input is `None` or not a flownode payload.
96pub fn get_flownode_workloads(node_workloads: Option<&NodeWorkloads>) -> FlownodeWorkloads {
97    match node_workloads {
98        Some(NodeWorkloads::Flownode(flownode_workloads)) => flownode_workloads.clone(),
99        _ => FlownodeWorkloads { types: vec![] },
100    }
101}
102
#[cfg(test)]
mod tests {
    use super::*;

    /// Covers filtering of unknown workload types and both fallback paths of
    /// `get_datanode_workloads`.
    #[test]
    fn test_get_datanode_workloads() {
        // Unknown workload type ids (100 here) are dropped from the result.
        let node_workloads = Some(NodeWorkloads::Datanode(DatanodeWorkloads {
            types: vec![DatanodeWorkloadType::Hybrid.to_i32(), 100],
        }));
        let workloads = get_datanode_workloads(node_workloads.as_ref());
        assert_eq!(workloads.types, vec![DatanodeWorkloadType::Hybrid.to_i32()]);

        // No workloads at all falls back to the hybrid default.
        let workloads = get_datanode_workloads(None);
        assert_eq!(workloads.types, vec![DatanodeWorkloadType::Hybrid.to_i32()]);

        // A payload of another node kind also falls back to the hybrid default.
        let node_workloads = Some(NodeWorkloads::Flownode(FlownodeWorkloads {
            types: vec![7],
        }));
        let workloads = get_datanode_workloads(node_workloads.as_ref());
        assert_eq!(workloads.types, vec![DatanodeWorkloadType::Hybrid.to_i32()]);
    }

    /// Covers the pass-through path and both fallback paths of `get_flownode_workloads`.
    #[test]
    fn test_get_flownode_workloads() {
        // Flownode payloads are returned unchanged.
        let node_workloads = Some(NodeWorkloads::Flownode(FlownodeWorkloads {
            types: vec![7],
        }));
        let workloads = get_flownode_workloads(node_workloads.as_ref());
        assert_eq!(workloads.types, vec![7]);

        // No workloads at all yields an empty list.
        let workloads = get_flownode_workloads(None);
        assert!(workloads.types.is_empty());

        // A payload of another node kind also yields an empty list.
        let node_workloads = Some(NodeWorkloads::Datanode(DatanodeWorkloads {
            types: vec![DatanodeWorkloadType::Hybrid.to_i32()],
        }));
        let workloads = get_flownode_workloads(node_workloads.as_ref());
        assert!(workloads.types.is_empty());
    }
}