1use std::collections::{HashMap, HashSet};
16use std::str::FromStr;
17
18use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, RequestHeader};
19use common_time::util as time_util;
20use lazy_static::lazy_static;
21use regex::Regex;
22use serde::{Deserialize, Serialize};
23use snafu::{OptionExt, ResultExt, ensure};
24use store_api::region_engine::{RegionRole, RegionStatistic};
25use store_api::storage::RegionId;
26use table::metadata::TableId;
27
28use crate::error::{self, DeserializeFromJsonSnafu, Result};
29use crate::heartbeat::utils::get_datanode_workloads;
30
/// Key prefix for persisted datanode stats; see `DatanodeStatKey`, whose encoding is
/// `{DATANODE_STAT_PREFIX}-0-{node_id}`.
const DATANODE_STAT_PREFIX: &str = "__meta_datanode_stat";

/// Extension key under which a heartbeat `RegionStat` carries a serialized
/// [`RegionStatistic`] (decoded in `From<&api::v1::meta::RegionStat> for RegionStat`).
pub const REGION_STATISTIC_KEY: &str = "__region_statistic";

lazy_static! {
    // Matches `__meta_datanode_lease-<digits>-<digits>`, capturing both numeric components.
    pub(crate) static ref DATANODE_LEASE_KEY_PATTERN: Regex =
        Regex::new("^__meta_datanode_lease-([0-9]+)-([0-9]+)$").unwrap();
    // Matches `<DATANODE_STAT_PREFIX>-<digits>-<digits>`; `DatanodeStatKey::from_str`
    // parses the second capture as the node id.
    static ref DATANODE_STAT_KEY_PATTERN: Regex =
        Regex::new(&format!("^{DATANODE_STAT_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap();
    // Matches `__meta_inactive_region-<digits>-<digits>-<digits>`, capturing all three
    // numeric components.
    static ref INACTIVE_REGION_KEY_PATTERN: Regex =
        Regex::new("^__meta_inactive_region-([0-9]+)-([0-9]+)-([0-9]+)$").unwrap();
}
43
/// Statistics reported by a single datanode.
///
/// Usually built from a [`HeartbeatRequest`] through the `TryFrom<&HeartbeatRequest>`
/// impl in this file.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Stat {
    /// Milliseconds timestamp taken (via `time_util::current_time_millis`) when the
    /// stat is built from a heartbeat.
    pub timestamp_millis: i64,
    /// Id of the reporting datanode (the heartbeat peer id).
    pub id: u64,
    /// Address of the reporting datanode (the heartbeat peer address).
    pub addr: String,
    /// Sum of `rcus` over `region_stats`.
    pub rcus: i64,
    /// Sum of `wcus` over `region_stats`.
    pub wcus: i64,
    /// Number of entries in `region_stats`.
    pub region_num: u64,
    /// Per-region statistics.
    pub region_stats: Vec<RegionStat>,
    /// Per-topic statistics.
    pub topic_stats: Vec<TopicStat>,
    /// Node epoch copied from the heartbeat.
    pub node_epoch: u64,
    /// Workloads derived from the heartbeat's `node_workloads`.
    pub datanode_workloads: DatanodeWorkloads,
    /// GC statistics decoded from the heartbeat extensions, if the datanode sent any.
    pub gc_stat: Option<GcStat>,
}
71
/// Statistics of a single region as reported in a datanode heartbeat.
///
/// Fields from `num_rows` onward (except `role`) come from the optional
/// [`RegionStatistic`] extension. When that extension is missing or cannot be decoded,
/// they take the values of `RegionStatistic::default()`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RegionStat {
    /// The region id.
    pub id: RegionId,
    /// `rcus` as reported for this region.
    pub rcus: i64,
    /// `wcus` as reported for this region.
    pub wcus: i64,
    /// Approximate region size as reported (cast to `u64`).
    pub approximate_bytes: u64,
    /// Name of the region engine (e.g. `"mito"`).
    pub engine: String,
    /// Role of the region on the reporting datanode.
    pub role: RegionRole,
    /// Number of rows.
    pub num_rows: u64,
    /// Memtable size.
    pub memtable_size: u64,
    /// Manifest size.
    pub manifest_size: u64,
    /// SST size.
    pub sst_size: u64,
    /// Number of SST files.
    pub sst_num: u64,
    /// Index size.
    pub index_size: u64,
    /// Engine-specific manifest information.
    pub region_manifest: RegionManifestInfo,
    /// Written bytes as reported by the region statistic.
    pub written_bytes: u64,
    /// Latest entry id of the data topic, as reported by the region statistic.
    pub data_topic_latest_entry_id: u64,
    /// Latest entry id of the metadata topic, as reported by the region statistic.
    pub metadata_topic_latest_entry_id: u64,
}
111
/// Statistics of a single topic, converted from `api::v1::meta::TopicStat`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicStat {
    /// Topic name (the API message's `topic_name`).
    pub topic: String,
    /// Latest entry id of the topic, as reported.
    pub latest_entry_id: u64,
    /// `record_size` as reported.
    pub record_size: u64,
    /// `record_num` as reported.
    pub record_num: u64,
}
123
/// Supplier of [`TopicStat`]s to be reported; implementors must be `Send + Sync`.
pub trait TopicStatsReporter: Send + Sync {
    /// Returns the topic stats to report.
    ///
    /// Takes `&mut self`, so implementations may update internal state on each call
    /// (NOTE(review): confirm the intended semantics against the implementors).
    fn reportable_topics(&mut self) -> Vec<TopicStat>;
}
129
/// Engine-specific manifest information of a region.
///
/// Mirrors `store_api::region_engine::RegionManifestInfo` field for field (see the
/// `From` impl in this file).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum RegionManifestInfo {
    /// Manifest info of a mito region.
    Mito {
        manifest_version: u64,
        flushed_entry_id: u64,
        file_removed_cnt: u64,
    },
    /// Manifest info of a metric region, which tracks separate data and metadata
    /// manifests.
    Metric {
        data_manifest_version: u64,
        data_flushed_entry_id: u64,
        metadata_manifest_version: u64,
        metadata_flushed_entry_id: u64,
    },
}
145
146impl Stat {
147 #[inline]
148 pub fn is_empty(&self) -> bool {
149 self.region_stats.is_empty()
150 }
151
152 pub fn stat_key(&self) -> DatanodeStatKey {
153 DatanodeStatKey { node_id: self.id }
154 }
155
156 pub fn regions(&self) -> Vec<(RegionId, RegionRole)> {
158 self.region_stats.iter().map(|s| (s.id, s.role)).collect()
159 }
160
161 pub fn table_ids(&self) -> HashSet<TableId> {
163 self.region_stats.iter().map(|s| s.id.table_id()).collect()
164 }
165
166 pub fn retain_active_region_stats(&mut self, inactive_region_ids: &HashSet<RegionId>) {
168 if inactive_region_ids.is_empty() {
169 return;
170 }
171
172 self.region_stats
173 .retain(|r| !inactive_region_ids.contains(&r.id));
174 self.rcus = self.region_stats.iter().map(|s| s.rcus).sum();
175 self.wcus = self.region_stats.iter().map(|s| s.wcus).sum();
176 self.region_num = self.region_stats.len() as u64;
177 }
178
179 pub fn memory_size(&self) -> usize {
180 std::mem::size_of::<i64>() * 3 +
182 std::mem::size_of::<u64>() * 3 +
184 std::mem::size_of::<String>() + self.addr.capacity() +
186 self.region_stats.iter().map(|s| s.memory_size()).sum::<usize>()
188 }
189}
190
191impl RegionStat {
192 pub fn memory_size(&self) -> usize {
193 std::mem::size_of::<RegionRole>() +
195 std::mem::size_of::<RegionId>() +
197 std::mem::size_of::<i64>() * 4 +
199 std::mem::size_of::<u64>() * 5 +
201 std::mem::size_of::<String>() + self.engine.capacity() +
203 self.region_manifest.memory_size()
205 }
206}
207
208impl RegionManifestInfo {
209 pub fn memory_size(&self) -> usize {
210 match self {
211 RegionManifestInfo::Mito { .. } => std::mem::size_of::<u64>() * 2,
212 RegionManifestInfo::Metric { .. } => std::mem::size_of::<u64>() * 4,
213 }
214 }
215}
216
217impl TryFrom<&HeartbeatRequest> for Stat {
218 type Error = Option<RequestHeader>;
219
220 fn try_from(value: &HeartbeatRequest) -> std::result::Result<Self, Self::Error> {
221 let HeartbeatRequest {
222 header,
223 peer,
224 region_stats,
225 node_epoch,
226 node_workloads,
227 topic_stats,
228 extensions,
229 ..
230 } = value;
231
232 match (header, peer) {
233 (Some(header), Some(peer)) => {
234 let region_stats = region_stats
235 .iter()
236 .map(RegionStat::from)
237 .collect::<Vec<_>>();
238 let topic_stats = topic_stats.iter().map(TopicStat::from).collect::<Vec<_>>();
239
240 let datanode_workloads = get_datanode_workloads(node_workloads.as_ref());
241
242 let gc_stat = GcStat::from_extensions(extensions).map_err(|err| {
243 common_telemetry::error!(
244 "Failed to deserialize GcStat from extensions: {}",
245 err
246 );
247 header.clone()
248 })?;
249 Ok(Self {
250 timestamp_millis: time_util::current_time_millis(),
251 id: peer.id,
253 addr: peer.addr.clone(),
255 rcus: region_stats.iter().map(|s| s.rcus).sum(),
256 wcus: region_stats.iter().map(|s| s.wcus).sum(),
257 region_num: region_stats.len() as u64,
258 region_stats,
259 topic_stats,
260 node_epoch: *node_epoch,
261 datanode_workloads,
262 gc_stat,
263 })
264 }
265 (header, _) => Err(header.clone()),
266 }
267 }
268}
269
270impl From<store_api::region_engine::RegionManifestInfo> for RegionManifestInfo {
271 fn from(value: store_api::region_engine::RegionManifestInfo) -> Self {
272 match value {
273 store_api::region_engine::RegionManifestInfo::Mito {
274 manifest_version,
275 flushed_entry_id,
276 file_removed_cnt,
277 } => RegionManifestInfo::Mito {
278 manifest_version,
279 flushed_entry_id,
280 file_removed_cnt,
281 },
282 store_api::region_engine::RegionManifestInfo::Metric {
283 data_manifest_version,
284 data_flushed_entry_id,
285 metadata_manifest_version,
286 metadata_flushed_entry_id,
287 } => RegionManifestInfo::Metric {
288 data_manifest_version,
289 data_flushed_entry_id,
290 metadata_manifest_version,
291 metadata_flushed_entry_id,
292 },
293 }
294 }
295}
296
297impl From<&api::v1::meta::RegionStat> for RegionStat {
298 fn from(value: &api::v1::meta::RegionStat) -> Self {
299 let region_stat = value
300 .extensions
301 .get(REGION_STATISTIC_KEY)
302 .and_then(|value| RegionStatistic::deserialize_from_slice(value))
303 .unwrap_or_default();
304
305 Self {
306 id: RegionId::from_u64(value.region_id),
307 rcus: value.rcus,
308 wcus: value.wcus,
309 approximate_bytes: value.approximate_bytes as u64,
310 engine: value.engine.clone(),
311 role: RegionRole::from(value.role()),
312 num_rows: region_stat.num_rows,
313 memtable_size: region_stat.memtable_size,
314 manifest_size: region_stat.manifest_size,
315 sst_size: region_stat.sst_size,
316 sst_num: region_stat.sst_num,
317 index_size: region_stat.index_size,
318 region_manifest: region_stat.manifest.into(),
319 written_bytes: region_stat.written_bytes,
320 data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
321 metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
322 }
323 }
324}
325
326impl From<&api::v1::meta::TopicStat> for TopicStat {
327 fn from(value: &api::v1::meta::TopicStat) -> Self {
328 Self {
329 topic: value.topic_name.clone(),
330 latest_entry_id: value.latest_entry_id,
331 record_size: value.record_size,
332 record_num: value.record_num,
333 }
334 }
335}
336
/// GC statistics of a datanode, carried as JSON in an extensions map under
/// `GcStat::GC_STAT_KEY`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct GcStat {
    /// Number of GC tasks currently running, as reported.
    pub running_gc_tasks: u32,
    /// GC concurrency setting, as reported.
    pub gc_concurrency: u32,
}
344
345impl GcStat {
346 pub const GC_STAT_KEY: &str = "__gc_stat";
347
348 pub fn new(running_gc_tasks: u32, gc_concurrency: u32) -> Self {
349 Self {
350 running_gc_tasks,
351 gc_concurrency,
352 }
353 }
354
355 pub fn into_extensions(&self, extensions: &mut std::collections::HashMap<String, Vec<u8>>) {
356 let bytes = serde_json::to_vec(self).unwrap_or_default();
357 extensions.insert(Self::GC_STAT_KEY.to_string(), bytes);
358 }
359
360 pub fn from_extensions(
361 extensions: &std::collections::HashMap<String, Vec<u8>>,
362 ) -> Result<Option<Self>> {
363 extensions
364 .get(Self::GC_STAT_KEY)
365 .map(|bytes| {
366 serde_json::from_slice(bytes).with_context(|_| DeserializeFromJsonSnafu {
367 input: String::from_utf8_lossy(bytes).to_string(),
368 })
369 })
370 .transpose()
371 }
372}
373
/// Selected environment variables (name → value), carried as JSON in an extensions
/// map under `EnvVars::ENV_VARS_KEY`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct EnvVars {
    /// Variable name to value.
    pub vars: HashMap<String, String>,
}
379
380impl EnvVars {
381 pub const ENV_VARS_KEY: &str = "__env_vars";
382
383 pub fn new(vars: HashMap<String, String>) -> Self {
384 Self { vars }
385 }
386
387 pub fn from_config(keys: &[String]) -> Self {
389 let vars = keys
390 .iter()
391 .filter_map(|key| std::env::var(key).ok().map(|value| (key.clone(), value)))
392 .collect();
393 Self { vars }
394 }
395
396 pub fn into_extensions(&self, extensions: &mut HashMap<String, Vec<u8>>) {
397 if self.vars.is_empty() {
398 return;
399 }
400 let bytes = serde_json::to_vec(self).unwrap_or_default();
401 extensions.insert(Self::ENV_VARS_KEY.to_string(), bytes);
402 }
403
404 pub fn from_extensions(extensions: &HashMap<String, Vec<u8>>) -> Result<Option<Self>> {
405 extensions
406 .get(Self::ENV_VARS_KEY)
407 .map(|bytes| {
408 serde_json::from_slice(bytes).with_context(|_| DeserializeFromJsonSnafu {
409 input: String::from_utf8_lossy(bytes).to_string(),
410 })
411 })
412 .transpose()
413 }
414}
415
/// Key of a datanode's persisted stats.
///
/// Encoded as `{DATANODE_STAT_PREFIX}-0-{node_id}` (see the `From<DatanodeStatKey> for
/// Vec<u8>` impl) and parsed back by `FromStr`.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub struct DatanodeStatKey {
    /// Id of the datanode.
    pub node_id: u64,
}
423
424impl DatanodeStatKey {
425 pub fn prefix_key() -> Vec<u8> {
427 format!("{DATANODE_STAT_PREFIX}-0-").into_bytes()
429 }
430}
431
432impl From<DatanodeStatKey> for Vec<u8> {
433 fn from(value: DatanodeStatKey) -> Self {
434 format!("{}-0-{}", DATANODE_STAT_PREFIX, value.node_id).into_bytes()
436 }
437}
438
439impl FromStr for DatanodeStatKey {
440 type Err = error::Error;
441
442 fn from_str(key: &str) -> Result<Self> {
443 let caps = DATANODE_STAT_KEY_PATTERN
444 .captures(key)
445 .context(error::InvalidStatKeySnafu { key })?;
446
447 ensure!(caps.len() == 3, error::InvalidStatKeySnafu { key });
448 let node_id = caps[2].to_string();
449 let node_id: u64 = node_id.parse().context(error::ParseNumSnafu {
450 err_msg: format!("invalid node_id: {node_id}"),
451 })?;
452
453 Ok(Self { node_id })
454 }
455}
456
457impl TryFrom<Vec<u8>> for DatanodeStatKey {
458 type Error = error::Error;
459
460 fn try_from(bytes: Vec<u8>) -> Result<Self> {
461 String::from_utf8(bytes)
462 .context(error::FromUtf8Snafu {
463 name: "DatanodeStatKey",
464 })
465 .map(|x| x.parse())?
466 }
467}
468
/// Stats recorded for one datanode, serialized as a bare JSON array of [`Stat`]
/// (`serde(transparent)`).
///
/// The accessors read the last element (presumably the most recent stat — confirm
/// against writers).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(transparent)]
pub struct DatanodeStatValue {
    /// The recorded stats.
    pub stats: Vec<Stat>,
}
475
476impl DatanodeStatValue {
477 pub fn region_num(&self) -> Option<u64> {
479 self.stats.last().map(|x| x.region_num)
480 }
481
482 pub fn node_addr(&self) -> Option<String> {
484 self.stats.last().map(|x| x.addr.clone())
485 }
486}
487
488impl TryFrom<DatanodeStatValue> for Vec<u8> {
489 type Error = error::Error;
490
491 fn try_from(stats: DatanodeStatValue) -> Result<Self> {
492 Ok(serde_json::to_string(&stats)
493 .context(error::SerializeToJsonSnafu {
494 input: format!("{stats:?}"),
495 })?
496 .into_bytes())
497 }
498}
499
500impl FromStr for DatanodeStatValue {
501 type Err = error::Error;
502
503 fn from_str(value: &str) -> Result<Self> {
504 serde_json::from_str(value).context(error::DeserializeFromJsonSnafu { input: value })
505 }
506}
507
508impl TryFrom<Vec<u8>> for DatanodeStatValue {
509 type Error = error::Error;
510
511 fn try_from(value: Vec<u8>) -> Result<Self> {
512 String::from_utf8(value)
513 .context(error::FromUtf8Snafu {
514 name: "DatanodeStatValue",
515 })
516 .map(|x| x.parse())?
517 }
518}
519
// Unit tests for stat keys/values, heartbeat conversion and `EnvVars` extensions.
#[cfg(test)]
mod tests {
    use super::*;

