1use std::collections::HashSet;
16use std::str::FromStr;
17
18use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, RequestHeader};
19use common_time::util as time_util;
20use lazy_static::lazy_static;
21use regex::Regex;
22use serde::{Deserialize, Serialize};
23use snafu::{ensure, OptionExt, ResultExt};
24use store_api::region_engine::{RegionRole, RegionStatistic};
25use store_api::storage::RegionId;
26use table::metadata::TableId;
27
28use crate::error;
29use crate::error::Result;
30use crate::heartbeat::utils::get_datanode_workloads;
31
/// Key prefix matched by `DATANODE_LEASE_KEY_PATTERN`.
pub(crate) const DATANODE_LEASE_PREFIX: &str = "__meta_datanode_lease";
/// Key prefix matched by `INACTIVE_REGION_KEY_PATTERN`.
const INACTIVE_REGION_PREFIX: &str = "__meta_inactive_region";

/// Key prefix used when encoding and parsing a `DatanodeStatKey`.
const DATANODE_STAT_PREFIX: &str = "__meta_datanode_stat";

/// Key looked up in a protobuf region stat's `extensions` map to find the
/// serialized `RegionStatistic` payload.
pub const REGION_STATISTIC_KEY: &str = "__region_statistic";
38
// Lazily compiled, fully anchored (`^…$`) regexes for the key formats used in
// this module. Each pattern is the prefix followed by `-`-separated decimal
// groups; the `unwrap()`s can only fire if the pattern text itself is invalid.
lazy_static! {
    // `<DATANODE_LEASE_PREFIX>-<digits>-<digits>`
    pub(crate) static ref DATANODE_LEASE_KEY_PATTERN: Regex =
        Regex::new(&format!("^{DATANODE_LEASE_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap();
    // `<DATANODE_STAT_PREFIX>-<digits>-<digits>`; `DatanodeStatKey` reads the
    // second group as the node id.
    static ref DATANODE_STAT_KEY_PATTERN: Regex =
        Regex::new(&format!("^{DATANODE_STAT_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap();
    // `<INACTIVE_REGION_PREFIX>-<digits>-<digits>-<digits>`
    static ref INACTIVE_REGION_KEY_PATTERN: Regex = Regex::new(&format!(
        "^{INACTIVE_REGION_PREFIX}-([0-9]+)-([0-9]+)-([0-9]+)$"
    ))
    .unwrap();
}
49
/// Statistics of a single datanode.
///
/// The `TryFrom<&HeartbeatRequest>` impl in this file shows how each field is
/// populated from a heartbeat.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Stat {
    /// Wall-clock time in milliseconds, taken from
    /// `time_util::current_time_millis()` when built from a heartbeat.
    pub timestamp_millis: i64,
    /// Id of the reporting peer; also used as `node_id` of the `DatanodeStatKey`.
    pub id: u64,
    /// Address of the reporting peer.
    pub addr: String,
    /// Sum of `rcus` over `region_stats`
    /// (presumably "read capacity units" — TODO confirm).
    pub rcus: i64,
    /// Sum of `wcus` over `region_stats`
    /// (presumably "write capacity units" — TODO confirm).
    pub wcus: i64,
    /// Number of entries in `region_stats`.
    pub region_num: u64,
    /// Per-region statistics reported by the datanode.
    pub region_stats: Vec<RegionStat>,
    /// `node_epoch` value copied from the heartbeat request.
    pub node_epoch: u64,
    /// Workloads of the datanode, as returned by `get_datanode_workloads`.
    pub datanode_workloads: DatanodeWorkloads,
}
72
/// Statistics of a single region.
///
/// The `From<&api::v1::meta::RegionStat>` impl in this file copies some fields
/// straight from the protobuf message and reads the rest from the
/// `RegionStatistic` stored under `REGION_STATISTIC_KEY` in its `extensions`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegionStat {
    /// Region id, decoded from the protobuf `region_id`.
    pub id: RegionId,
    /// Copied from the protobuf message
    /// (presumably "read capacity units" — TODO confirm).
    pub rcus: i64,
    /// Copied from the protobuf message
    /// (presumably "write capacity units" — TODO confirm).
    pub wcus: i64,
    /// Approximate size of the region in bytes, as reported in the protobuf message.
    pub approximate_bytes: u64,
    /// Engine name, as reported in the protobuf message.
    pub engine: String,
    /// Role of the region, converted from the protobuf role.
    pub role: RegionRole,
    /// From `RegionStatistic::num_rows`.
    pub num_rows: u64,
    /// From `RegionStatistic::memtable_size`.
    pub memtable_size: u64,
    /// From `RegionStatistic::manifest_size`.
    pub manifest_size: u64,
    /// From `RegionStatistic::sst_size`.
    pub sst_size: u64,
    /// From `RegionStatistic::index_size`.
    pub index_size: u64,
    /// Manifest information, converted from `RegionStatistic::manifest`.
    pub region_manifest: RegionManifestInfo,
    /// From `RegionStatistic::data_topic_latest_entry_id`.
    pub data_topic_latest_entry_id: u64,
    /// From `RegionStatistic::metadata_topic_latest_entry_id`.
    pub metadata_topic_latest_entry_id: u64,
}
108
/// Serializable counterpart of `store_api::region_engine::RegionManifestInfo`.
///
/// The `From` impl in this file converts between the two variant-by-variant,
/// field-by-field.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum RegionManifestInfo {
    /// Manifest version and flushed entry id of a single region
    /// (presumably for the mito engine — TODO confirm).
    Mito {
        manifest_version: u64,
        flushed_entry_id: u64,
    },
    /// Separate manifest version / flushed entry id pairs for a "data" part and
    /// a "metadata" part (presumably for the metric engine — TODO confirm).
    Metric {
        data_manifest_version: u64,
        data_flushed_entry_id: u64,
        metadata_manifest_version: u64,
        metadata_flushed_entry_id: u64,
    },
}
122
123impl Stat {
124 #[inline]
125 pub fn is_empty(&self) -> bool {
126 self.region_stats.is_empty()
127 }
128
129 pub fn stat_key(&self) -> DatanodeStatKey {
130 DatanodeStatKey { node_id: self.id }
131 }
132
133 pub fn regions(&self) -> Vec<(RegionId, RegionRole)> {
135 self.region_stats.iter().map(|s| (s.id, s.role)).collect()
136 }
137
138 pub fn table_ids(&self) -> HashSet<TableId> {
140 self.region_stats.iter().map(|s| s.id.table_id()).collect()
141 }
142
143 pub fn retain_active_region_stats(&mut self, inactive_region_ids: &HashSet<RegionId>) {
145 if inactive_region_ids.is_empty() {
146 return;
147 }
148
149 self.region_stats
150 .retain(|r| !inactive_region_ids.contains(&r.id));
151 self.rcus = self.region_stats.iter().map(|s| s.rcus).sum();
152 self.wcus = self.region_stats.iter().map(|s| s.wcus).sum();
153 self.region_num = self.region_stats.len() as u64;
154 }
155
156 pub fn memory_size(&self) -> usize {
157 std::mem::size_of::<i64>() * 3 +
159 std::mem::size_of::<u64>() * 3 +
161 std::mem::size_of::<String>() + self.addr.capacity() +
163 self.region_stats.iter().map(|s| s.memory_size()).sum::<usize>()
165 }
166}
167
168impl RegionStat {
169 pub fn memory_size(&self) -> usize {
170 std::mem::size_of::<RegionRole>() +
172 std::mem::size_of::<RegionId>() +
174 std::mem::size_of::<i64>() * 4 +
176 std::mem::size_of::<u64>() * 4 +
178 std::mem::size_of::<String>() + self.engine.capacity() +
180 self.region_manifest.memory_size()
182 }
183}
184
185impl RegionManifestInfo {
186 pub fn memory_size(&self) -> usize {
187 match self {
188 RegionManifestInfo::Mito { .. } => std::mem::size_of::<u64>() * 2,
189 RegionManifestInfo::Metric { .. } => std::mem::size_of::<u64>() * 4,
190 }
191 }
192}
193
194impl TryFrom<&HeartbeatRequest> for Stat {
195 type Error = Option<RequestHeader>;
196
197 fn try_from(value: &HeartbeatRequest) -> std::result::Result<Self, Self::Error> {
198 let HeartbeatRequest {
199 header,
200 peer,
201 region_stats,
202 node_epoch,
203 node_workloads,
204 ..
205 } = value;
206
207 match (header, peer) {
208 (Some(_header), Some(peer)) => {
209 let region_stats = region_stats
210 .iter()
211 .map(RegionStat::from)
212 .collect::<Vec<_>>();
213
214 let datanode_workloads = get_datanode_workloads(node_workloads.as_ref());
215 Ok(Self {
216 timestamp_millis: time_util::current_time_millis(),
217 id: peer.id,
219 addr: peer.addr.clone(),
221 rcus: region_stats.iter().map(|s| s.rcus).sum(),
222 wcus: region_stats.iter().map(|s| s.wcus).sum(),
223 region_num: region_stats.len() as u64,
224 region_stats,
225 node_epoch: *node_epoch,
226 datanode_workloads,
227 })
228 }
229 (header, _) => Err(header.clone()),
230 }
231 }
232}
233
234impl From<store_api::region_engine::RegionManifestInfo> for RegionManifestInfo {
235 fn from(value: store_api::region_engine::RegionManifestInfo) -> Self {
236 match value {
237 store_api::region_engine::RegionManifestInfo::Mito {
238 manifest_version,
239 flushed_entry_id,
240 } => RegionManifestInfo::Mito {
241 manifest_version,
242 flushed_entry_id,
243 },
244 store_api::region_engine::RegionManifestInfo::Metric {
245 data_manifest_version,
246 data_flushed_entry_id,
247 metadata_manifest_version,
248 metadata_flushed_entry_id,
249 } => RegionManifestInfo::Metric {
250 data_manifest_version,
251 data_flushed_entry_id,
252 metadata_manifest_version,
253 metadata_flushed_entry_id,
254 },
255 }
256 }
257}
258
259impl From<&api::v1::meta::RegionStat> for RegionStat {
260 fn from(value: &api::v1::meta::RegionStat) -> Self {
261 let region_stat = value
262 .extensions
263 .get(REGION_STATISTIC_KEY)
264 .and_then(|value| RegionStatistic::deserialize_from_slice(value))
265 .unwrap_or_default();
266
267 Self {
268 id: RegionId::from_u64(value.region_id),
269 rcus: value.rcus,
270 wcus: value.wcus,
271 approximate_bytes: value.approximate_bytes as u64,
272 engine: value.engine.to_string(),
273 role: RegionRole::from(value.role()),
274 num_rows: region_stat.num_rows,
275 memtable_size: region_stat.memtable_size,
276 manifest_size: region_stat.manifest_size,
277 sst_size: region_stat.sst_size,
278 index_size: region_stat.index_size,
279 region_manifest: region_stat.manifest.into(),
280 data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
281 metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
282 }
283 }
284}
285
/// Key identifying the stat entry of one datanode.
///
/// Encoded as `<DATANODE_STAT_PREFIX>-0-<node_id>` by the `From` impl for
/// `Vec<u8>`, and parsed back with `DATANODE_STAT_KEY_PATTERN`.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub struct DatanodeStatKey {
    /// Id of the datanode this key refers to.
    pub node_id: u64,
}
293
294impl DatanodeStatKey {
295 pub fn prefix_key() -> Vec<u8> {
297 format!("{DATANODE_STAT_PREFIX}-0-").into_bytes()
299 }
300}
301
302impl From<DatanodeStatKey> for Vec<u8> {
303 fn from(value: DatanodeStatKey) -> Self {
304 format!("{}-0-{}", DATANODE_STAT_PREFIX, value.node_id).into_bytes()
306 }
307}
308
309impl FromStr for DatanodeStatKey {
310 type Err = error::Error;
311
312 fn from_str(key: &str) -> Result<Self> {
313 let caps = DATANODE_STAT_KEY_PATTERN
314 .captures(key)
315 .context(error::InvalidStatKeySnafu { key })?;
316
317 ensure!(caps.len() == 3, error::InvalidStatKeySnafu { key });
318 let node_id = caps[2].to_string();
319 let node_id: u64 = node_id.parse().context(error::ParseNumSnafu {
320 err_msg: format!("invalid node_id: {node_id}"),
321 })?;
322
323 Ok(Self { node_id })
324 }
325}
326
327impl TryFrom<Vec<u8>> for DatanodeStatKey {
328 type Error = error::Error;
329
330 fn try_from(bytes: Vec<u8>) -> Result<Self> {
331 String::from_utf8(bytes)
332 .context(error::FromUtf8Snafu {
333 name: "DatanodeStatKey",
334 })
335 .map(|x| x.parse())?
336 }
337}
338
/// Value stored for a datanode: a list of its [`Stat`]s.
///
/// Because of `#[serde(transparent)]`, it (de)serializes exactly like the
/// inner `Vec<Stat>`, without a wrapping object.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(transparent)]
pub struct DatanodeStatValue {
    /// The stats of the datanode; the accessors on this type read the last
    /// element (presumably the most recent one — TODO confirm ordering).
    pub stats: Vec<Stat>,
}
345
346impl DatanodeStatValue {
347 pub fn region_num(&self) -> Option<u64> {
349 self.stats.last().map(|x| x.region_num)
350 }
351
352 pub fn node_addr(&self) -> Option<String> {
354 self.stats.last().map(|x| x.addr.clone())
355 }
356}
357
358impl TryFrom<DatanodeStatValue> for Vec<u8> {
359 type Error = error::Error;
360
361 fn try_from(stats: DatanodeStatValue) -> Result<Self> {
362 Ok(serde_json::to_string(&stats)
363 .context(error::SerializeToJsonSnafu {
364 input: format!("{stats:?}"),
365 })?
366 .into_bytes())
367 }
368}
369
370impl FromStr for DatanodeStatValue {
371 type Err = error::Error;
372
373 fn from_str(value: &str) -> Result<Self> {
374 serde_json::from_str(value).context(error::DeserializeFromJsonSnafu { input: value })
375 }
376}
377
378impl TryFrom<Vec<u8>> for DatanodeStatValue {
379 type Error = error::Error;
380
381 fn try_from(value: Vec<u8>) -> Result<Self> {
382 String::from_utf8(value)
383 .context(error::FromUtf8Snafu {
384 name: "DatanodeStatValue",
385 })
386 .map(|x| x.parse())?
387 }
388}
389
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a default `Stat` with only `addr` set.
    fn stat_with_addr(addr: &str) -> Stat {
        Stat {
            addr: addr.to_string(),
            ..Default::default()
        }
    }

    /// Builds a default `Stat` with only `region_num` set.
    fn stat_with_region_num(region_num: u64) -> Stat {
        Stat {
            region_num,
            ..Default::default()
        }
    }

    #[test]
    fn test_stat_key() {
        let stat = Stat {
            id: 101,
            region_num: 10,
            ..Default::default()
        };

        // The key's node id is the stat's id.
        let key = stat.stat_key();
        assert_eq!(101, key.node_id);
    }

    #[test]
    fn test_stat_val_round_trip() {
        let original = DatanodeStatValue {
            stats: vec![Stat {
                id: 101,
                region_num: 100,
                ..Default::default()
            }],
        };

        // Encode to bytes and decode again.
        let encoded: Vec<u8> = original.try_into().unwrap();
        let decoded: DatanodeStatValue = encoded.try_into().unwrap();

        let stats = decoded.stats;
        assert_eq!(1, stats.len());

        let only = stats.first().unwrap();
        assert_eq!(101, only.id);
        assert_eq!(100, only.region_num);
    }

    #[test]
    fn test_get_addr_from_stat_val() {
        // No stats: no address.
        let empty = DatanodeStatValue { stats: vec![] };
        assert!(empty.node_addr().is_none());

        // Several stats: the address of the last one wins.
        let stat_val = DatanodeStatValue {
            stats: vec![
                stat_with_addr("1"),
                stat_with_addr("2"),
                stat_with_addr("3"),
            ],
        };
        let addr = stat_val.node_addr().unwrap();
        assert_eq!("3", addr);
    }

    #[test]
    fn test_get_region_num_from_stat_val() {
        // No stats: no region count.
        let empty = DatanodeStatValue { stats: vec![] };
        assert!(empty.region_num().is_none());

        // A single stat with zero regions still yields `Some(0)`.
        let single_zero = DatanodeStatValue {
            stats: vec![stat_with_region_num(0)],
        };
        assert_eq!(Some(0), single_zero.region_num());

        // Several stats: the region count of the last one wins.
        let stat_val = DatanodeStatValue {
            stats: vec![
                stat_with_region_num(1),
                stat_with_region_num(0),
                stat_with_region_num(2),
            ],
        };
        let region_num = stat_val.region_num().unwrap();
        assert_eq!(2, region_num);
    }
}