1use std::collections::HashSet;
16use std::str::FromStr;
17
18use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, RequestHeader};
19use common_time::util as time_util;
20use lazy_static::lazy_static;
21use regex::Regex;
22use serde::{Deserialize, Serialize};
23use snafu::{ensure, OptionExt, ResultExt};
24use store_api::region_engine::{RegionRole, RegionStatistic};
25use store_api::storage::RegionId;
26use table::metadata::TableId;
27
28use crate::error;
29use crate::error::Result;
30use crate::heartbeat::utils::get_datanode_workloads;
31
32pub(crate) const DATANODE_LEASE_PREFIX: &str = "__meta_datanode_lease";
33const INACTIVE_REGION_PREFIX: &str = "__meta_inactive_region";
34
35const DATANODE_STAT_PREFIX: &str = "__meta_datanode_stat";
36
37pub const REGION_STATISTIC_KEY: &str = "__region_statistic";
38
39lazy_static! {
40 pub(crate) static ref DATANODE_LEASE_KEY_PATTERN: Regex =
41 Regex::new(&format!("^{DATANODE_LEASE_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap();
42 static ref DATANODE_STAT_KEY_PATTERN: Regex =
43 Regex::new(&format!("^{DATANODE_STAT_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap();
44 static ref INACTIVE_REGION_KEY_PATTERN: Regex = Regex::new(&format!(
45 "^{INACTIVE_REGION_PREFIX}-([0-9]+)-([0-9]+)-([0-9]+)$"
46 ))
47 .unwrap();
48}
49
/// Statistics of a single datanode, built from one heartbeat by the
/// `TryFrom<&HeartbeatRequest>` impl.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Stat {
    /// Milliseconds returned by `time_util::current_time_millis()` when this stat was built.
    pub timestamp_millis: i64,
    /// The datanode id (taken from the heartbeat's peer).
    pub id: u64,
    /// The datanode address (taken from the heartbeat's peer).
    pub addr: String,
    /// Sum of the per-region `rcus` in `region_stats`.
    pub rcus: i64,
    /// Sum of the per-region `wcus` in `region_stats`.
    pub wcus: i64,
    /// Number of entries in `region_stats`.
    pub region_num: u64,
    /// Per-region statistics reported in the heartbeat.
    pub region_stats: Vec<RegionStat>,
    /// Per-topic statistics reported in the heartbeat.
    pub topic_stats: Vec<TopicStat>,
    /// The `node_epoch` reported in the heartbeat.
    pub node_epoch: u64,
    /// Workloads derived from the heartbeat's `node_workloads` via `get_datanode_workloads`.
    pub datanode_workloads: DatanodeWorkloads,
}
75
/// Statistics of a single region, converted from a heartbeat's `RegionStat` message.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegionStat {
    /// The region id.
    pub id: RegionId,
    /// The region's `rcus` from the heartbeat (summed into [`Stat::rcus`]).
    pub rcus: i64,
    /// The region's `wcus` from the heartbeat (summed into [`Stat::wcus`]).
    pub wcus: i64,
    /// The region's `approximate_bytes` from the heartbeat.
    pub approximate_bytes: u64,
    /// Engine name reported for the region.
    pub engine: String,
    /// Role reported for the region.
    pub role: RegionRole,
    /// `num_rows` from the region's `RegionStatistic` extension.
    pub num_rows: u64,
    /// `memtable_size` from the region's `RegionStatistic` extension.
    pub memtable_size: u64,
    /// `manifest_size` from the region's `RegionStatistic` extension.
    pub manifest_size: u64,
    /// `sst_size` from the region's `RegionStatistic` extension.
    pub sst_size: u64,
    /// `sst_num` from the region's `RegionStatistic` extension.
    pub sst_num: u64,
    /// `index_size` from the region's `RegionStatistic` extension.
    pub index_size: u64,
    /// Manifest info (`manifest`) from the region's `RegionStatistic` extension.
    pub region_manifest: RegionManifestInfo,
    /// `write_bytes` from the region's `RegionStatistic` extension.
    pub write_bytes: u64,
    /// `data_topic_latest_entry_id` from the region's `RegionStatistic` extension.
    pub data_topic_latest_entry_id: u64,
    /// `metadata_topic_latest_entry_id` from the region's `RegionStatistic` extension.
    pub metadata_topic_latest_entry_id: u64,
}
115
/// Statistics of a single topic, converted from a heartbeat's `TopicStat` message.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicStat {
    /// Topic name (`topic_name` on the wire).
    pub topic: String,
    /// `latest_entry_id` reported for the topic.
    pub latest_entry_id: u64,
    /// `record_size` reported for the topic.
    pub record_size: u64,
    /// `record_num` reported for the topic.
    pub record_num: u64,
}
127
/// A source of [`TopicStat`]s to report; implementors must be `Send + Sync`.
pub trait TopicStatsReporter: Send + Sync {
    /// Returns the topic statistics that should be reported.
    ///
    /// Takes `&mut self`, so implementations may update internal state on each call.
    /// NOTE(review): the caller and reporting cadence are not visible here — confirm.
    fn reportable_topics(&mut self) -> Vec<TopicStat>;
}
133
/// Serializable mirror of `store_api::region_engine::RegionManifestInfo`, with identically
/// named variants and fields (converted field-for-field by its `From` impl).
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum RegionManifestInfo {
    /// A single manifest version and flushed entry id.
    Mito {
        manifest_version: u64,
        flushed_entry_id: u64,
    },
    /// Separate manifest versions and flushed entry ids for data and metadata.
    Metric {
        data_manifest_version: u64,
        data_flushed_entry_id: u64,
        metadata_manifest_version: u64,
        metadata_flushed_entry_id: u64,
    },
}
147
148impl Stat {
149 #[inline]
150 pub fn is_empty(&self) -> bool {
151 self.region_stats.is_empty()
152 }
153
154 pub fn stat_key(&self) -> DatanodeStatKey {
155 DatanodeStatKey { node_id: self.id }
156 }
157
158 pub fn regions(&self) -> Vec<(RegionId, RegionRole)> {
160 self.region_stats.iter().map(|s| (s.id, s.role)).collect()
161 }
162
163 pub fn table_ids(&self) -> HashSet<TableId> {
165 self.region_stats.iter().map(|s| s.id.table_id()).collect()
166 }
167
168 pub fn retain_active_region_stats(&mut self, inactive_region_ids: &HashSet<RegionId>) {
170 if inactive_region_ids.is_empty() {
171 return;
172 }
173
174 self.region_stats
175 .retain(|r| !inactive_region_ids.contains(&r.id));
176 self.rcus = self.region_stats.iter().map(|s| s.rcus).sum();
177 self.wcus = self.region_stats.iter().map(|s| s.wcus).sum();
178 self.region_num = self.region_stats.len() as u64;
179 }
180
181 pub fn memory_size(&self) -> usize {
182 std::mem::size_of::<i64>() * 3 +
184 std::mem::size_of::<u64>() * 3 +
186 std::mem::size_of::<String>() + self.addr.capacity() +
188 self.region_stats.iter().map(|s| s.memory_size()).sum::<usize>()
190 }
191}
192
193impl RegionStat {
194 pub fn memory_size(&self) -> usize {
195 std::mem::size_of::<RegionRole>() +
197 std::mem::size_of::<RegionId>() +
199 std::mem::size_of::<i64>() * 4 +
201 std::mem::size_of::<u64>() * 5 +
203 std::mem::size_of::<String>() + self.engine.capacity() +
205 self.region_manifest.memory_size()
207 }
208}
209
210impl RegionManifestInfo {
211 pub fn memory_size(&self) -> usize {
212 match self {
213 RegionManifestInfo::Mito { .. } => std::mem::size_of::<u64>() * 2,
214 RegionManifestInfo::Metric { .. } => std::mem::size_of::<u64>() * 4,
215 }
216 }
217}
218
219impl TryFrom<&HeartbeatRequest> for Stat {
220 type Error = Option<RequestHeader>;
221
222 fn try_from(value: &HeartbeatRequest) -> std::result::Result<Self, Self::Error> {
223 let HeartbeatRequest {
224 header,
225 peer,
226 region_stats,
227 node_epoch,
228 node_workloads,
229 topic_stats,
230 ..
231 } = value;
232
233 match (header, peer) {
234 (Some(_header), Some(peer)) => {
235 let region_stats = region_stats
236 .iter()
237 .map(RegionStat::from)
238 .collect::<Vec<_>>();
239 let topic_stats = topic_stats.iter().map(TopicStat::from).collect::<Vec<_>>();
240
241 let datanode_workloads = get_datanode_workloads(node_workloads.as_ref());
242 Ok(Self {
243 timestamp_millis: time_util::current_time_millis(),
244 id: peer.id,
246 addr: peer.addr.clone(),
248 rcus: region_stats.iter().map(|s| s.rcus).sum(),
249 wcus: region_stats.iter().map(|s| s.wcus).sum(),
250 region_num: region_stats.len() as u64,
251 region_stats,
252 topic_stats,
253 node_epoch: *node_epoch,
254 datanode_workloads,
255 })
256 }
257 (header, _) => Err(header.clone()),
258 }
259 }
260}
261
262impl From<store_api::region_engine::RegionManifestInfo> for RegionManifestInfo {
263 fn from(value: store_api::region_engine::RegionManifestInfo) -> Self {
264 match value {
265 store_api::region_engine::RegionManifestInfo::Mito {
266 manifest_version,
267 flushed_entry_id,
268 } => RegionManifestInfo::Mito {
269 manifest_version,
270 flushed_entry_id,
271 },
272 store_api::region_engine::RegionManifestInfo::Metric {
273 data_manifest_version,
274 data_flushed_entry_id,
275 metadata_manifest_version,
276 metadata_flushed_entry_id,
277 } => RegionManifestInfo::Metric {
278 data_manifest_version,
279 data_flushed_entry_id,
280 metadata_manifest_version,
281 metadata_flushed_entry_id,
282 },
283 }
284 }
285}
286
287impl From<&api::v1::meta::RegionStat> for RegionStat {
288 fn from(value: &api::v1::meta::RegionStat) -> Self {
289 let region_stat = value
290 .extensions
291 .get(REGION_STATISTIC_KEY)
292 .and_then(|value| RegionStatistic::deserialize_from_slice(value))
293 .unwrap_or_default();
294
295 Self {
296 id: RegionId::from_u64(value.region_id),
297 rcus: value.rcus,
298 wcus: value.wcus,
299 approximate_bytes: value.approximate_bytes as u64,
300 engine: value.engine.to_string(),
301 role: RegionRole::from(value.role()),
302 num_rows: region_stat.num_rows,
303 memtable_size: region_stat.memtable_size,
304 manifest_size: region_stat.manifest_size,
305 sst_size: region_stat.sst_size,
306 sst_num: region_stat.sst_num,
307 index_size: region_stat.index_size,
308 region_manifest: region_stat.manifest.into(),
309 write_bytes: region_stat.write_bytes,
310 data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
311 metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
312 }
313 }
314}
315
316impl From<&api::v1::meta::TopicStat> for TopicStat {
317 fn from(value: &api::v1::meta::TopicStat) -> Self {
318 Self {
319 topic: value.topic_name.clone(),
320 latest_entry_id: value.latest_entry_id,
321 record_size: value.record_size,
322 record_num: value.record_num,
323 }
324 }
325}
326
/// Key of a datanode's stat entry, encoded as `"{DATANODE_STAT_PREFIX}-0-{node_id}"`.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub struct DatanodeStatKey {
    /// Id of the datanode the stats belong to.
    pub node_id: u64,
}
334
335impl DatanodeStatKey {
336 pub fn prefix_key() -> Vec<u8> {
338 format!("{DATANODE_STAT_PREFIX}-0-").into_bytes()
340 }
341}
342
343impl From<DatanodeStatKey> for Vec<u8> {
344 fn from(value: DatanodeStatKey) -> Self {
345 format!("{}-0-{}", DATANODE_STAT_PREFIX, value.node_id).into_bytes()
347 }
348}
349
350impl FromStr for DatanodeStatKey {
351 type Err = error::Error;
352
353 fn from_str(key: &str) -> Result<Self> {
354 let caps = DATANODE_STAT_KEY_PATTERN
355 .captures(key)
356 .context(error::InvalidStatKeySnafu { key })?;
357
358 ensure!(caps.len() == 3, error::InvalidStatKeySnafu { key });
359 let node_id = caps[2].to_string();
360 let node_id: u64 = node_id.parse().context(error::ParseNumSnafu {
361 err_msg: format!("invalid node_id: {node_id}"),
362 })?;
363
364 Ok(Self { node_id })
365 }
366}
367
368impl TryFrom<Vec<u8>> for DatanodeStatKey {
369 type Error = error::Error;
370
371 fn try_from(bytes: Vec<u8>) -> Result<Self> {
372 String::from_utf8(bytes)
373 .context(error::FromUtf8Snafu {
374 name: "DatanodeStatKey",
375 })
376 .map(|x| x.parse())?
377 }
378}
379
/// The recorded stats of one datanode; the accessors treat the last element as the
/// most recent.
///
/// `#[serde(transparent)]` makes this (de)serialize as the bare JSON array of [`Stat`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(transparent)]
pub struct DatanodeStatValue {
    /// Recorded stats, oldest first (the accessors read `last()`).
    pub stats: Vec<Stat>,
}
386
387impl DatanodeStatValue {
388 pub fn region_num(&self) -> Option<u64> {
390 self.stats.last().map(|x| x.region_num)
391 }
392
393 pub fn node_addr(&self) -> Option<String> {
395 self.stats.last().map(|x| x.addr.clone())
396 }
397}
398
399impl TryFrom<DatanodeStatValue> for Vec<u8> {
400 type Error = error::Error;
401
402 fn try_from(stats: DatanodeStatValue) -> Result<Self> {
403 Ok(serde_json::to_string(&stats)
404 .context(error::SerializeToJsonSnafu {
405 input: format!("{stats:?}"),
406 })?
407 .into_bytes())
408 }
409}
410
411impl FromStr for DatanodeStatValue {
412 type Err = error::Error;
413
414 fn from_str(value: &str) -> Result<Self> {
415 serde_json::from_str(value).context(error::DeserializeFromJsonSnafu { input: value })
416 }
417}
418
419impl TryFrom<Vec<u8>> for DatanodeStatValue {
420 type Error = error::Error;
421
422 fn try_from(value: Vec<u8>) -> Result<Self> {
423 String::from_utf8(value)
424 .context(error::FromUtf8Snafu {
425 name: "DatanodeStatValue",
426 })
427 .map(|x| x.parse())?
428 }
429}
430
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_stat_key() {
        let stat = Stat {
            id: 101,
            region_num: 10,
            ..Default::default()
        };

        // The stat key is derived from the datanode id alone.
        assert_eq!(101, stat.stat_key().node_id);
    }

    #[test]
    fn test_stat_val_round_trip() {
        let original = DatanodeStatValue {
            stats: vec![Stat {
                id: 101,
                region_num: 100,
                ..Default::default()
            }],
        };

        let encoded: Vec<u8> = original.try_into().unwrap();
        let decoded: DatanodeStatValue = encoded.try_into().unwrap();

        assert_eq!(1, decoded.stats.len());
        let restored = decoded.stats.first().unwrap();
        assert_eq!(101, restored.id);
        assert_eq!(100, restored.region_num);
    }

    #[test]
    fn test_get_addr_from_stat_val() {
        let empty = DatanodeStatValue { stats: vec![] };
        assert!(empty.node_addr().is_none());

        let stat_val = DatanodeStatValue {
            stats: ["1", "2", "3"]
                .into_iter()
                .map(|addr| Stat {
                    addr: addr.to_string(),
                    ..Default::default()
                })
                .collect(),
        };
        // The address comes from the most recent (last) stat.
        assert_eq!("3", stat_val.node_addr().unwrap());
    }

    #[test]
    fn test_get_region_num_from_stat_val() {
        let empty = DatanodeStatValue { stats: vec![] };
        assert!(empty.region_num().is_none());

        // A zero region count is still a present value, not "absent".
        let zero_regions = DatanodeStatValue {
            stats: vec![Stat {
                region_num: 0,
                ..Default::default()
            }],
        };
        assert_eq!(Some(0), zero_regions.region_num());

        let stat_val = DatanodeStatValue {
            stats: [1, 0, 2]
                .into_iter()
                .map(|region_num| Stat {
                    region_num,
                    ..Default::default()
                })
                .collect(),
        };
        // Only the last stat counts, regardless of earlier values.
        assert_eq!(2, stat_val.region_num().unwrap());
    }
}