    // `stat_key` must key the stat by the datanode id.
    #[test]
    fn test_stat_key() {
        let stat = Stat {
            id: 101,
            region_num: 10,
            ..Default::default()
        };

        let stat_key = stat.stat_key();

        assert_eq!(101, stat_key.node_id);
    }

    // Encoding a `DatanodeStatValue` to bytes and decoding it back preserves the stats.
    #[test]
    fn test_stat_val_round_trip() {
        let stat = Stat {
            id: 101,
            region_num: 100,
            ..Default::default()
        };

        let stat_val = DatanodeStatValue { stats: vec![stat] };

        let bytes: Vec<u8> = stat_val.try_into().unwrap();
        let stat_val: DatanodeStatValue = bytes.try_into().unwrap();
        let stats = stat_val.stats;

        assert_eq!(1, stats.len());

        let stat = stats.first().unwrap();
        assert_eq!(101, stat.id);
        assert_eq!(100, stat.region_num);
    }

    // `node_addr` is `None` for no stats and otherwise reads the last stat.
    #[test]
    fn test_get_addr_from_stat_val() {
        let empty = DatanodeStatValue { stats: vec![] };
        let addr = empty.node_addr();
        assert!(addr.is_none());

        let stat_val = DatanodeStatValue {
            stats: vec![
                Stat {
                    addr: "1".to_string(),
                    ..Default::default()
                },
                Stat {
                    addr: "2".to_string(),
                    ..Default::default()
                },
                Stat {
                    addr: "3".to_string(),
                    ..Default::default()
                },
            ],
        };
        let addr = stat_val.node_addr().unwrap();
        assert_eq!("3", addr);
    }

