meta_srv/handler/
collect_stats_handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16
17use api::v1::meta::{HeartbeatRequest, Role};
18use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, Stat};
19use common_meta::instruction::CacheIdent;
20use common_meta::key::node_address::{NodeAddressKey, NodeAddressValue};
21use common_meta::key::{MetadataKey, MetadataValue};
22use common_meta::peer::Peer;
23use common_meta::rpc::store::PutRequest;
24use common_telemetry::{error, info, warn};
25use dashmap::DashMap;
26use snafu::ResultExt;
27
28use crate::error::{self, Result};
29use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
30use crate::metasrv::Context;
31
32#[derive(Debug, Default)]
33struct EpochStats {
34    stats: Vec<Stat>,
35    epoch: Option<u64>,
36}
37
38impl EpochStats {
39    #[inline]
40    fn drain_all(&mut self) -> Vec<Stat> {
41        self.stats.drain(..).collect()
42    }
43
44    #[inline]
45    fn clear_stats(&mut self) {
46        self.stats.clear();
47    }
48
49    #[inline]
50    fn push_stat(&mut self, stat: Stat) {
51        self.stats.push(stat);
52    }
53
54    #[inline]
55    fn len(&self) -> usize {
56        self.stats.len()
57    }
58
59    #[inline]
60    fn epoch(&self) -> Option<u64> {
61        self.epoch
62    }
63
64    #[inline]
65    fn set_epoch(&mut self, epoch: u64) {
66        self.epoch = Some(epoch);
67    }
68}
69
/// Number of same-epoch stats buffered per datanode before a batch is flushed, used
/// when no explicit factor is configured.
const DEFAULT_FLUSH_STATS_FACTOR: usize = 3;
71
72pub struct CollectStatsHandler {
73    stats_cache: DashMap<DatanodeStatKey, EpochStats>,
74    flush_stats_factor: usize,
75}
76
77impl Default for CollectStatsHandler {
78    fn default() -> Self {
79        Self::new(None)
80    }
81}
82
83impl CollectStatsHandler {
84    pub fn new(flush_stats_factor: Option<usize>) -> Self {
85        Self {
86            flush_stats_factor: flush_stats_factor.unwrap_or(DEFAULT_FLUSH_STATS_FACTOR),
87            stats_cache: DashMap::default(),
88        }
89    }
90}
91
92#[async_trait::async_trait]
93impl HeartbeatHandler for CollectStatsHandler {
94    fn is_acceptable(&self, role: Role) -> bool {
95        role == Role::Datanode
96    }
97
98    async fn handle(
99        &self,
100        _req: &HeartbeatRequest,
101        ctx: &mut Context,
102        acc: &mut HeartbeatAccumulator,
103    ) -> Result<HandleControl> {
104        let Some(current_stat) = acc.stat.take() else {
105            return Ok(HandleControl::Continue);
106        };
107
108        let key = current_stat.stat_key();
109        let mut entry = self.stats_cache.entry(key).or_default();
110
111        let key: Vec<u8> = key.into();
112        let epoch_stats = entry.value_mut();
113
114        let refresh = if let Some(epoch) = epoch_stats.epoch() {
115            match current_stat.node_epoch.cmp(&epoch) {
116                Ordering::Greater => {
117                    // This node may have been redeployed.
118                    epoch_stats.clear_stats();
119                    epoch_stats.set_epoch(current_stat.node_epoch);
120                    epoch_stats.push_stat(current_stat);
121                    true
122                }
123                Ordering::Equal => {
124                    epoch_stats.push_stat(current_stat);
125                    false
126                }
127                Ordering::Less => {
128                    warn!("Ignore stale heartbeat: {:?}", current_stat);
129                    false
130                }
131            }
132        } else {
133            epoch_stats.set_epoch(current_stat.node_epoch);
134            epoch_stats.push_stat(current_stat);
135            // If the epoch is empty, it indicates that the current node sending the heartbeat
136            // for the first time to the current meta leader, so it is necessary to save
137            // the data to the KV store as soon as possible.
138            true
139        };
140
141        // Need to refresh the [datanode -> address] mapping
142        if refresh {
143            // Safety: `epoch_stats.stats` is not empty
144            let last = epoch_stats.stats.last().unwrap();
145            rewrite_node_address(ctx, last).await;
146        }
147
148        if !refresh && epoch_stats.len() < self.flush_stats_factor {
149            return Ok(HandleControl::Continue);
150        }
151
152        let value: Vec<u8> = DatanodeStatValue {
153            stats: epoch_stats.drain_all(),
154        }
155        .try_into()
156        .context(error::InvalidDatanodeStatFormatSnafu {})?;
157        let put = PutRequest {
158            key,
159            value,
160            prev_kv: false,
161        };
162
163        let _ = ctx
164            .in_memory
165            .put(put)
166            .await
167            .context(error::KvBackendSnafu)?;
168
169        Ok(HandleControl::Continue)
170    }
171}
172
173async fn rewrite_node_address(ctx: &mut Context, stat: &Stat) {
174    let peer = Peer {
175        id: stat.id,
176        addr: stat.addr.clone(),
177    };
178    let key = NodeAddressKey::with_datanode(peer.id).to_bytes();
179    if let Ok(value) = NodeAddressValue::new(peer.clone()).try_as_raw_value() {
180        let put = PutRequest {
181            key,
182            value,
183            prev_kv: false,
184        };
185
186        match ctx.leader_cached_kv_backend.put(put).await {
187            Ok(_) => {
188                info!(
189                    "Successfully updated datanode `NodeAddressValue`: {:?}",
190                    peer
191                );
192                // broadcast invalidating cache
193                let cache_idents = stat
194                    .table_ids()
195                    .into_iter()
196                    .map(CacheIdent::TableId)
197                    .collect::<Vec<_>>();
198                if let Err(e) = ctx
199                    .cache_invalidator
200                    .invalidate(&Default::default(), &cache_idents)
201                    .await
202                {
203                    error!(e; "Failed to invalidate {} `NodeAddressKey` cache, peer: {:?}", cache_idents.len(), peer);
204                }
205            }
206            Err(e) => {
207                error!(e; "Failed to update datanode `NodeAddressValue`: {:?}", peer);
208            }
209        }
210    } else {
211        warn!(
212            "Failed to serialize datanode `NodeAddressValue`: {:?}",
213            peer
214        );
215    }
216}
217
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_meta::cache_invalidator::DummyCacheInvalidator;
    use common_meta::datanode::DatanodeStatKey;
    use common_meta::key::TableMetadataManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::region_registry::LeaderRegionRegistry;
    use common_meta::sequence::SequenceBuilder;

    use super::*;
    use crate::cluster::MetaPeerClientBuilder;
    use crate::handler::{HeartbeatMailbox, Pushers};
    use crate::service::store::cached_kv::LeaderCachedKvBackend;

    /// Builds a metasrv [`Context`] whose KV stores are all in-memory.
    fn new_test_context() -> Context {
        let in_memory = Arc::new(MemoryKvBackend::new());
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::with_always_leader(
            kv_backend.clone(),
        ));
        let seq = SequenceBuilder::new("test_seq", kv_backend.clone()).build();
        let mailbox = HeartbeatMailbox::create(Pushers::default(), seq);
        let meta_peer_client = MetaPeerClientBuilder::default()
            .election(None)
            .in_memory(in_memory.clone())
            .build()
            .map(Arc::new)
            // Safety: all required fields set at initialization
            .unwrap();

        Context {
            server_addr: "127.0.0.1:0000".to_string(),
            in_memory,
            kv_backend: kv_backend.clone(),
            leader_cached_kv_backend,
            meta_peer_client,
            mailbox,
            election: None,
            is_infancy: false,
            table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
            cache_invalidator: Arc::new(DummyCacheInvalidator),
            leader_region_registry: Arc::new(LeaderRegionRegistry::new()),
        }
    }

    #[tokio::test]
    async fn test_handle_datanode_stats() {
        let ctx = new_test_context();
        let handler = CollectStatsHandler::default();

        // The very first stat of a node must be written to the KV store immediately.
        handle_request_many_times(ctx.clone(), &handler, 1).await;
        let raw_key: Vec<u8> = DatanodeStatKey { node_id: 101 }.into();
        let kv = ctx.in_memory.get(&raw_key).await.unwrap().unwrap();
        let decoded_key: DatanodeStatKey = kv.key.clone().try_into().unwrap();
        assert_eq!(101, decoded_key.node_id);
        let first_batch: DatanodeStatValue = kv.value.try_into().unwrap();
        assert_eq!(1, first_batch.stats.len());
        assert_eq!(1, first_batch.stats[0].region_num);

        // Later same-epoch stats are written in batches of `flush_stats_factor`.
        handle_request_many_times(ctx.clone(), &handler, 10).await;
        let raw_key: Vec<u8> = decoded_key.into();
        let kv = ctx.in_memory.get(&raw_key).await.unwrap().unwrap();
        let latest_batch: DatanodeStatValue = kv.value.try_into().unwrap();
        assert_eq!(handler.flush_stats_factor, latest_batch.stats.len());
    }

    /// Feeds `loop_times` heartbeats from datanode 101 into `handler`; the i-th one
    /// (1-based) reports `region_num == i`.
    async fn handle_request_many_times(
        mut ctx: Context,
        handler: &CollectStatsHandler,
        loop_times: i32,
    ) {
        let request = HeartbeatRequest::default();
        for round in 1..=loop_times {
            let stat = Stat {
                id: 101,
                region_num: round as _,
                ..Default::default()
            };
            let mut acc = HeartbeatAccumulator {
                stat: Some(stat),
                ..Default::default()
            };
            handler.handle(&request, &mut ctx, &mut acc).await.unwrap();
        }
    }
}