meta_srv/handler/
keep_lease_handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::meta::heartbeat_request::NodeWorkloads;
16use api::v1::meta::{FlownodeWorkloads, HeartbeatRequest, Peer, Role};
17use common_meta::heartbeat::utils::get_datanode_workloads;
18use common_meta::rpc::store::PutRequest;
19use common_telemetry::{trace, warn};
20use common_time::util as time_util;
21
22use crate::error::Result;
23use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
24use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue};
25use crate::metasrv::Context;
26
27/// Keeps [Datanode] leases
28pub struct DatanodeKeepLeaseHandler;
29
30#[async_trait::async_trait]
31impl HeartbeatHandler for DatanodeKeepLeaseHandler {
32    fn is_acceptable(&self, role: Role) -> bool {
33        role == Role::Datanode
34    }
35
36    async fn handle(
37        &self,
38        req: &HeartbeatRequest,
39        ctx: &mut Context,
40        _acc: &mut HeartbeatAccumulator,
41    ) -> Result<HandleControl> {
42        let HeartbeatRequest {
43            header,
44            peer,
45            node_workloads,
46            ..
47        } = req;
48        let Some(_header) = &header else {
49            return Ok(HandleControl::Continue);
50        };
51        let Some(peer) = &peer else {
52            return Ok(HandleControl::Continue);
53        };
54
55        let datanode_workloads = get_datanode_workloads(node_workloads.as_ref());
56        let key = DatanodeLeaseKey { node_id: peer.id };
57        let value = LeaseValue {
58            timestamp_millis: time_util::current_time_millis(),
59            node_addr: peer.addr.clone(),
60            workloads: NodeWorkloads::Datanode(datanode_workloads),
61        };
62
63        trace!("Receive a heartbeat from datanode: {key:?}, {value:?}");
64
65        let key = key.try_into()?;
66        let value = value.try_into()?;
67        put_into_memory_store(ctx, key, value, peer).await;
68
69        Ok(HandleControl::Continue)
70    }
71}
72
73/// Keeps [Flownode] leases
74pub struct FlownodeKeepLeaseHandler;
75
76#[async_trait::async_trait]
77impl HeartbeatHandler for FlownodeKeepLeaseHandler {
78    fn is_acceptable(&self, role: Role) -> bool {
79        role == Role::Flownode
80    }
81
82    async fn handle(
83        &self,
84        req: &HeartbeatRequest,
85        ctx: &mut Context,
86        _acc: &mut HeartbeatAccumulator,
87    ) -> Result<HandleControl> {
88        let HeartbeatRequest { header, peer, .. } = req;
89        let Some(_header) = &header else {
90            return Ok(HandleControl::Continue);
91        };
92        let Some(peer) = &peer else {
93            return Ok(HandleControl::Continue);
94        };
95
96        let key = FlownodeLeaseKey { node_id: peer.id };
97        let value = LeaseValue {
98            timestamp_millis: time_util::current_time_millis(),
99            node_addr: peer.addr.clone(),
100            workloads: NodeWorkloads::Flownode(FlownodeWorkloads { types: vec![] }),
101        };
102
103        trace!("Receive a heartbeat from flownode: {key:?}, {value:?}");
104
105        let key = key.try_into()?;
106        let value = value.try_into()?;
107        put_into_memory_store(ctx, key, value, peer).await;
108
109        Ok(HandleControl::Continue)
110    }
111}
112
113async fn put_into_memory_store(ctx: &mut Context, key: Vec<u8>, value: Vec<u8>, peer: &Peer) {
114    let put_req = PutRequest {
115        key,
116        value,
117        ..Default::default()
118    };
119
120    let res = ctx.in_memory.put(put_req).await;
121
122    if let Err(err) = res {
123        warn!(err; "Failed to update lease KV, peer: {peer:?}");
124    }
125}
126
127#[cfg(test)]
128mod tests {
129    use std::collections::HashMap;
130    use std::sync::Arc;
131
132    use api::v1::meta::RequestHeader;
133    use common_meta::cache_invalidator::DummyCacheInvalidator;
134    use common_meta::datanode::Stat;
135    use common_meta::key::TableMetadataManager;
136    use common_meta::kv_backend::memory::MemoryKvBackend;
137    use common_meta::region_registry::LeaderRegionRegistry;
138    use common_meta::sequence::SequenceBuilder;
139
140    use super::*;
141    use crate::cluster::MetaPeerClientBuilder;
142    use crate::handler::{HeartbeatMailbox, Pushers};
143    use crate::lease::find_datanode_lease_value;
144    use crate::service::store::cached_kv::LeaderCachedKvBackend;
145
146    #[tokio::test]
147    async fn test_put_into_memory_store() {
148        let in_memory = Arc::new(MemoryKvBackend::new());
149        let kv_backend = Arc::new(MemoryKvBackend::new());
150        let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::with_always_leader(
151            kv_backend.clone(),
152        ));
153        let seq = SequenceBuilder::new("test_seq", kv_backend.clone()).build();
154        let mailbox = HeartbeatMailbox::create(Pushers::default(), seq);
155        let meta_peer_client = MetaPeerClientBuilder::default()
156            .election(None)
157            .in_memory(in_memory.clone())
158            .build()
159            .map(Arc::new)
160            // Safety: all required fields set at initialization
161            .unwrap();
162        let ctx = Context {
163            server_addr: "127.0.0.1:0000".to_string(),
164            in_memory,
165            kv_backend: kv_backend.clone(),
166            leader_cached_kv_backend,
167            meta_peer_client,
168            mailbox,
169            election: None,
170            is_infancy: false,
171            table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
172            cache_invalidator: Arc::new(DummyCacheInvalidator),
173            leader_region_registry: Arc::new(LeaderRegionRegistry::new()),
174        };
175
176        let handler = DatanodeKeepLeaseHandler;
177        handle_request_many_times(ctx.clone(), &handler, 1).await;
178
179        let lease_value = find_datanode_lease_value(1, &ctx.in_memory)
180            .await
181            .unwrap()
182            .unwrap();
183        assert_eq!(lease_value.node_addr, "127.0.0.1:1");
184        assert!(lease_value.timestamp_millis != 0);
185    }
186
187    async fn handle_request_many_times(
188        mut ctx: Context,
189        handler: &DatanodeKeepLeaseHandler,
190        loop_times: i32,
191    ) {
192        let req = HeartbeatRequest {
193            header: Some(RequestHeader::new(1, Role::Datanode, HashMap::new())),
194            peer: Some(Peer::new(1, "127.0.0.1:1")),
195            ..Default::default()
196        };
197
198        for i in 1..=loop_times {
199            let mut acc = HeartbeatAccumulator {
200                stat: Some(Stat {
201                    id: 101,
202                    region_num: i as _,
203                    ..Default::default()
204                }),
205                ..Default::default()
206            };
207            handler.handle(&req, &mut ctx, &mut acc).await.unwrap();
208        }
209    }
210}