    // `region_num` is `None` for no stats, `Some(0)` (not `None`) when the last stat
    // reports zero regions, and otherwise the last stat's value.
    #[test]
    fn test_get_region_num_from_stat_val() {
        let empty = DatanodeStatValue { stats: vec![] };
        let region_num = empty.region_num();
        assert!(region_num.is_none());

        let wrong = DatanodeStatValue {
            stats: vec![Stat {
                region_num: 0,
                ..Default::default()
            }],
        };
        let right = wrong.region_num();
        assert_eq!(Some(0), right);

        let stat_val = DatanodeStatValue {
            stats: vec![
                Stat {
                    region_num: 1,
                    ..Default::default()
                },
                Stat {
                    region_num: 0,
                    ..Default::default()
                },
                Stat {
                    region_num: 2,
                    ..Default::default()
                },
            ],
        };
        let region_num = stat_val.region_num().unwrap();
        assert_eq!(2, region_num);
    }

    // The heartbeat conversion keeps the `StagingLeader` role of a region.
    #[test]
    fn test_region_stat_from_heartbeat_preserves_staging_leader_role() {
        let request = HeartbeatRequest {
            header: Some(RequestHeader::default()),
            peer: Some(api::v1::meta::Peer {
                id: 1,
                addr: "127.0.0.1:3001".to_string(),
            }),
            region_stats: vec![api::v1::meta::RegionStat {
                region_id: RegionId::new(1024, 1).as_u64(),
                engine: "mito".to_string(),
                role: api::v1::meta::RegionRole::StagingLeader.into(),
                ..Default::default()
            }],
            ..Default::default()
        };

        let stat = Stat::try_from(&request).unwrap();

        assert_eq!(stat.region_stats.len(), 1);
        assert_eq!(stat.region_stats[0].role, RegionRole::StagingLeader);
    }

