1use std::collections::HashSet;
16use std::str::FromStr;
17
18use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, RequestHeader};
19use common_time::util as time_util;
20use lazy_static::lazy_static;
21use regex::Regex;
22use serde::{Deserialize, Serialize};
23use snafu::{OptionExt, ResultExt, ensure};
24use store_api::region_engine::{RegionRole, RegionStatistic};
25use store_api::storage::RegionId;
26use table::metadata::TableId;
27
28use crate::error;
29use crate::error::Result;
30use crate::heartbeat::utils::get_datanode_workloads;
31
/// Prefix of datanode stat keys; `DatanodeStatKey` encodes as
/// `{DATANODE_STAT_PREFIX}-0-{node_id}`.
const DATANODE_STAT_PREFIX: &str = "__meta_datanode_stat";

/// Key in `api::v1::meta::RegionStat::extensions` under which a serialized
/// `RegionStatistic` is looked up when converting into [`RegionStat`].
pub const REGION_STATISTIC_KEY: &str = "__region_statistic";
35
// The patterns below are fixed at compile time, so `unwrap()` can only fail on a
// programming error in the pattern text itself.
lazy_static! {
    /// Matches keys of the form `__meta_datanode_lease-<digits>-<digits>`.
    pub(crate) static ref DATANODE_LEASE_KEY_PATTERN: Regex =
        Regex::new("^__meta_datanode_lease-([0-9]+)-([0-9]+)$").unwrap();
    /// Matches datanode stat keys `{DATANODE_STAT_PREFIX}-<digits>-<digits>`.
    /// Capture group 2 is parsed as the node id by `DatanodeStatKey::from_str`.
    /// Group 1 is always `0` for keys produced in this file
    /// (NOTE(review): its meaning is not visible here — confirm).
    static ref DATANODE_STAT_KEY_PATTERN: Regex =
        Regex::new(&format!("^{DATANODE_STAT_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap();
    /// Matches keys of the form `__meta_inactive_region-<digits>-<digits>-<digits>`.
    static ref INACTIVE_REGION_KEY_PATTERN: Regex =
        Regex::new("^__meta_inactive_region-([0-9]+)-([0-9]+)-([0-9]+)$").unwrap();
}
44
/// Statistics reported by a datanode, typically built from a heartbeat request
/// via the `TryFrom<&HeartbeatRequest>` impl.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Stat {
    /// Wall-clock milliseconds taken when the stat was built from a heartbeat.
    pub timestamp_millis: i64,
    /// Id of the reporting datanode (the heartbeat peer's id).
    pub id: u64,
    /// Address of the reporting datanode (the heartbeat peer's address).
    pub addr: String,
    /// Sum of `rcus` over `region_stats` when built from a heartbeat or after
    /// `retain_active_region_stats`.
    pub rcus: i64,
    /// Sum of `wcus` over `region_stats` when built from a heartbeat or after
    /// `retain_active_region_stats`.
    pub wcus: i64,
    /// Number of entries in `region_stats` when built from a heartbeat or after
    /// `retain_active_region_stats`.
    pub region_num: u64,
    /// Per-region statistics.
    pub region_stats: Vec<RegionStat>,
    /// Per-topic statistics.
    pub topic_stats: Vec<TopicStat>,
    /// Node epoch copied from the heartbeat request.
    pub node_epoch: u64,
    /// Datanode workloads derived from the heartbeat's `node_workloads`.
    pub datanode_workloads: DatanodeWorkloads,
}
70
/// Statistics of a single region, converted from `api::v1::meta::RegionStat`.
///
/// Fields from `num_rows` onward come from the `RegionStatistic` stored under
/// [`REGION_STATISTIC_KEY`] in the heartbeat extensions. They fall back to
/// `RegionStatistic::default()` values when that extension is absent or undecodable.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegionStat {
    /// Region id.
    pub id: RegionId,
    /// Read capacity units reported for the region.
    pub rcus: i64,
    /// Write capacity units reported for the region.
    pub wcus: i64,
    /// Approximate size reported in the heartbeat (converted with `as u64`).
    pub approximate_bytes: u64,
    /// Name of the engine serving the region.
    pub engine: String,
    /// Role of the region on this datanode.
    pub role: RegionRole,
    /// Row count from the region statistic.
    pub num_rows: u64,
    /// Memtable size from the region statistic.
    pub memtable_size: u64,
    /// Manifest size from the region statistic.
    pub manifest_size: u64,
    /// SST size from the region statistic.
    pub sst_size: u64,
    /// SST file count from the region statistic.
    pub sst_num: u64,
    /// Index size from the region statistic.
    pub index_size: u64,
    /// Manifest information from the region statistic.
    pub region_manifest: RegionManifestInfo,
    /// Written bytes from the region statistic.
    pub written_bytes: u64,
    /// Latest entry id of the data topic, from the region statistic.
    pub data_topic_latest_entry_id: u64,
    /// Latest entry id of the metadata topic, from the region statistic.
    pub metadata_topic_latest_entry_id: u64,
}
110
/// Statistics of a single topic, converted from `api::v1::meta::TopicStat`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicStat {
    /// Topic name (the heartbeat's `topic_name`).
    pub topic: String,
    /// Latest entry id of the topic.
    pub latest_entry_id: u64,
    /// Record size as reported (NOTE(review): unit not visible here, presumably bytes).
    pub record_size: u64,
    /// Record count as reported.
    pub record_num: u64,
}
122
/// Source of [`TopicStat`]s to report. Implementors must be `Send + Sync`.
///
/// NOTE(review): presumably feeds `Stat::topic_stats` through heartbeats —
/// confirm against implementors.
pub trait TopicStatsReporter: Send + Sync {
    /// Returns the topic stats to report. Takes `&mut self`, so implementations
    /// may update internal state while collecting.
    fn reportable_topics(&mut self) -> Vec<TopicStat>;
}
128
/// Per-engine manifest information of a region.
///
/// Mirrors `store_api::region_engine::RegionManifestInfo` (see the `From` impl).
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum RegionManifestInfo {
    /// Manifest info of a mito region.
    Mito {
        manifest_version: u64,
        flushed_entry_id: u64,
    },
    /// Manifest info of a metric region, which tracks a data part and a metadata part.
    Metric {
        data_manifest_version: u64,
        data_flushed_entry_id: u64,
        metadata_manifest_version: u64,
        metadata_flushed_entry_id: u64,
    },
}
142
143impl Stat {
144 #[inline]
145 pub fn is_empty(&self) -> bool {
146 self.region_stats.is_empty()
147 }
148
149 pub fn stat_key(&self) -> DatanodeStatKey {
150 DatanodeStatKey { node_id: self.id }
151 }
152
153 pub fn regions(&self) -> Vec<(RegionId, RegionRole)> {
155 self.region_stats.iter().map(|s| (s.id, s.role)).collect()
156 }
157
158 pub fn table_ids(&self) -> HashSet<TableId> {
160 self.region_stats.iter().map(|s| s.id.table_id()).collect()
161 }
162
163 pub fn retain_active_region_stats(&mut self, inactive_region_ids: &HashSet<RegionId>) {
165 if inactive_region_ids.is_empty() {
166 return;
167 }
168
169 self.region_stats
170 .retain(|r| !inactive_region_ids.contains(&r.id));
171 self.rcus = self.region_stats.iter().map(|s| s.rcus).sum();
172 self.wcus = self.region_stats.iter().map(|s| s.wcus).sum();
173 self.region_num = self.region_stats.len() as u64;
174 }
175
176 pub fn memory_size(&self) -> usize {
177 std::mem::size_of::<i64>() * 3 +
179 std::mem::size_of::<u64>() * 3 +
181 std::mem::size_of::<String>() + self.addr.capacity() +
183 self.region_stats.iter().map(|s| s.memory_size()).sum::<usize>()
185 }
186}
187
188impl RegionStat {
189 pub fn memory_size(&self) -> usize {
190 std::mem::size_of::<RegionRole>() +
192 std::mem::size_of::<RegionId>() +
194 std::mem::size_of::<i64>() * 4 +
196 std::mem::size_of::<u64>() * 5 +
198 std::mem::size_of::<String>() + self.engine.capacity() +
200 self.region_manifest.memory_size()
202 }
203}
204
205impl RegionManifestInfo {
206 pub fn memory_size(&self) -> usize {
207 match self {
208 RegionManifestInfo::Mito { .. } => std::mem::size_of::<u64>() * 2,
209 RegionManifestInfo::Metric { .. } => std::mem::size_of::<u64>() * 4,
210 }
211 }
212}
213
214impl TryFrom<&HeartbeatRequest> for Stat {
215 type Error = Option<RequestHeader>;
216
217 fn try_from(value: &HeartbeatRequest) -> std::result::Result<Self, Self::Error> {
218 let HeartbeatRequest {
219 header,
220 peer,
221 region_stats,
222 node_epoch,
223 node_workloads,
224 topic_stats,
225 ..
226 } = value;
227
228 match (header, peer) {
229 (Some(_header), Some(peer)) => {
230 let region_stats = region_stats
231 .iter()
232 .map(RegionStat::from)
233 .collect::<Vec<_>>();
234 let topic_stats = topic_stats.iter().map(TopicStat::from).collect::<Vec<_>>();
235
236 let datanode_workloads = get_datanode_workloads(node_workloads.as_ref());
237 Ok(Self {
238 timestamp_millis: time_util::current_time_millis(),
239 id: peer.id,
241 addr: peer.addr.clone(),
243 rcus: region_stats.iter().map(|s| s.rcus).sum(),
244 wcus: region_stats.iter().map(|s| s.wcus).sum(),
245 region_num: region_stats.len() as u64,
246 region_stats,
247 topic_stats,
248 node_epoch: *node_epoch,
249 datanode_workloads,
250 })
251 }
252 (header, _) => Err(header.clone()),
253 }
254 }
255}
256
257impl From<store_api::region_engine::RegionManifestInfo> for RegionManifestInfo {
258 fn from(value: store_api::region_engine::RegionManifestInfo) -> Self {
259 match value {
260 store_api::region_engine::RegionManifestInfo::Mito {
261 manifest_version,
262 flushed_entry_id,
263 } => RegionManifestInfo::Mito {
264 manifest_version,
265 flushed_entry_id,
266 },
267 store_api::region_engine::RegionManifestInfo::Metric {
268 data_manifest_version,
269 data_flushed_entry_id,
270 metadata_manifest_version,
271 metadata_flushed_entry_id,
272 } => RegionManifestInfo::Metric {
273 data_manifest_version,
274 data_flushed_entry_id,
275 metadata_manifest_version,
276 metadata_flushed_entry_id,
277 },
278 }
279 }
280}
281
282impl From<&api::v1::meta::RegionStat> for RegionStat {
283 fn from(value: &api::v1::meta::RegionStat) -> Self {
284 let region_stat = value
285 .extensions
286 .get(REGION_STATISTIC_KEY)
287 .and_then(|value| RegionStatistic::deserialize_from_slice(value))
288 .unwrap_or_default();
289
290 Self {
291 id: RegionId::from_u64(value.region_id),
292 rcus: value.rcus,
293 wcus: value.wcus,
294 approximate_bytes: value.approximate_bytes as u64,
295 engine: value.engine.clone(),
296 role: RegionRole::from(value.role()),
297 num_rows: region_stat.num_rows,
298 memtable_size: region_stat.memtable_size,
299 manifest_size: region_stat.manifest_size,
300 sst_size: region_stat.sst_size,
301 sst_num: region_stat.sst_num,
302 index_size: region_stat.index_size,
303 region_manifest: region_stat.manifest.into(),
304 written_bytes: region_stat.written_bytes,
305 data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
306 metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
307 }
308 }
309}
310
311impl From<&api::v1::meta::TopicStat> for TopicStat {
312 fn from(value: &api::v1::meta::TopicStat) -> Self {
313 Self {
314 topic: value.topic_name.clone(),
315 latest_entry_id: value.latest_entry_id,
316 record_size: value.record_size,
317 record_num: value.record_num,
318 }
319 }
320}
321
/// Key of a datanode's stored stats.
///
/// Encodes to the bytes of `{DATANODE_STAT_PREFIX}-0-{node_id}` and parses back
/// from strings matching `DATANODE_STAT_KEY_PATTERN`.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub struct DatanodeStatKey {
    /// Id of the datanode the stats belong to.
    pub node_id: u64,
}
329
330impl DatanodeStatKey {
331 pub fn prefix_key() -> Vec<u8> {
333 format!("{DATANODE_STAT_PREFIX}-0-").into_bytes()
335 }
336}
337
338impl From<DatanodeStatKey> for Vec<u8> {
339 fn from(value: DatanodeStatKey) -> Self {
340 format!("{}-0-{}", DATANODE_STAT_PREFIX, value.node_id).into_bytes()
342 }
343}
344
345impl FromStr for DatanodeStatKey {
346 type Err = error::Error;
347
348 fn from_str(key: &str) -> Result<Self> {
349 let caps = DATANODE_STAT_KEY_PATTERN
350 .captures(key)
351 .context(error::InvalidStatKeySnafu { key })?;
352
353 ensure!(caps.len() == 3, error::InvalidStatKeySnafu { key });
354 let node_id = caps[2].to_string();
355 let node_id: u64 = node_id.parse().context(error::ParseNumSnafu {
356 err_msg: format!("invalid node_id: {node_id}"),
357 })?;
358
359 Ok(Self { node_id })
360 }
361}
362
363impl TryFrom<Vec<u8>> for DatanodeStatKey {
364 type Error = error::Error;
365
366 fn try_from(bytes: Vec<u8>) -> Result<Self> {
367 String::from_utf8(bytes)
368 .context(error::FromUtf8Snafu {
369 name: "DatanodeStatKey",
370 })
371 .map(|x| x.parse())?
372 }
373}
374
/// A list of [`Stat`]s for one datanode (presumably the value paired with a
/// [`DatanodeStatKey`] — verify against callers).
///
/// `#[serde(transparent)]` makes it (de)serialize as the bare `stats` list.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(transparent)]
pub struct DatanodeStatValue {
    /// Stats; `region_num` and `node_addr` read the last element, so the most
    /// recent stat is presumably last (NOTE(review): confirm with writers).
    pub stats: Vec<Stat>,
}
381
382impl DatanodeStatValue {
383 pub fn region_num(&self) -> Option<u64> {
385 self.stats.last().map(|x| x.region_num)
386 }
387
388 pub fn node_addr(&self) -> Option<String> {
390 self.stats.last().map(|x| x.addr.clone())
391 }
392}
393
394impl TryFrom<DatanodeStatValue> for Vec<u8> {
395 type Error = error::Error;
396
397 fn try_from(stats: DatanodeStatValue) -> Result<Self> {
398 Ok(serde_json::to_string(&stats)
399 .context(error::SerializeToJsonSnafu {
400 input: format!("{stats:?}"),
401 })?
402 .into_bytes())
403 }
404}
405
406impl FromStr for DatanodeStatValue {
407 type Err = error::Error;
408
409 fn from_str(value: &str) -> Result<Self> {
410 serde_json::from_str(value).context(error::DeserializeFromJsonSnafu { input: value })
411 }
412}
413
414impl TryFrom<Vec<u8>> for DatanodeStatValue {
415 type Error = error::Error;
416
417 fn try_from(value: Vec<u8>) -> Result<Self> {
418 String::from_utf8(value)
419 .context(error::FromUtf8Snafu {
420 name: "DatanodeStatValue",
421 })
422 .map(|x| x.parse())?
423 }
424}
425
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_stat_key() {
        let stat = Stat {
            id: 101,
            region_num: 10,
            ..Default::default()
        };

        let stat_key = stat.stat_key();

        assert_eq!(101, stat_key.node_id);
    }

    #[test]
    fn test_stat_key_round_trip() {
        let key = DatanodeStatKey { node_id: 101 };

        let bytes: Vec<u8> = key.into();
        assert!(bytes.starts_with(&DatanodeStatKey::prefix_key()));

        let parsed: DatanodeStatKey = bytes.try_into().unwrap();
        assert_eq!(key, parsed);
    }

    #[test]
    fn test_invalid_stat_key() {
        // Non-numeric node id does not match the key pattern.
        assert!(
            "__meta_datanode_stat-0-abc"
                .parse::<DatanodeStatKey>()
                .is_err()
        );
        // A different key family is rejected.
        assert!(
            "__meta_datanode_lease-0-1"
                .parse::<DatanodeStatKey>()
                .is_err()
        );
        // Matches the pattern but the node id overflows u64.
        assert!(
            "__meta_datanode_stat-0-99999999999999999999"
                .parse::<DatanodeStatKey>()
                .is_err()
        );
        // Bytes that are not valid UTF-8.
        assert!(DatanodeStatKey::try_from(vec![0xff, 0xfe]).is_err());
    }

    #[test]
    fn test_stat_val_round_trip() {
        let stat = Stat {
            id: 101,
            region_num: 100,
            ..Default::default()
        };

        let stat_val = DatanodeStatValue { stats: vec![stat] };

        let bytes: Vec<u8> = stat_val.try_into().unwrap();
        let stat_val: DatanodeStatValue = bytes.try_into().unwrap();
        let stats = stat_val.stats;

        assert_eq!(1, stats.len());

        let stat = stats.first().unwrap();
        assert_eq!(101, stat.id);
        assert_eq!(100, stat.region_num);
    }

    #[test]
    fn test_get_addr_from_stat_val() {
        let empty = DatanodeStatValue { stats: vec![] };
        let addr = empty.node_addr();
        assert!(addr.is_none());

        let stat_val = DatanodeStatValue {
            stats: vec![
                Stat {
                    addr: "1".to_string(),
                    ..Default::default()
                },
                Stat {
                    addr: "2".to_string(),
                    ..Default::default()
                },
                Stat {
                    addr: "3".to_string(),
                    ..Default::default()
                },
            ],
        };
        let addr = stat_val.node_addr().unwrap();
        assert_eq!("3", addr);
    }

    #[test]
    fn test_get_region_num_from_stat_val() {
        let empty = DatanodeStatValue { stats: vec![] };
        let region_num = empty.region_num();
        assert!(region_num.is_none());

        // A zero region count is still reported as `Some(0)`, not `None`.
        let zero_regions = DatanodeStatValue {
            stats: vec![Stat {
                region_num: 0,
                ..Default::default()
            }],
        };
        assert_eq!(Some(0), zero_regions.region_num());

        let stat_val = DatanodeStatValue {
            stats: vec![
                Stat {
                    region_num: 1,
                    ..Default::default()
                },
                Stat {
                    region_num: 0,
                    ..Default::default()
                },
                Stat {
                    region_num: 2,
                    ..Default::default()
                },
            ],
        };
        let region_num = stat_val.region_num().unwrap();
        assert_eq!(2, region_num);
    }
}