1use std::collections::HashSet;
16use std::str::FromStr;
17
18use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, RequestHeader};
19use common_time::util as time_util;
20use lazy_static::lazy_static;
21use regex::Regex;
22use serde::{Deserialize, Serialize};
23use snafu::{OptionExt, ResultExt, ensure};
24use store_api::region_engine::{RegionRole, RegionStatistic};
25use store_api::storage::RegionId;
26use table::metadata::TableId;
27
28use crate::error::{self, DeserializeFromJsonSnafu, Result};
29use crate::heartbeat::utils::get_datanode_workloads;
30
/// Prefix shared by every datanode stat key. A full key has the form
/// `__meta_datanode_stat-<digits>-<node_id>` (see `DATANODE_STAT_KEY_PATTERN`).
const DATANODE_STAT_PREFIX: &str = "__meta_datanode_stat";

/// Key of the `extensions` entry from which a region's `RegionStatistic` is deserialized.
pub const REGION_STATISTIC_KEY: &str = "__region_statistic";

lazy_static! {
    // Matches `__meta_datanode_lease-<digits>-<digits>` and captures both numbers.
    pub(crate) static ref DATANODE_LEASE_KEY_PATTERN: Regex =
        Regex::new("^__meta_datanode_lease-([0-9]+)-([0-9]+)$").unwrap();
    // Matches `<DATANODE_STAT_PREFIX>-<digits>-<digits>`; the second capture group is parsed
    // as the node id by `DatanodeStatKey::from_str`.
    static ref DATANODE_STAT_KEY_PATTERN: Regex =
        Regex::new(&format!("^{DATANODE_STAT_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap();
    // Matches `__meta_inactive_region-<digits>-<digits>-<digits>` and captures all three numbers.
    static ref INACTIVE_REGION_KEY_PATTERN: Regex =
        Regex::new("^__meta_inactive_region-([0-9]+)-([0-9]+)-([0-9]+)$").unwrap();
}
43
/// Statistics of a single datanode.
///
/// The `TryFrom<&HeartbeatRequest>` impl in this file builds one `Stat` per heartbeat.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Stat {
    /// Value of `time_util::current_time_millis()` at the moment the heartbeat was converted.
    pub timestamp_millis: i64,
    /// Node id, copied from the heartbeat's `peer.id`.
    pub id: u64,
    /// Node address, copied from the heartbeat's `peer.addr`.
    pub addr: String,
    /// Sum of `rcus` over `region_stats` (presumably read capacity units; confirm with the
    /// proto definition).
    pub rcus: i64,
    /// Sum of `wcus` over `region_stats` (presumably write capacity units; confirm with the
    /// proto definition).
    pub wcus: i64,
    /// Number of entries in `region_stats`.
    pub region_num: u64,
    /// Per-region statistics carried by the heartbeat.
    pub region_stats: Vec<RegionStat>,
    /// Per-topic statistics carried by the heartbeat.
    pub topic_stats: Vec<TopicStat>,
    /// Node epoch, copied from the heartbeat's `node_epoch`.
    pub node_epoch: u64,
    /// Workloads of the datanode, as resolved by `get_datanode_workloads`.
    pub datanode_workloads: DatanodeWorkloads,
    /// GC statistics decoded from the heartbeat extensions; `None` when the extension is absent.
    pub gc_stat: Option<GcStat>,
}
71
/// Statistics of a single region.
///
/// When converted from `api::v1::meta::RegionStat`, the fields from `num_rows` onward come from
/// the `RegionStatistic` stored under `REGION_STATISTIC_KEY` in the message's extensions, falling
/// back to `RegionStatistic::default()` when that entry is missing or cannot be deserialized.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RegionStat {
    /// Region id, built from the message's `region_id`.
    pub id: RegionId,
    /// Copied from the message's `rcus` (presumably read capacity units; confirm).
    pub rcus: i64,
    /// Copied from the message's `wcus` (presumably write capacity units; confirm).
    pub wcus: i64,
    /// The message's `approximate_bytes`, cast to `u64`.
    pub approximate_bytes: u64,
    /// Engine name, copied from the message.
    pub engine: String,
    /// Region role, converted from the message's `role()`.
    pub role: RegionRole,
    /// `RegionStatistic::num_rows`.
    pub num_rows: u64,
    /// `RegionStatistic::memtable_size`.
    pub memtable_size: u64,
    /// `RegionStatistic::manifest_size`.
    pub manifest_size: u64,
    /// `RegionStatistic::sst_size`.
    pub sst_size: u64,
    /// `RegionStatistic::sst_num`.
    pub sst_num: u64,
    /// `RegionStatistic::index_size`.
    pub index_size: u64,
    /// Manifest info, converted from `RegionStatistic::manifest`.
    pub region_manifest: RegionManifestInfo,
    /// `RegionStatistic::written_bytes`.
    pub written_bytes: u64,
    /// `RegionStatistic::data_topic_latest_entry_id`.
    pub data_topic_latest_entry_id: u64,
    /// `RegionStatistic::metadata_topic_latest_entry_id`.
    pub metadata_topic_latest_entry_id: u64,
}
111
/// Statistics of a single topic, converted from `api::v1::meta::TopicStat`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicStat {
    /// Topic name, copied from the message's `topic_name`.
    pub topic: String,
    /// Copied from the message's `latest_entry_id`.
    pub latest_entry_id: u64,
    /// Copied from the message's `record_size`.
    pub record_size: u64,
    /// Copied from the message's `record_num`.
    pub record_num: u64,
}
123
/// A source of `TopicStat`s. Implementors must be `Send + Sync`.
pub trait TopicStatsReporter: Send + Sync {
    /// Returns the topic stats to report. The receiver is `&mut self`, so an implementation is
    /// allowed to update its own state while producing them.
    fn reportable_topics(&mut self) -> Vec<TopicStat>;
}
129
/// Manifest information of a region.
///
/// Has the same variants and fields as `store_api::region_engine::RegionManifestInfo`, from
/// which it can be converted via `From`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum RegionManifestInfo {
    /// Variant carrying a single manifest version / flushed entry id pair.
    Mito {
        manifest_version: u64,
        flushed_entry_id: u64,
    },
    /// Variant carrying separate version / flushed entry id pairs for data and metadata.
    Metric {
        data_manifest_version: u64,
        data_flushed_entry_id: u64,
        metadata_manifest_version: u64,
        metadata_flushed_entry_id: u64,
    },
}
143
144impl Stat {
145 #[inline]
146 pub fn is_empty(&self) -> bool {
147 self.region_stats.is_empty()
148 }
149
150 pub fn stat_key(&self) -> DatanodeStatKey {
151 DatanodeStatKey { node_id: self.id }
152 }
153
154 pub fn regions(&self) -> Vec<(RegionId, RegionRole)> {
156 self.region_stats.iter().map(|s| (s.id, s.role)).collect()
157 }
158
159 pub fn table_ids(&self) -> HashSet<TableId> {
161 self.region_stats.iter().map(|s| s.id.table_id()).collect()
162 }
163
164 pub fn retain_active_region_stats(&mut self, inactive_region_ids: &HashSet<RegionId>) {
166 if inactive_region_ids.is_empty() {
167 return;
168 }
169
170 self.region_stats
171 .retain(|r| !inactive_region_ids.contains(&r.id));
172 self.rcus = self.region_stats.iter().map(|s| s.rcus).sum();
173 self.wcus = self.region_stats.iter().map(|s| s.wcus).sum();
174 self.region_num = self.region_stats.len() as u64;
175 }
176
177 pub fn memory_size(&self) -> usize {
178 std::mem::size_of::<i64>() * 3 +
180 std::mem::size_of::<u64>() * 3 +
182 std::mem::size_of::<String>() + self.addr.capacity() +
184 self.region_stats.iter().map(|s| s.memory_size()).sum::<usize>()
186 }
187}
188
189impl RegionStat {
190 pub fn memory_size(&self) -> usize {
191 std::mem::size_of::<RegionRole>() +
193 std::mem::size_of::<RegionId>() +
195 std::mem::size_of::<i64>() * 4 +
197 std::mem::size_of::<u64>() * 5 +
199 std::mem::size_of::<String>() + self.engine.capacity() +
201 self.region_manifest.memory_size()
203 }
204}
205
206impl RegionManifestInfo {
207 pub fn memory_size(&self) -> usize {
208 match self {
209 RegionManifestInfo::Mito { .. } => std::mem::size_of::<u64>() * 2,
210 RegionManifestInfo::Metric { .. } => std::mem::size_of::<u64>() * 4,
211 }
212 }
213}
214
215impl TryFrom<&HeartbeatRequest> for Stat {
216 type Error = Option<RequestHeader>;
217
218 fn try_from(value: &HeartbeatRequest) -> std::result::Result<Self, Self::Error> {
219 let HeartbeatRequest {
220 header,
221 peer,
222 region_stats,
223 node_epoch,
224 node_workloads,
225 topic_stats,
226 extensions,
227 ..
228 } = value;
229
230 match (header, peer) {
231 (Some(header), Some(peer)) => {
232 let region_stats = region_stats
233 .iter()
234 .map(RegionStat::from)
235 .collect::<Vec<_>>();
236 let topic_stats = topic_stats.iter().map(TopicStat::from).collect::<Vec<_>>();
237
238 let datanode_workloads = get_datanode_workloads(node_workloads.as_ref());
239
240 let gc_stat = GcStat::from_extensions(extensions).map_err(|err| {
241 common_telemetry::error!(
242 "Failed to deserialize GcStat from extensions: {}",
243 err
244 );
245 header.clone()
246 })?;
247 Ok(Self {
248 timestamp_millis: time_util::current_time_millis(),
249 id: peer.id,
251 addr: peer.addr.clone(),
253 rcus: region_stats.iter().map(|s| s.rcus).sum(),
254 wcus: region_stats.iter().map(|s| s.wcus).sum(),
255 region_num: region_stats.len() as u64,
256 region_stats,
257 topic_stats,
258 node_epoch: *node_epoch,
259 datanode_workloads,
260 gc_stat,
261 })
262 }
263 (header, _) => Err(header.clone()),
264 }
265 }
266}
267
268impl From<store_api::region_engine::RegionManifestInfo> for RegionManifestInfo {
269 fn from(value: store_api::region_engine::RegionManifestInfo) -> Self {
270 match value {
271 store_api::region_engine::RegionManifestInfo::Mito {
272 manifest_version,
273 flushed_entry_id,
274 } => RegionManifestInfo::Mito {
275 manifest_version,
276 flushed_entry_id,
277 },
278 store_api::region_engine::RegionManifestInfo::Metric {
279 data_manifest_version,
280 data_flushed_entry_id,
281 metadata_manifest_version,
282 metadata_flushed_entry_id,
283 } => RegionManifestInfo::Metric {
284 data_manifest_version,
285 data_flushed_entry_id,
286 metadata_manifest_version,
287 metadata_flushed_entry_id,
288 },
289 }
290 }
291}
292
293impl From<&api::v1::meta::RegionStat> for RegionStat {
294 fn from(value: &api::v1::meta::RegionStat) -> Self {
295 let region_stat = value
296 .extensions
297 .get(REGION_STATISTIC_KEY)
298 .and_then(|value| RegionStatistic::deserialize_from_slice(value))
299 .unwrap_or_default();
300
301 Self {
302 id: RegionId::from_u64(value.region_id),
303 rcus: value.rcus,
304 wcus: value.wcus,
305 approximate_bytes: value.approximate_bytes as u64,
306 engine: value.engine.clone(),
307 role: RegionRole::from(value.role()),
308 num_rows: region_stat.num_rows,
309 memtable_size: region_stat.memtable_size,
310 manifest_size: region_stat.manifest_size,
311 sst_size: region_stat.sst_size,
312 sst_num: region_stat.sst_num,
313 index_size: region_stat.index_size,
314 region_manifest: region_stat.manifest.into(),
315 written_bytes: region_stat.written_bytes,
316 data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
317 metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
318 }
319 }
320}
321
322impl From<&api::v1::meta::TopicStat> for TopicStat {
323 fn from(value: &api::v1::meta::TopicStat) -> Self {
324 Self {
325 topic: value.topic_name.clone(),
326 latest_entry_id: value.latest_entry_id,
327 record_size: value.record_size,
328 record_num: value.record_num,
329 }
330 }
331}
332
/// GC statistics of a datanode, exchanged as JSON through a heartbeat's extensions map.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct GcStat {
    /// Presumably the number of GC tasks currently running on the node (confirm with producer).
    pub running_gc_tasks: u32,
    /// Presumably the configured limit of concurrent GC tasks (confirm with producer).
    pub gc_concurrency: u32,
}
340
341impl GcStat {
342 pub const GC_STAT_KEY: &str = "__gc_stat";
343
344 pub fn new(running_gc_tasks: u32, gc_concurrency: u32) -> Self {
345 Self {
346 running_gc_tasks,
347 gc_concurrency,
348 }
349 }
350
351 pub fn into_extensions(&self, extensions: &mut std::collections::HashMap<String, Vec<u8>>) {
352 let bytes = serde_json::to_vec(self).unwrap_or_default();
353 extensions.insert(Self::GC_STAT_KEY.to_string(), bytes);
354 }
355
356 pub fn from_extensions(
357 extensions: &std::collections::HashMap<String, Vec<u8>>,
358 ) -> Result<Option<Self>> {
359 extensions
360 .get(Self::GC_STAT_KEY)
361 .map(|bytes| {
362 serde_json::from_slice(bytes).with_context(|_| DeserializeFromJsonSnafu {
363 input: String::from_utf8_lossy(bytes).to_string(),
364 })
365 })
366 .transpose()
367 }
368}
369
/// Key of a datanode's stat entry.
///
/// Its byte form is `__meta_datanode_stat-0-<node_id>`; parsing accepts any digits in the
/// middle segment and only keeps the node id.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub struct DatanodeStatKey {
    /// Id of the datanode this key refers to.
    pub node_id: u64,
}
377
378impl DatanodeStatKey {
379 pub fn prefix_key() -> Vec<u8> {
381 format!("{DATANODE_STAT_PREFIX}-0-").into_bytes()
383 }
384}
385
386impl From<DatanodeStatKey> for Vec<u8> {
387 fn from(value: DatanodeStatKey) -> Self {
388 format!("{}-0-{}", DATANODE_STAT_PREFIX, value.node_id).into_bytes()
390 }
391}
392
393impl FromStr for DatanodeStatKey {
394 type Err = error::Error;
395
396 fn from_str(key: &str) -> Result<Self> {
397 let caps = DATANODE_STAT_KEY_PATTERN
398 .captures(key)
399 .context(error::InvalidStatKeySnafu { key })?;
400
401 ensure!(caps.len() == 3, error::InvalidStatKeySnafu { key });
402 let node_id = caps[2].to_string();
403 let node_id: u64 = node_id.parse().context(error::ParseNumSnafu {
404 err_msg: format!("invalid node_id: {node_id}"),
405 })?;
406
407 Ok(Self { node_id })
408 }
409}
410
411impl TryFrom<Vec<u8>> for DatanodeStatKey {
412 type Error = error::Error;
413
414 fn try_from(bytes: Vec<u8>) -> Result<Self> {
415 String::from_utf8(bytes)
416 .context(error::FromUtf8Snafu {
417 name: "DatanodeStatKey",
418 })
419 .map(|x| x.parse())?
420 }
421}
422
/// Value stored for a datanode: a list of `Stat`s.
///
/// Because of `#[serde(transparent)]` it is (de)serialized as the `stats` sequence itself rather
/// than as an object with a `stats` field. The accessors on this type read the last element.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(transparent)]
pub struct DatanodeStatValue {
    /// The stats of the datanode; the last element is the one the accessors report.
    pub stats: Vec<Stat>,
}
429
430impl DatanodeStatValue {
431 pub fn region_num(&self) -> Option<u64> {
433 self.stats.last().map(|x| x.region_num)
434 }
435
436 pub fn node_addr(&self) -> Option<String> {
438 self.stats.last().map(|x| x.addr.clone())
439 }
440}
441
442impl TryFrom<DatanodeStatValue> for Vec<u8> {
443 type Error = error::Error;
444
445 fn try_from(stats: DatanodeStatValue) -> Result<Self> {
446 Ok(serde_json::to_string(&stats)
447 .context(error::SerializeToJsonSnafu {
448 input: format!("{stats:?}"),
449 })?
450 .into_bytes())
451 }
452}
453
454impl FromStr for DatanodeStatValue {
455 type Err = error::Error;
456
457 fn from_str(value: &str) -> Result<Self> {
458 serde_json::from_str(value).context(error::DeserializeFromJsonSnafu { input: value })
459 }
460}
461
462impl TryFrom<Vec<u8>> for DatanodeStatValue {
463 type Error = error::Error;
464
465 fn try_from(value: Vec<u8>) -> Result<Self> {
466 String::from_utf8(value)
467 .context(error::FromUtf8Snafu {
468 name: "DatanodeStatValue",
469 })
470 .map(|x| x.parse())?
471 }
472}
473
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a default `Stat` with the given `region_num`.
    fn stat_with_region_num(region_num: u64) -> Stat {
        Stat {
            region_num,
            ..Default::default()
        }
    }

    /// Builds a default `Stat` with the given `addr`.
    fn stat_with_addr(addr: &str) -> Stat {
        Stat {
            addr: addr.to_string(),
            ..Default::default()
        }
    }

    /// The stat key carries the node id of the stat it was derived from.
    #[test]
    fn test_stat_key() {
        let stat = Stat {
            id: 101,
            region_num: 10,
            ..Default::default()
        };

        let key = stat.stat_key();

        assert_eq!(101, key.node_id);
    }

    /// A value survives a round trip through its byte encoding.
    #[test]
    fn test_stat_val_round_trip() {
        let original = DatanodeStatValue {
            stats: vec![Stat {
                id: 101,
                region_num: 100,
                ..Default::default()
            }],
        };

        let encoded = Vec::<u8>::try_from(original).unwrap();
        let decoded = DatanodeStatValue::try_from(encoded).unwrap();

        assert_eq!(1, decoded.stats.len());

        let restored = decoded.stats.first().unwrap();
        assert_eq!(101, restored.id);
        assert_eq!(100, restored.region_num);
    }

    /// `node_addr` is `None` without stats and otherwise the address of the last stat.
    #[test]
    fn test_get_addr_from_stat_val() {
        let no_stats = DatanodeStatValue { stats: vec![] };
        assert!(no_stats.node_addr().is_none());

        let three_stats = DatanodeStatValue {
            stats: vec![stat_with_addr("1"), stat_with_addr("2"), stat_with_addr("3")],
        };
        let addr = three_stats.node_addr().unwrap();
        assert_eq!("3", addr);
    }

    /// `region_num` is `None` without stats and otherwise the count of the last stat.
    #[test]
    fn test_get_region_num_from_stat_val() {
        let no_stats = DatanodeStatValue { stats: vec![] };
        assert!(no_stats.region_num().is_none());

        let single_zero = DatanodeStatValue {
            stats: vec![stat_with_region_num(0)],
        };
        assert_eq!(Some(0), single_zero.region_num());

        let three_stats = DatanodeStatValue {
            stats: vec![
                stat_with_region_num(1),
                stat_with_region_num(0),
                stat_with_region_num(2),
            ],
        };
        let region_num = three_stats.region_num().unwrap();
        assert_eq!(2, region_num);
    }
}