    // Writing `EnvVars` into extensions and reading them back preserves every variable.
    #[test]
    fn test_env_vars_round_trip() {
        let mut vars = HashMap::new();
        vars.insert("AZ".to_string(), "us-east-1a".to_string());
        vars.insert("REGION".to_string(), "us-east-1".to_string());
        let env_vars = EnvVars::new(vars);

        let mut extensions = HashMap::new();
        env_vars.into_extensions(&mut extensions);

        let extracted = EnvVars::from_extensions(&extensions).unwrap().unwrap();
        assert_eq!(extracted.vars.get("AZ").unwrap(), "us-east-1a");
        assert_eq!(extracted.vars.get("REGION").unwrap(), "us-east-1");
    }

    // Empty `EnvVars` add no entry to the extensions map.
    #[test]
    fn test_env_vars_empty_not_written() {
        let env_vars = EnvVars::default();
        let mut extensions = HashMap::new();
        env_vars.into_extensions(&mut extensions);
        assert!(extensions.is_empty());
    }

    // A missing `EnvVars` key decodes to `Ok(None)`, not an error.
    #[test]
    fn test_env_vars_from_extensions_missing() {
        let extensions = HashMap::new();
        let result = EnvVars::from_extensions(&extensions).unwrap();
        assert!(result.is_none());
    }
}