Skip to main content

meta_srv/handler/
keep_lease_handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::meta::heartbeat_request::NodeWorkloads;
16use api::v1::meta::{HeartbeatRequest, Peer, Role};
17use common_meta::heartbeat::utils::{get_datanode_workloads, get_flownode_workloads};
18use common_meta::rpc::store::PutRequest;
19use common_telemetry::{trace, warn};
20use common_time::util as time_util;
21
22use crate::error::Result;
23use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
24use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue};
25use crate::metasrv::Context;
26
27/// Keeps [Datanode] leases
28pub struct DatanodeKeepLeaseHandler;
29
30#[async_trait::async_trait]
31impl HeartbeatHandler for DatanodeKeepLeaseHandler {
32    fn is_acceptable(&self, role: Role) -> bool {
33        role == Role::Datanode
34    }
35
36    async fn handle(
37        &self,
38        req: &HeartbeatRequest,
39        ctx: &mut Context,
40        _acc: &mut HeartbeatAccumulator,
41    ) -> Result<HandleControl> {
42        let HeartbeatRequest {
43            header,
44            peer,
45            node_workloads,
46            ..
47        } = req;
48        let Some(_header) = &header else {
49            return Ok(HandleControl::Continue);
50        };
51        let Some(peer) = &peer else {
52            return Ok(HandleControl::Continue);
53        };
54
55        let datanode_workloads = get_datanode_workloads(node_workloads.as_ref());
56        let key = DatanodeLeaseKey { node_id: peer.id };
57        let value = LeaseValue {
58            timestamp_millis: time_util::current_time_millis(),
59            node_addr: peer.addr.clone(),
60            workloads: NodeWorkloads::Datanode(datanode_workloads),
61        };
62
63        trace!("Receive a heartbeat from datanode: {key:?}, {value:?}");
64
65        let key = key.try_into()?;
66        let value = value.try_into()?;
67        put_into_memory_store(ctx, key, value, peer).await;
68
69        Ok(HandleControl::Continue)
70    }
71}
72
73/// Keeps [Flownode] leases
74pub struct FlownodeKeepLeaseHandler;
75
76#[async_trait::async_trait]
77impl HeartbeatHandler for FlownodeKeepLeaseHandler {
78    fn is_acceptable(&self, role: Role) -> bool {
79        role == Role::Flownode
80    }
81
82    async fn handle(
83        &self,
84        req: &HeartbeatRequest,
85        ctx: &mut Context,
86        _acc: &mut HeartbeatAccumulator,
87    ) -> Result<HandleControl> {
88        let HeartbeatRequest {
89            header,
90            peer,
91            node_workloads,
92            ..
93        } = req;
94        let Some(_header) = &header else {
95            return Ok(HandleControl::Continue);
96        };
97        let Some(peer) = &peer else {
98            return Ok(HandleControl::Continue);
99        };
100
101        let key = FlownodeLeaseKey { node_id: peer.id };
102        let value = LeaseValue {
103            timestamp_millis: time_util::current_time_millis(),
104            node_addr: peer.addr.clone(),
105            workloads: NodeWorkloads::Flownode(get_flownode_workloads(node_workloads.as_ref())),
106        };
107
108        trace!("Receive a heartbeat from flownode: {key:?}, {value:?}");
109
110        let key = key.try_into()?;
111        let value = value.try_into()?;
112        put_into_memory_store(ctx, key, value, peer).await;
113
114        Ok(HandleControl::Continue)
115    }
116}
117
118async fn put_into_memory_store(ctx: &mut Context, key: Vec<u8>, value: Vec<u8>, peer: &Peer) {
119    let put_req = PutRequest {
120        key,
121        value,
122        ..Default::default()
123    };
124
125    let res = ctx.in_memory.put(put_req).await;
126
127    if let Err(err) = res {
128        warn!(err; "Failed to update lease KV, peer: {peer:?}");
129    }
130}
131
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use api::v1::meta::RequestHeader;
    use common_meta::datanode::Stat;

    use super::*;
    use crate::discovery::utils::find_datanode_lease_value;
    use crate::handler::test_utils::TestEnv;

    /// Feeds one fixed datanode heartbeat (peer id 1, address `127.0.0.1:1`)
    /// through `handler` `loop_times` times, building a fresh accumulator for
    /// every round.
    async fn handle_request_many_times(
        mut ctx: Context,
        handler: &DatanodeKeepLeaseHandler,
        loop_times: i32,
    ) {
        let header = RequestHeader::new(1, Role::Datanode, HashMap::new());
        let req = HeartbeatRequest {
            header: Some(header),
            peer: Some(Peer::new(1, "127.0.0.1:1")),
            ..Default::default()
        };

        for i in 1..=loop_times {
            let stat = Stat {
                id: 101,
                region_num: i as _,
                ..Default::default()
            };
            let mut acc = HeartbeatAccumulator {
                stat: Some(stat),
                ..Default::default()
            };
            handler.handle(&req, &mut ctx, &mut acc).await.unwrap();
        }
    }

    /// A single datanode heartbeat must leave a lease for node 1 in the
    /// in-memory store, carrying the peer address and a non-zero timestamp.
    #[tokio::test]
    async fn test_put_into_memory_store() {
        let env = TestEnv::new();
        let ctx = env.ctx();

        handle_request_many_times(ctx.clone(), &DatanodeKeepLeaseHandler, 1).await;

        let in_memory = ctx.in_memory.clone() as _;
        let lease_value = find_datanode_lease_value(&in_memory, 1)
            .await
            .unwrap()
            .unwrap();

        assert_eq!(lease_value.node_addr, "127.0.0.1:1");
        assert_ne!(lease_value.timestamp_millis, 0);
    }
}