meta_srv/handler/
collect_stats_handler.rs1use std::cmp::Ordering;
16
17use api::v1::meta::{HeartbeatRequest, Role};
18use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, Stat};
19use common_meta::instruction::CacheIdent;
20use common_meta::key::node_address::{NodeAddressKey, NodeAddressValue};
21use common_meta::key::{MetadataKey, MetadataValue};
22use common_meta::peer::Peer;
23use common_meta::rpc::store::PutRequest;
24use common_telemetry::{error, info, warn};
25use dashmap::DashMap;
26use snafu::ResultExt;
27
28use crate::error::{self, Result};
29use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
30use crate::metasrv::Context;
31
32#[derive(Debug, Default)]
33struct EpochStats {
34 stats: Vec<Stat>,
35 epoch: Option<u64>,
36}
37
38impl EpochStats {
39 #[inline]
40 fn drain_all(&mut self) -> Vec<Stat> {
41 self.stats.drain(..).collect()
42 }
43
44 #[inline]
45 fn clear_stats(&mut self) {
46 self.stats.clear();
47 }
48
49 #[inline]
50 fn push_stat(&mut self, stat: Stat) {
51 self.stats.push(stat);
52 }
53
54 #[inline]
55 fn len(&self) -> usize {
56 self.stats.len()
57 }
58
59 #[inline]
60 fn epoch(&self) -> Option<u64> {
61 self.epoch
62 }
63
64 #[inline]
65 fn set_epoch(&mut self, epoch: u64) {
66 self.epoch = Some(epoch);
67 }
68}
69
70const DEFAULT_FLUSH_STATS_FACTOR: usize = 3;
71
72pub struct CollectStatsHandler {
73 stats_cache: DashMap<DatanodeStatKey, EpochStats>,
74 flush_stats_factor: usize,
75}
76
77impl Default for CollectStatsHandler {
78 fn default() -> Self {
79 Self::new(None)
80 }
81}
82
83impl CollectStatsHandler {
84 pub fn new(flush_stats_factor: Option<usize>) -> Self {
85 Self {
86 flush_stats_factor: flush_stats_factor.unwrap_or(DEFAULT_FLUSH_STATS_FACTOR),
87 stats_cache: DashMap::default(),
88 }
89 }
90}
91
92#[async_trait::async_trait]
93impl HeartbeatHandler for CollectStatsHandler {
94 fn is_acceptable(&self, role: Role) -> bool {
95 role == Role::Datanode
96 }
97
98 async fn handle(
99 &self,
100 _req: &HeartbeatRequest,
101 ctx: &mut Context,
102 acc: &mut HeartbeatAccumulator,
103 ) -> Result<HandleControl> {
104 let Some(current_stat) = acc.stat.take() else {
105 return Ok(HandleControl::Continue);
106 };
107
108 let key = current_stat.stat_key();
109 let mut entry = self.stats_cache.entry(key).or_default();
110
111 let key: Vec<u8> = key.into();
112 let epoch_stats = entry.value_mut();
113
114 let refresh = if let Some(epoch) = epoch_stats.epoch() {
115 match current_stat.node_epoch.cmp(&epoch) {
116 Ordering::Greater => {
117 epoch_stats.clear_stats();
119 epoch_stats.set_epoch(current_stat.node_epoch);
120 epoch_stats.push_stat(current_stat);
121 true
122 }
123 Ordering::Equal => {
124 epoch_stats.push_stat(current_stat);
125 false
126 }
127 Ordering::Less => {
128 warn!("Ignore stale heartbeat: {:?}", current_stat);
129 false
130 }
131 }
132 } else {
133 epoch_stats.set_epoch(current_stat.node_epoch);
134 epoch_stats.push_stat(current_stat);
135 true
139 };
140
141 if refresh {
143 let last = epoch_stats.stats.last().unwrap();
145 rewrite_node_address(ctx, last).await;
146 }
147
148 if !refresh && epoch_stats.len() < self.flush_stats_factor {
149 return Ok(HandleControl::Continue);
150 }
151
152 let value: Vec<u8> = DatanodeStatValue {
153 stats: epoch_stats.drain_all(),
154 }
155 .try_into()
156 .context(error::InvalidDatanodeStatFormatSnafu {})?;
157 let put = PutRequest {
158 key,
159 value,
160 prev_kv: false,
161 };
162
163 let _ = ctx
164 .in_memory
165 .put(put)
166 .await
167 .context(error::KvBackendSnafu)?;
168
169 Ok(HandleControl::Continue)
170 }
171}
172
173async fn rewrite_node_address(ctx: &mut Context, stat: &Stat) {
174 let peer = Peer {
175 id: stat.id,
176 addr: stat.addr.clone(),
177 };
178 let key = NodeAddressKey::with_datanode(peer.id).to_bytes();
179 if let Ok(value) = NodeAddressValue::new(peer.clone()).try_as_raw_value() {
180 let put = PutRequest {
181 key,
182 value,
183 prev_kv: false,
184 };
185
186 match ctx.leader_cached_kv_backend.put(put).await {
187 Ok(_) => {
188 info!(
189 "Successfully updated datanode `NodeAddressValue`: {:?}",
190 peer
191 );
192 let cache_idents = stat
194 .table_ids()
195 .into_iter()
196 .map(CacheIdent::TableId)
197 .collect::<Vec<_>>();
198 if let Err(e) = ctx
199 .cache_invalidator
200 .invalidate(&Default::default(), &cache_idents)
201 .await
202 {
203 error!(e; "Failed to invalidate {} `NodeAddressKey` cache, peer: {:?}", cache_idents.len(), peer);
204 }
205 }
206 Err(e) => {
207 error!(e; "Failed to update datanode `NodeAddressValue`: {:?}", peer);
208 }
209 }
210 } else {
211 warn!(
212 "Failed to serialize datanode `NodeAddressValue`: {:?}",
213 peer
214 );
215 }
216}
217
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_meta::cache_invalidator::DummyCacheInvalidator;
    use common_meta::datanode::DatanodeStatKey;
    use common_meta::key::TableMetadataManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::region_registry::LeaderRegionRegistry;
    use common_meta::sequence::SequenceBuilder;

    use super::*;
    use crate::cluster::MetaPeerClientBuilder;
    use crate::handler::{HeartbeatMailbox, Pushers};
    use crate::service::store::cached_kv::LeaderCachedKvBackend;

    /// Builds a `Context` whose kv backends are fresh in-memory stores.
    fn new_test_context() -> Context {
        let in_memory = Arc::new(MemoryKvBackend::new());
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::with_always_leader(
            kv_backend.clone(),
        ));
        let seq = SequenceBuilder::new("test_seq", kv_backend.clone()).build();
        let mailbox = HeartbeatMailbox::create(Pushers::default(), seq);
        let meta_peer_client = MetaPeerClientBuilder::default()
            .election(None)
            .in_memory(in_memory.clone())
            .build()
            .map(Arc::new)
            .unwrap();

        Context {
            server_addr: "127.0.0.1:0000".to_string(),
            in_memory,
            kv_backend: kv_backend.clone(),
            leader_cached_kv_backend,
            meta_peer_client,
            mailbox,
            election: None,
            is_infancy: false,
            table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
            cache_invalidator: Arc::new(DummyCacheInvalidator),
            leader_region_registry: Arc::new(LeaderRegionRegistry::new()),
        }
    }

    #[tokio::test]
    async fn test_handle_datanode_stats() {
        let ctx = new_test_context();
        let handler = CollectStatsHandler::default();

        // The first heartbeat of a node is written out immediately.
        handle_request_many_times(ctx.clone(), &handler, 1).await;

        let raw_key: Vec<u8> = DatanodeStatKey { node_id: 101 }.into();
        let first_kv = ctx.in_memory.get(&raw_key).await.unwrap().unwrap();
        let decoded_key: DatanodeStatKey = first_kv.key.clone().try_into().unwrap();
        assert_eq!(101, decoded_key.node_id);
        let first_batch: DatanodeStatValue = first_kv.value.try_into().unwrap();
        assert_eq!(1, first_batch.stats.len());
        assert_eq!(1, first_batch.stats[0].region_num);

        // Same-epoch heartbeats are buffered and written in batches of
        // `flush_stats_factor`.
        handle_request_many_times(ctx.clone(), &handler, 10).await;

        let raw_key: Vec<u8> = decoded_key.into();
        let latest_kv = ctx.in_memory.get(&raw_key).await.unwrap().unwrap();
        let latest_batch: DatanodeStatValue = latest_kv.value.try_into().unwrap();
        assert_eq!(handler.flush_stats_factor, latest_batch.stats.len());
    }

    /// Sends `loop_times` heartbeats for datanode 101 through `handler`; the
    /// n-th heartbeat (1-based) reports `region_num = n`.
    async fn handle_request_many_times(
        mut ctx: Context,
        handler: &CollectStatsHandler,
        loop_times: i32,
    ) {
        let req = HeartbeatRequest::default();
        for region_num in 1..=loop_times {
            let stat = Stat {
                id: 101,
                region_num: region_num as _,
                ..Default::default()
            };
            let mut acc = HeartbeatAccumulator {
                stat: Some(stat),
                ..Default::default()
            };
            handler.handle(&req, &mut ctx, &mut acc).await.unwrap();
        }
    }
}