meta_srv/handler/
collect_stats_handler.rs1use std::cmp::Ordering;
16
17use api::v1::meta::{HeartbeatRequest, Role};
18use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, Stat};
19use common_meta::instruction::CacheIdent;
20use common_meta::key::node_address::{NodeAddressKey, NodeAddressValue};
21use common_meta::key::{MetadataKey, MetadataValue};
22use common_meta::peer::Peer;
23use common_meta::rpc::store::PutRequest;
24use common_telemetry::{error, info, warn};
25use dashmap::DashMap;
26use snafu::ResultExt;
27
28use crate::error::{self, Result};
29use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
30use crate::metasrv::Context;
31
32#[derive(Debug, Default)]
33struct EpochStats {
34 stats: Vec<Stat>,
35 epoch: Option<u64>,
36}
37
38impl EpochStats {
39 #[inline]
40 fn drain_all(&mut self) -> Vec<Stat> {
41 self.stats.drain(..).collect()
42 }
43
44 #[inline]
45 fn clear_stats(&mut self) {
46 self.stats.clear();
47 }
48
49 #[inline]
50 fn push_stat(&mut self, stat: Stat) {
51 self.stats.push(stat);
52 }
53
54 #[inline]
55 fn len(&self) -> usize {
56 self.stats.len()
57 }
58
59 #[inline]
60 fn epoch(&self) -> Option<u64> {
61 self.epoch
62 }
63
64 #[inline]
65 fn set_epoch(&mut self, epoch: u64) {
66 self.epoch = Some(epoch);
67 }
68}
69
70const DEFAULT_FLUSH_STATS_FACTOR: usize = 3;
71
72pub struct CollectStatsHandler {
73 stats_cache: DashMap<DatanodeStatKey, EpochStats>,
74 flush_stats_factor: usize,
75}
76
77impl Default for CollectStatsHandler {
78 fn default() -> Self {
79 Self::new(None)
80 }
81}
82
83impl CollectStatsHandler {
84 pub fn new(flush_stats_factor: Option<usize>) -> Self {
85 Self {
86 flush_stats_factor: flush_stats_factor.unwrap_or(DEFAULT_FLUSH_STATS_FACTOR),
87 stats_cache: DashMap::default(),
88 }
89 }
90}
91
92#[async_trait::async_trait]
93impl HeartbeatHandler for CollectStatsHandler {
94 fn is_acceptable(&self, role: Role) -> bool {
95 role == Role::Datanode
96 }
97
98 async fn handle(
99 &self,
100 _req: &HeartbeatRequest,
101 ctx: &mut Context,
102 acc: &mut HeartbeatAccumulator,
103 ) -> Result<HandleControl> {
104 let Some(current_stat) = acc.stat.take() else {
105 return Ok(HandleControl::Continue);
106 };
107
108 let key = current_stat.stat_key();
109 let mut entry = self.stats_cache.entry(key).or_default();
110
111 let key: Vec<u8> = key.into();
112 let epoch_stats = entry.value_mut();
113
114 let refresh = if let Some(epoch) = epoch_stats.epoch() {
115 match current_stat.node_epoch.cmp(&epoch) {
116 Ordering::Greater => {
117 epoch_stats.clear_stats();
119 epoch_stats.set_epoch(current_stat.node_epoch);
120 epoch_stats.push_stat(current_stat);
121 true
122 }
123 Ordering::Equal => {
124 epoch_stats.push_stat(current_stat);
125 false
126 }
127 Ordering::Less => {
128 warn!("Ignore stale heartbeat: {:?}", current_stat);
129 false
130 }
131 }
132 } else {
133 epoch_stats.set_epoch(current_stat.node_epoch);
134 epoch_stats.push_stat(current_stat);
135 true
139 };
140
141 if refresh {
143 let last = epoch_stats.stats.last().unwrap();
145 rewrite_node_address(ctx, last).await;
146 }
147
148 if !refresh && epoch_stats.len() < self.flush_stats_factor {
149 return Ok(HandleControl::Continue);
150 }
151
152 let value: Vec<u8> = DatanodeStatValue {
153 stats: epoch_stats.drain_all(),
154 }
155 .try_into()
156 .context(error::InvalidDatanodeStatFormatSnafu {})?;
157 let put = PutRequest {
158 key,
159 value,
160 prev_kv: false,
161 };
162
163 let _ = ctx
164 .in_memory
165 .put(put)
166 .await
167 .context(error::KvBackendSnafu)?;
168
169 Ok(HandleControl::Continue)
170 }
171}
172
173async fn rewrite_node_address(ctx: &mut Context, stat: &Stat) {
174 let peer = Peer {
175 id: stat.id,
176 addr: stat.addr.clone(),
177 };
178 let key = NodeAddressKey::with_datanode(peer.id).to_bytes();
179 if let Ok(value) = NodeAddressValue::new(peer.clone()).try_as_raw_value() {
180 let put = PutRequest {
181 key,
182 value,
183 prev_kv: false,
184 };
185
186 match ctx.leader_cached_kv_backend.put(put).await {
187 Ok(_) => {
188 info!(
189 "Successfully updated datanode `NodeAddressValue`: {:?}",
190 peer
191 );
192 let cache_idents = stat
194 .table_ids()
195 .into_iter()
196 .map(CacheIdent::TableId)
197 .collect::<Vec<_>>();
198 if let Err(e) = ctx
199 .cache_invalidator
200 .invalidate(&Default::default(), &cache_idents)
201 .await
202 {
203 error!(e; "Failed to invalidate {} `NodeAddressKey` cache, peer: {:?}", cache_idents.len(), peer);
204 }
205 }
206 Err(e) => {
207 error!(e; "Failed to update datanode `NodeAddressValue`: {:?}", peer);
208 }
209 }
210 } else {
211 warn!(
212 "Failed to serialize datanode `NodeAddressValue`: {:?}",
213 peer
214 );
215 }
216}
217
#[cfg(test)]
mod tests {
    use common_meta::datanode::DatanodeStatKey;

