Skip to main content

common_meta/
datanode.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::str::FromStr;
17
18use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, RequestHeader};
19use common_time::util as time_util;
20use lazy_static::lazy_static;
21use regex::Regex;
22use serde::{Deserialize, Serialize};
23use snafu::{OptionExt, ResultExt, ensure};
24use store_api::region_engine::{RegionRole, RegionStatistic};
25use store_api::storage::RegionId;
26use table::metadata::TableId;
27
28use crate::error::{self, DeserializeFromJsonSnafu, Result};
29use crate::heartbeat::utils::get_datanode_workloads;
30
/// Prefix of the keys under which datanode stats are stored
/// (see [`DatanodeStatKey`]).
const DATANODE_STAT_PREFIX: &str = "__meta_datanode_stat";

/// Key in a region stat's `extensions` map whose value is the serialized
/// [`RegionStatistic`] of that region.
pub const REGION_STATISTIC_KEY: &str = "__region_statistic";

lazy_static! {
    // Matches `__meta_datanode_lease-{n}-{n}` keys, capturing both numeric segments.
    pub(crate) static ref DATANODE_LEASE_KEY_PATTERN: Regex =
        Regex::new("^__meta_datanode_lease-([0-9]+)-([0-9]+)$").unwrap();
    // Matches datanode stat keys `__meta_datanode_stat-{n}-{n}`;
    // `DatanodeStatKey::from_str` takes the node id from the second capture group.
    static ref DATANODE_STAT_KEY_PATTERN: Regex =
        Regex::new(&format!("^{DATANODE_STAT_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap();
    // Matches `__meta_inactive_region-{n}-{n}-{n}` keys.
    // NOTE(review): not referenced anywhere in this file; confirm it is still needed.
    static ref INACTIVE_REGION_KEY_PATTERN: Regex =
        Regex::new("^__meta_inactive_region-([0-9]+)-([0-9]+)-([0-9]+)$").unwrap();
}
43
/// A snapshot of the statistics a datanode reports in one heartbeat.
///
/// Stats are stored as a list (see [`DatanodeStatValue`]) under the key returned by
/// [`Stat::stat_key`], whose format is `__meta_datanode_stat-0-{node_id}`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Stat {
    /// Creation time of this stat in milliseconds; set from
    /// `time_util::current_time_millis()` when built from a heartbeat.
    pub timestamp_millis: i64,
    /// The datanode id.
    pub id: u64,
    /// The datanode address.
    pub addr: String,
    /// The read capacity units during this period
    /// (the sum over `region_stats` when built from a heartbeat).
    pub rcus: i64,
    /// The write capacity units during this period
    /// (the sum over `region_stats` when built from a heartbeat).
    pub wcus: i64,
    /// How many regions on this node
    /// (the length of `region_stats` when built from a heartbeat).
    pub region_num: u64,
    /// The region stats of the datanode.
    pub region_stats: Vec<RegionStat>,
    /// The topic stats of the datanode.
    pub topic_stats: Vec<TopicStat>,
    /// The node epoch, used to check whether the node has restarted or been redeployed.
    pub node_epoch: u64,
    /// The datanode workloads.
    pub datanode_workloads: DatanodeWorkloads,
    /// The GC statistics of the datanode, if the heartbeat carried them.
    pub gc_stat: Option<GcStat>,
}
71
/// The statistics of a region, as reported in a datanode heartbeat.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RegionStat {
    /// The region_id.
    pub id: RegionId,
    /// The read capacity units during this period
    pub rcus: i64,
    /// The write capacity units during this period
    pub wcus: i64,
    /// Approximate disk bytes of this region, including sst, index, manifest and wal
    pub approximate_bytes: u64,
    /// The engine name.
    pub engine: String,
    /// The region role.
    pub role: RegionRole,
    /// The number of rows
    pub num_rows: u64,
    /// The size of the memtable in bytes.
    pub memtable_size: u64,
    /// The size of the manifest in bytes.
    pub manifest_size: u64,
    /// The size of the SST data files in bytes.
    pub sst_size: u64,
    /// The num of the SST data files.
    pub sst_num: u64,
    /// The size of the SST index files in bytes.
    pub index_size: u64,
    /// The manifest info of the region.
    pub region_manifest: RegionManifestInfo,
    /// The total bytes written of the region since region opened.
    pub written_bytes: u64,
    /// The latest entry id of topic used by data.
    /// **Only used by remote WAL prune.**
    pub data_topic_latest_entry_id: u64,
    /// The latest entry id of topic used by metadata.
    /// **Only used by remote WAL prune.**
    /// In mito engine, this is the same as `data_topic_latest_entry_id`.
    pub metadata_topic_latest_entry_id: u64,
}
111
/// The statistics of a topic, as reported by a datanode.
///
/// NOTE(review): presumably a remote-WAL topic, given the entry-id fields; confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicStat {
    /// The topic name.
    pub topic: String,
    /// The latest entry id of the topic.
    pub latest_entry_id: u64,
    /// The total size in bytes of records appended to the topic.
    pub record_size: u64,
    /// The total number of records appended to the topic.
    pub record_num: u64,
}
123
/// Trait for reporting statistics about topics.
///
/// Implementors must be `Send + Sync`.
pub trait TopicStatsReporter: Send + Sync {
    /// Returns a list of topic statistics that can be reported.
    ///
    /// Takes `&mut self`, so an implementation may update its own state while reporting.
    fn reportable_topics(&mut self) -> Vec<TopicStat>;
}
129
/// Manifest information of a region, one variant per engine kind.
///
/// Converted field-for-field from `store_api::region_engine::RegionManifestInfo`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum RegionManifestInfo {
    /// Manifest info of a mito-engine region.
    Mito {
        manifest_version: u64,
        flushed_entry_id: u64,
        /// Number of files removed in the manifest's `removed_files` field.
        file_removed_cnt: u64,
    },
    /// Manifest info of a metric-engine region; the `data_*` and `metadata_*`
    /// fields are tracked separately.
    Metric {
        data_manifest_version: u64,
        data_flushed_entry_id: u64,
        metadata_manifest_version: u64,
        metadata_flushed_entry_id: u64,
    },
}
145
146impl Stat {
147    #[inline]
148    pub fn is_empty(&self) -> bool {
149        self.region_stats.is_empty()
150    }
151
152    pub fn stat_key(&self) -> DatanodeStatKey {
153        DatanodeStatKey { node_id: self.id }
154    }
155
156    /// Returns a tuple array containing [RegionId] and [RegionRole].
157    pub fn regions(&self) -> Vec<(RegionId, RegionRole)> {
158        self.region_stats.iter().map(|s| (s.id, s.role)).collect()
159    }
160
161    /// Returns all table ids in the region stats.
162    pub fn table_ids(&self) -> HashSet<TableId> {
163        self.region_stats.iter().map(|s| s.id.table_id()).collect()
164    }
165
166    /// Retains the active region stats and updates the rcus, wcus, and region_num.
167    pub fn retain_active_region_stats(&mut self, inactive_region_ids: &HashSet<RegionId>) {
168        if inactive_region_ids.is_empty() {
169            return;
170        }
171
172        self.region_stats
173            .retain(|r| !inactive_region_ids.contains(&r.id));
174        self.rcus = self.region_stats.iter().map(|s| s.rcus).sum();
175        self.wcus = self.region_stats.iter().map(|s| s.wcus).sum();
176        self.region_num = self.region_stats.len() as u64;
177    }
178
179    pub fn memory_size(&self) -> usize {
180        // timestamp_millis, rcus, wcus
181        std::mem::size_of::<i64>() * 3 +
182        // id, region_num, node_epoch
183        std::mem::size_of::<u64>() * 3 +
184        // addr
185        std::mem::size_of::<String>() + self.addr.capacity() +
186        // region_stats
187        self.region_stats.iter().map(|s| s.memory_size()).sum::<usize>()
188    }
189}
190
191impl RegionStat {
192    pub fn memory_size(&self) -> usize {
193        // role
194        std::mem::size_of::<RegionRole>() +
195        // id
196        std::mem::size_of::<RegionId>() +
197        // rcus, wcus, approximate_bytes, num_rows
198        std::mem::size_of::<i64>() * 4 +
199        // memtable_size, manifest_size, sst_size, sst_num, index_size
200        std::mem::size_of::<u64>() * 5 +
201        // engine
202        std::mem::size_of::<String>() + self.engine.capacity() +
203        // region_manifest
204        self.region_manifest.memory_size()
205    }
206}
207
208impl RegionManifestInfo {
209    pub fn memory_size(&self) -> usize {
210        match self {
211            RegionManifestInfo::Mito { .. } => std::mem::size_of::<u64>() * 2,
212            RegionManifestInfo::Metric { .. } => std::mem::size_of::<u64>() * 4,
213        }
214    }
215}
216
217impl TryFrom<&HeartbeatRequest> for Stat {
218    type Error = Option<RequestHeader>;
219
220    fn try_from(value: &HeartbeatRequest) -> std::result::Result<Self, Self::Error> {
221        let HeartbeatRequest {
222            header,
223            peer,
224            region_stats,
225            node_epoch,
226            node_workloads,
227            topic_stats,
228            extensions,
229            ..
230        } = value;
231
232        match (header, peer) {
233            (Some(header), Some(peer)) => {
234                let region_stats = region_stats
235                    .iter()
236                    .map(RegionStat::from)
237                    .collect::<Vec<_>>();
238                let topic_stats = topic_stats.iter().map(TopicStat::from).collect::<Vec<_>>();
239
240                let datanode_workloads = get_datanode_workloads(node_workloads.as_ref());
241
242                let gc_stat = GcStat::from_extensions(extensions).map_err(|err| {
243                    common_telemetry::error!(
244                        "Failed to deserialize GcStat from extensions: {}",
245                        err
246                    );
247                    header.clone()
248                })?;
249                Ok(Self {
250                    timestamp_millis: time_util::current_time_millis(),
251                    // datanode id
252                    id: peer.id,
253                    // datanode address
254                    addr: peer.addr.clone(),
255                    rcus: region_stats.iter().map(|s| s.rcus).sum(),
256                    wcus: region_stats.iter().map(|s| s.wcus).sum(),
257                    region_num: region_stats.len() as u64,
258                    region_stats,
259                    topic_stats,
260                    node_epoch: *node_epoch,
261                    datanode_workloads,
262                    gc_stat,
263                })
264            }
265            (header, _) => Err(header.clone()),
266        }
267    }
268}
269
270impl From<store_api::region_engine::RegionManifestInfo> for RegionManifestInfo {
271    fn from(value: store_api::region_engine::RegionManifestInfo) -> Self {
272        match value {
273            store_api::region_engine::RegionManifestInfo::Mito {
274                manifest_version,
275                flushed_entry_id,
276                file_removed_cnt,
277            } => RegionManifestInfo::Mito {
278                manifest_version,
279                flushed_entry_id,
280                file_removed_cnt,
281            },
282            store_api::region_engine::RegionManifestInfo::Metric {
283                data_manifest_version,
284                data_flushed_entry_id,
285                metadata_manifest_version,
286                metadata_flushed_entry_id,
287            } => RegionManifestInfo::Metric {
288                data_manifest_version,
289                data_flushed_entry_id,
290                metadata_manifest_version,
291                metadata_flushed_entry_id,
292            },
293        }
294    }
295}
296
297impl From<&api::v1::meta::RegionStat> for RegionStat {
298    fn from(value: &api::v1::meta::RegionStat) -> Self {
299        let region_stat = value
300            .extensions
301            .get(REGION_STATISTIC_KEY)
302            .and_then(|value| RegionStatistic::deserialize_from_slice(value))
303            .unwrap_or_default();
304
305        Self {
306            id: RegionId::from_u64(value.region_id),
307            rcus: value.rcus,
308            wcus: value.wcus,
309            approximate_bytes: value.approximate_bytes as u64,
310            engine: value.engine.clone(),
311            role: RegionRole::from(value.role()),
312            num_rows: region_stat.num_rows,
313            memtable_size: region_stat.memtable_size,
314            manifest_size: region_stat.manifest_size,
315            sst_size: region_stat.sst_size,
316            sst_num: region_stat.sst_num,
317            index_size: region_stat.index_size,
318            region_manifest: region_stat.manifest.into(),
319            written_bytes: region_stat.written_bytes,
320            data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
321            metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
322        }
323    }
324}
325
326impl From<&api::v1::meta::TopicStat> for TopicStat {
327    fn from(value: &api::v1::meta::TopicStat) -> Self {
328        Self {
329            topic: value.topic_name.clone(),
330            latest_entry_id: value.latest_entry_id,
331            record_size: value.record_size,
332            record_num: value.record_num,
333        }
334    }
335}
336
/// Garbage-collection statistics of a datanode.
///
/// Carried as JSON in heartbeat extensions under [`GcStat::GC_STAT_KEY`].
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct GcStat {
    /// Number of GC tasks currently running on the datanode.
    pub running_gc_tasks: u32,
    /// The maximum number of concurrent GC tasks the datanode can handle.
    pub gc_concurrency: u32,
}
344
345impl GcStat {
346    pub const GC_STAT_KEY: &str = "__gc_stat";
347
348    pub fn new(running_gc_tasks: u32, gc_concurrency: u32) -> Self {
349        Self {
350            running_gc_tasks,
351            gc_concurrency,
352        }
353    }
354
355    pub fn into_extensions(&self, extensions: &mut std::collections::HashMap<String, Vec<u8>>) {
356        let bytes = serde_json::to_vec(self).unwrap_or_default();
357        extensions.insert(Self::GC_STAT_KEY.to_string(), bytes);
358    }
359
360    pub fn from_extensions(
361        extensions: &std::collections::HashMap<String, Vec<u8>>,
362    ) -> Result<Option<Self>> {
363        extensions
364            .get(Self::GC_STAT_KEY)
365            .map(|bytes| {
366                serde_json::from_slice(bytes).with_context(|_| DeserializeFromJsonSnafu {
367                    input: String::from_utf8_lossy(bytes).to_string(),
368                })
369            })
370            .transpose()
371    }
372}
373
/// Environment variables reported by a node in heartbeat messages.
///
/// Carried as JSON in an extensions map under [`EnvVars::ENV_VARS_KEY`].
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct EnvVars {
    /// The reported variables, keyed by variable name.
    pub vars: HashMap<String, String>,
}
379
380impl EnvVars {
381    pub const ENV_VARS_KEY: &str = "__env_vars";
382
383    pub fn new(vars: HashMap<String, String>) -> Self {
384        Self { vars }
385    }
386
387    /// Read the configured env var keys from the environment and build an EnvVars.
388    pub fn from_config(keys: &[String]) -> Self {
389        let vars = keys
390            .iter()
391            .filter_map(|key| std::env::var(key).ok().map(|value| (key.clone(), value)))
392            .collect();
393        Self { vars }
394    }
395
396    pub fn into_extensions(&self, extensions: &mut HashMap<String, Vec<u8>>) {
397        if self.vars.is_empty() {
398            return;
399        }
400        let bytes = serde_json::to_vec(self).unwrap_or_default();
401        extensions.insert(Self::ENV_VARS_KEY.to_string(), bytes);
402    }
403
404    pub fn from_extensions(extensions: &HashMap<String, Vec<u8>>) -> Result<Option<Self>> {
405        extensions
406            .get(Self::ENV_VARS_KEY)
407            .map(|bytes| {
408                serde_json::from_slice(bytes).with_context(|_| DeserializeFromJsonSnafu {
409                    input: String::from_utf8_lossy(bytes).to_string(),
410                })
411            })
412            .transpose()
413    }
414}
415
/// The key of the datanode stat in the memory store.
///
/// The format is `__meta_datanode_stat-0-{node_id}`. The `0` segment is a fixed
/// placeholder for a cluster id.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub struct DatanodeStatKey {
    /// The datanode id.
    pub node_id: u64,
}
423
424impl DatanodeStatKey {
425    /// The key prefix.
426    pub fn prefix_key() -> Vec<u8> {
427        // todo(hl): remove cluster id in prefix
428        format!("{DATANODE_STAT_PREFIX}-0-").into_bytes()
429    }
430}
431
432impl From<DatanodeStatKey> for Vec<u8> {
433    fn from(value: DatanodeStatKey) -> Self {
434        // todo(hl): remove cluster id in prefix
435        format!("{}-0-{}", DATANODE_STAT_PREFIX, value.node_id).into_bytes()
436    }
437}
438
439impl FromStr for DatanodeStatKey {
440    type Err = error::Error;
441
442    fn from_str(key: &str) -> Result<Self> {
443        let caps = DATANODE_STAT_KEY_PATTERN
444            .captures(key)
445            .context(error::InvalidStatKeySnafu { key })?;
446
447        ensure!(caps.len() == 3, error::InvalidStatKeySnafu { key });
448        let node_id = caps[2].to_string();
449        let node_id: u64 = node_id.parse().context(error::ParseNumSnafu {
450            err_msg: format!("invalid node_id: {node_id}"),
451        })?;
452
453        Ok(Self { node_id })
454    }
455}
456
457impl TryFrom<Vec<u8>> for DatanodeStatKey {
458    type Error = error::Error;
459
460    fn try_from(bytes: Vec<u8>) -> Result<Self> {
461        String::from_utf8(bytes)
462            .context(error::FromUtf8Snafu {
463                name: "DatanodeStatKey",
464            })
465            .map(|x| x.parse())?
466    }
467}
468
/// The value of the datanode stat in the memory store.
///
/// Serialized transparently as a JSON array of [`Stat`]s. The last element is
/// treated as the most recent stat by the accessors below.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(transparent)]
pub struct DatanodeStatValue {
    /// The stats of one datanode; the last element is treated as the latest.
    pub stats: Vec<Stat>,
}
475
476impl DatanodeStatValue {
477    /// Get the latest number of regions.
478    pub fn region_num(&self) -> Option<u64> {
479        self.stats.last().map(|x| x.region_num)
480    }
481
482    /// Get the latest node addr.
483    pub fn node_addr(&self) -> Option<String> {
484        self.stats.last().map(|x| x.addr.clone())
485    }
486}
487
488impl TryFrom<DatanodeStatValue> for Vec<u8> {
489    type Error = error::Error;
490
491    fn try_from(stats: DatanodeStatValue) -> Result<Self> {
492        Ok(serde_json::to_string(&stats)
493            .context(error::SerializeToJsonSnafu {
494                input: format!("{stats:?}"),
495            })?
496            .into_bytes())
497    }
498}
499
500impl FromStr for DatanodeStatValue {
501    type Err = error::Error;
502
503    fn from_str(value: &str) -> Result<Self> {
504        serde_json::from_str(value).context(error::DeserializeFromJsonSnafu { input: value })
505    }
506}
507
508impl TryFrom<Vec<u8>> for DatanodeStatValue {
509    type Error = error::Error;
510
511    fn try_from(value: Vec<u8>) -> Result<Self> {
512        String::from_utf8(value)
513            .context(error::FromUtf8Snafu {
514                name: "DatanodeStatValue",
515            })
516            .map(|x| x.parse())?
517    }
518}
519
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a minimal leader region stat with the given capacity units.
    fn region_stat(region_id: RegionId, rcus: i64, wcus: i64) -> RegionStat {
        RegionStat {
            id: region_id,
            rcus,
            wcus,
            approximate_bytes: 0,
            engine: "mito".to_string(),
            role: RegionRole::Leader,
            num_rows: 0,
            memtable_size: 0,
            manifest_size: 0,
            sst_size: 0,
            sst_num: 0,
            index_size: 0,
            region_manifest: RegionManifestInfo::Mito {
                manifest_version: 0,
                flushed_entry_id: 0,
                file_removed_cnt: 0,
            },
            written_bytes: 0,
            data_topic_latest_entry_id: 0,
            metadata_topic_latest_entry_id: 0,
        }
    }

    #[test]
    fn test_stat_key() {
        let stat = Stat {
            id: 101,
            region_num: 10,
            ..Default::default()
        };

        let stat_key = stat.stat_key();

        assert_eq!(101, stat_key.node_id);
    }

    #[test]
    fn test_stat_key_round_trip() {
        let key = DatanodeStatKey { node_id: 42 };
        let bytes: Vec<u8> = key.into();
        assert_eq!(bytes, b"__meta_datanode_stat-0-42".to_vec());
        assert!(bytes.starts_with(&DatanodeStatKey::prefix_key()));

        let parsed = DatanodeStatKey::try_from(bytes).unwrap();
        assert_eq!(parsed, key);
    }

    #[test]
    fn test_stat_key_rejects_invalid_input() {
        // Non-numeric node id does not match the key pattern.
        assert!("__meta_datanode_stat-0-abc".parse::<DatanodeStatKey>().is_err());
        // A lease key is not a stat key.
        assert!("__meta_datanode_lease-0-1".parse::<DatanodeStatKey>().is_err());
        // Matches the pattern but overflows u64.
        assert!(
            "__meta_datanode_stat-0-99999999999999999999"
                .parse::<DatanodeStatKey>()
                .is_err()
        );
        // Not valid UTF-8.
        assert!(DatanodeStatKey::try_from(vec![0xff, 0xfe]).is_err());
    }

    #[test]
    fn test_stat_val_round_trip() {
        let stat = Stat {
            id: 101,
            region_num: 100,
            ..Default::default()
        };

        let stat_val = DatanodeStatValue { stats: vec![stat] };

        let bytes: Vec<u8> = stat_val.try_into().unwrap();
        let stat_val: DatanodeStatValue = bytes.try_into().unwrap();
        let stats = stat_val.stats;

        assert_eq!(1, stats.len());

        let stat = stats.first().unwrap();
        assert_eq!(101, stat.id);
        assert_eq!(100, stat.region_num);
    }

    #[test]
    fn test_get_addr_from_stat_val() {
        let empty = DatanodeStatValue { stats: vec![] };
        let addr = empty.node_addr();
        assert!(addr.is_none());

        let stat_val = DatanodeStatValue {
            stats: vec![
                Stat {
                    addr: "1".to_string(),
                    ..Default::default()
                },
                Stat {
                    addr: "2".to_string(),
                    ..Default::default()
                },
                Stat {
                    addr: "3".to_string(),
                    ..Default::default()
                },
            ],
        };
        let addr = stat_val.node_addr().unwrap();
        assert_eq!("3", addr);
    }

    #[test]
    fn test_get_region_num_from_stat_val() {
        let empty = DatanodeStatValue { stats: vec![] };
        let region_num = empty.region_num();
        assert!(region_num.is_none());

        // A zero region count is still reported as `Some(0)`, not `None`.
        let single = DatanodeStatValue {
            stats: vec![Stat {
                region_num: 0,
                ..Default::default()
            }],
        };
        assert_eq!(Some(0), single.region_num());

        let stat_val = DatanodeStatValue {
            stats: vec![
                Stat {
                    region_num: 1,
                    ..Default::default()
                },
                Stat {
                    region_num: 0,
                    ..Default::default()
                },
                Stat {
                    region_num: 2,
                    ..Default::default()
                },
            ],
        };
        let latest = stat_val.region_num().unwrap();
        assert_eq!(2, latest);
    }

    #[test]
    fn test_retain_active_region_stats() {
        let active = RegionId::new(1024, 1);
        let inactive = RegionId::new(1024, 2);
        let mut stat = Stat {
            rcus: 100,
            wcus: 200,
            region_num: 2,
            region_stats: vec![region_stat(active, 1, 2), region_stat(inactive, 3, 4)],
            ..Default::default()
        };

        // An empty set is a no-op; the aggregates are not recomputed.
        stat.retain_active_region_stats(&HashSet::new());
        assert_eq!(stat.region_stats.len(), 2);
        assert_eq!(stat.rcus, 100);
        assert_eq!(stat.wcus, 200);

        stat.retain_active_region_stats(&HashSet::from([inactive]));
        assert_eq!(stat.regions(), vec![(active, RegionRole::Leader)]);
        assert_eq!(stat.rcus, 1);
        assert_eq!(stat.wcus, 2);
        assert_eq!(stat.region_num, 1);
    }

    #[test]
    fn test_region_stat_from_heartbeat_preserves_staging_leader_role() {
        let request = HeartbeatRequest {
            header: Some(RequestHeader::default()),
            peer: Some(api::v1::meta::Peer {
                id: 1,
                addr: "127.0.0.1:3001".to_string(),
            }),
            region_stats: vec![api::v1::meta::RegionStat {
                region_id: RegionId::new(1024, 1).as_u64(),
                engine: "mito".to_string(),
                role: api::v1::meta::RegionRole::StagingLeader.into(),
                ..Default::default()
            }],
            ..Default::default()
        };

        let stat = Stat::try_from(&request).unwrap();

        assert_eq!(stat.region_stats.len(), 1);
        assert_eq!(stat.region_stats[0].role, RegionRole::StagingLeader);
    }

    #[test]
    fn test_stat_from_heartbeat_requires_header_and_peer() {
        let request = HeartbeatRequest {
            header: Some(RequestHeader::default()),
            ..Default::default()
        };
        assert_eq!(
            Stat::try_from(&request).unwrap_err(),
            Some(RequestHeader::default())
        );

        let request = HeartbeatRequest::default();
        assert_eq!(Stat::try_from(&request).unwrap_err(), None);
    }

    #[test]
    fn test_stat_from_heartbeat_rejects_invalid_gc_stat() {
        let mut extensions = HashMap::new();
        extensions.insert(GcStat::GC_STAT_KEY.to_string(), b"not json".to_vec());
        let request = HeartbeatRequest {
            header: Some(RequestHeader::default()),
            peer: Some(api::v1::meta::Peer {
                id: 1,
                addr: "127.0.0.1:3001".to_string(),
            }),
            extensions,
            ..Default::default()
        };

        let err = Stat::try_from(&request).unwrap_err();
        assert_eq!(err, Some(RequestHeader::default()));
    }

    #[test]
    fn test_gc_stat_extensions_round_trip() {
        let mut extensions = HashMap::new();
        assert!(GcStat::from_extensions(&extensions).unwrap().is_none());

        GcStat::new(2, 4).into_extensions(&mut extensions);
        let gc_stat = GcStat::from_extensions(&extensions).unwrap().unwrap();
        assert_eq!(gc_stat.running_gc_tasks, 2);
        assert_eq!(gc_stat.gc_concurrency, 4);

        extensions.insert(GcStat::GC_STAT_KEY.to_string(), b"not json".to_vec());
        assert!(GcStat::from_extensions(&extensions).is_err());
    }

    #[test]
    fn test_env_vars_round_trip() {
        let mut vars = HashMap::new();
        vars.insert("AZ".to_string(), "us-east-1a".to_string());
        vars.insert("REGION".to_string(), "us-east-1".to_string());
        let env_vars = EnvVars::new(vars);

        let mut extensions = HashMap::new();
        env_vars.into_extensions(&mut extensions);

        let extracted = EnvVars::from_extensions(&extensions).unwrap().unwrap();
        assert_eq!(extracted.vars.get("AZ").unwrap(), "us-east-1a");
        assert_eq!(extracted.vars.get("REGION").unwrap(), "us-east-1");
    }

    #[test]
    fn test_env_vars_empty_not_written() {
        let env_vars = EnvVars::default();
        let mut extensions = HashMap::new();
        env_vars.into_extensions(&mut extensions);
        assert!(extensions.is_empty());
    }

    #[test]
    fn test_env_vars_from_extensions_missing() {
        let extensions = HashMap::new();
        let result = EnvVars::from_extensions(&extensions).unwrap();
        assert!(result.is_none());
    }
}