common_meta/heartbeat/
utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::meta::heartbeat_request::NodeWorkloads;
16use api::v1::meta::mailbox_message::Payload;
17use api::v1::meta::{DatanodeWorkloads, MailboxMessage};
18use common_telemetry::warn;
19use common_time::util::current_time_millis;
20use common_workload::DatanodeWorkloadType;
21use snafu::{OptionExt, ResultExt};
22
23use crate::error::{self, Result};
24use crate::heartbeat::mailbox::{IncomingMessage, MessageMeta, OutgoingMessage};
25use crate::instruction::Instruction;
26
27pub fn mailbox_message_to_incoming_message(m: MailboxMessage) -> Result<IncomingMessage> {
28    m.payload
29        .map(|payload| match payload {
30            Payload::Json(json) => {
31                let instruction: Instruction = serde_json::from_str(&json)?;
32                Ok((
33                    MessageMeta {
34                        id: m.id,
35                        subject: m.subject,
36                        to: m.to,
37                        from: m.from,
38                    },
39                    instruction,
40                ))
41            }
42        })
43        .transpose()
44        .context(error::DecodeJsonSnafu)?
45        .context(error::PayloadNotExistSnafu)
46}
47
48pub fn outgoing_message_to_mailbox_message(
49    (meta, reply): OutgoingMessage,
50) -> Result<MailboxMessage> {
51    Ok(MailboxMessage {
52        id: meta.id,
53        subject: meta.subject,
54        from: meta.to,
55        to: meta.from,
56        timestamp_millis: current_time_millis(),
57        payload: Some(Payload::Json(
58            serde_json::to_string(&reply).context(error::EncodeJsonSnafu)?,
59        )),
60    })
61}
62
63/// Extracts datanode workloads from the provided optional `NodeWorkloads`.
64///
65/// Returns default datanode workloads if the input is `None`.
66pub fn get_datanode_workloads(node_workloads: Option<&NodeWorkloads>) -> DatanodeWorkloads {
67    match node_workloads {
68        Some(NodeWorkloads::Datanode(datanode_workloads)) => {
69            let mut datanode_workloads = datanode_workloads.clone();
70            let unexpected_workloads = datanode_workloads
71                .types
72                .extract_if(.., |t| DatanodeWorkloadType::from_i32(*t).is_none())
73                .collect::<Vec<_>>();
74            if !unexpected_workloads.is_empty() {
75                warn!("Unexpected datanode workloads: {:?}", unexpected_workloads);
76            }
77            datanode_workloads
78        }
79        _ => DatanodeWorkloads {
80            types: vec![DatanodeWorkloadType::Hybrid.to_i32()],
81        },
82    }
83}
84
85#[cfg(test)]
86mod tests {
87    use super::*;
88
89    #[test]
90    fn test_get_datanode_workloads() {
91        let node_workloads = Some(NodeWorkloads::Datanode(DatanodeWorkloads {
92            types: vec![DatanodeWorkloadType::Hybrid.to_i32(), 100],
93        }));
94        let workloads = get_datanode_workloads(node_workloads.as_ref());
95        assert_eq!(workloads.types, vec![DatanodeWorkloadType::Hybrid.to_i32()]);
96    }
97}