    use super::*;
    use crate::handler::test_utils::TestEnv;

    /// The first stat of a node is flushed on its own; afterwards stats are
    /// flushed in batches of `flush_stats_factor`.
    #[tokio::test]
    async fn test_handle_datanode_stats() {
        let env = TestEnv::new();
        let ctx = env.ctx();
        let handler = CollectStatsHandler::default();

        // A single heartbeat: no epoch is cached yet, so it is flushed
        // immediately.
        handle_request_many_times(ctx.clone(), &handler, 1).await;

        let raw_key: Vec<u8> = DatanodeStatKey { node_id: 101 }.into();
        let kv = ctx.in_memory.get(&raw_key).await.unwrap().unwrap();
        let stat_key: DatanodeStatKey = kv.key.clone().try_into().unwrap();
        assert_eq!(101, stat_key.node_id);
        let flushed: DatanodeStatValue = kv.value.try_into().unwrap();
        assert_eq!(1, flushed.stats.len());
        assert_eq!(1, flushed.stats[0].region_num);

        // Ten more heartbeats with the same epoch: the value stored last
        // must hold exactly one full batch.
        handle_request_many_times(ctx.clone(), &handler, 10).await;

        let raw_key: Vec<u8> = stat_key.into();
        let kv = ctx.in_memory.get(&raw_key).await.unwrap().unwrap();
        let flushed: DatanodeStatValue = kv.value.try_into().unwrap();
        assert_eq!(handler.flush_stats_factor, flushed.stats.len());
    }

    /// Sends `loop_times` heartbeats for node 101 through `handler`, using
    /// the 1-based iteration number as `region_num`.
    async fn handle_request_many_times(
        mut ctx: Context,
        handler: &CollectStatsHandler,
        loop_times: i32,
    ) {
        let req = HeartbeatRequest::default();
        for i in 1..=loop_times {
            let stat = Stat {
                id: 101,
                region_num: i as _,
                ..Default::default()
            };
            let mut acc = HeartbeatAccumulator {
                stat: Some(stat),
                ..Default::default()
            };
            handler.handle(&req, &mut ctx, &mut acc).await.unwrap();
        }
    }
}