common_meta/heartbeat/
utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::meta::heartbeat_request::NodeWorkloads;
16use api::v1::meta::mailbox_message::Payload;
17use api::v1::meta::{DatanodeWorkloads, MailboxMessage};
18use common_telemetry::tracing_context::TracingContext;
19use common_telemetry::warn;
20use common_time::util::current_time_millis;
21use common_workload::DatanodeWorkloadType;
22use snafu::{OptionExt, ResultExt};
23
24use crate::error::{self, Result};
25use crate::heartbeat::mailbox::{IncomingMessage, MessageMeta, OutgoingMessage};
26use crate::instruction::Instruction;
27
28pub fn mailbox_message_to_incoming_message(m: MailboxMessage) -> Result<IncomingMessage> {
29    m.payload
30        .map(|payload| match payload {
31            Payload::Json(json) => {
32                let instruction: Instruction = serde_json::from_str(&json)?;
33                Ok((
34                    MessageMeta {
35                        id: m.id,
36                        subject: m.subject,
37                        to: m.to,
38                        from: m.from,
39                    },
40                    m.header
41                        .map(|h| TracingContext::from_w3c(&h.tracing_context))
42                        .unwrap_or_default(),
43                    instruction,
44                ))
45            }
46        })
47        .transpose()
48        .context(error::DecodeJsonSnafu)?
49        .context(error::PayloadNotExistSnafu)
50}
51
52pub fn outgoing_message_to_mailbox_message(
53    (meta, reply): OutgoingMessage,
54) -> Result<MailboxMessage> {
55    Ok(MailboxMessage {
56        // Design note: mailbox communication is one-way for tracing propagation.
57        // We only carry tracing headers on incoming command messages, and replies
58        // intentionally do not echo/propagate tracing context.
59        header: None,
60        id: meta.id,
61        subject: meta.subject,
62        from: meta.to,
63        to: meta.from,
64        timestamp_millis: current_time_millis(),
65        payload: Some(Payload::Json(
66            serde_json::to_string(&reply).context(error::EncodeJsonSnafu)?,
67        )),
68    })
69}
70
71/// Extracts datanode workloads from the provided optional `NodeWorkloads`.
72///
73/// Returns default datanode workloads if the input is `None`.
74pub fn get_datanode_workloads(node_workloads: Option<&NodeWorkloads>) -> DatanodeWorkloads {
75    match node_workloads {
76        Some(NodeWorkloads::Datanode(datanode_workloads)) => {
77            let mut datanode_workloads = datanode_workloads.clone();
78            let unexpected_workloads = datanode_workloads
79                .types
80                .extract_if(.., |t| DatanodeWorkloadType::from_i32(*t).is_none())
81                .collect::<Vec<_>>();
82            if !unexpected_workloads.is_empty() {
83                warn!("Unexpected datanode workloads: {:?}", unexpected_workloads);
84            }
85            datanode_workloads
86        }
87        _ => DatanodeWorkloads {
88            types: vec![DatanodeWorkloadType::Hybrid.to_i32()],
89        },
90    }
91}
92
#[cfg(test)]
mod tests {
    use super::*;

    /// A workload type id that is not recognised (`100` here) must be dropped,
    /// leaving only the recognised `Hybrid` id in the result.
    #[test]
    fn test_get_datanode_workloads() {
        let hybrid = DatanodeWorkloadType::Hybrid.to_i32();
        let reported = NodeWorkloads::Datanode(DatanodeWorkloads {
            types: vec![hybrid, 100],
        });

        let sanitized = get_datanode_workloads(Some(&reported));

        assert_eq!(sanitized.types, vec![hybrid]);
    }
}