common_meta/
datanode.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::str::FromStr;
17
18use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, RequestHeader};
19use common_time::util as time_util;
20use lazy_static::lazy_static;
21use regex::Regex;
22use serde::{Deserialize, Serialize};
23use snafu::{OptionExt, ResultExt, ensure};
24use store_api::region_engine::{RegionRole, RegionStatistic};
25use store_api::storage::RegionId;
26use table::metadata::TableId;
27
28use crate::error::{self, DeserializeFromJsonSnafu, Result};
29use crate::heartbeat::utils::get_datanode_workloads;
30
/// Prefix of the keys under which datanode stats are stored (see [DatanodeStatKey]).
const DATANODE_STAT_PREFIX: &str = "__meta_datanode_stat";

/// Key in a protobuf region stat's `extensions` map under which a serialized
/// [RegionStatistic] is attached.
pub const REGION_STATISTIC_KEY: &str = "__region_statistic";

lazy_static! {
    /// Matches datanode lease keys of the form `__meta_datanode_lease-{n}-{m}`,
    /// capturing both numeric components.
    pub(crate) static ref DATANODE_LEASE_KEY_PATTERN: Regex =
        Regex::new("^__meta_datanode_lease-([0-9]+)-([0-9]+)$").unwrap();
    /// Matches datanode stat keys of the form
    /// `__meta_datanode_stat-{cluster_id}-{node_id}`; capture group 2 is the node id.
    static ref DATANODE_STAT_KEY_PATTERN: Regex =
        Regex::new(&format!("^{DATANODE_STAT_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap();
    /// Matches inactive region keys of the form `__meta_inactive_region-{a}-{b}-{c}`,
    /// capturing all three numeric components.
    static ref INACTIVE_REGION_KEY_PATTERN: Regex =
        Regex::new("^__meta_inactive_region-([0-9]+)-([0-9]+)-([0-9]+)$").unwrap();
}
43
/// A snapshot of a datanode's statistics, built from a [HeartbeatRequest].
///
/// It is stored under the key returned by [`Stat::stat_key`], whose encoded
/// form is `__meta_datanode_stat-0-{node_id}`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Stat {
    /// The timestamp of this stat in milliseconds; set to the current time when
    /// the stat is built from a heartbeat.
    pub timestamp_millis: i64,
    /// The datanode id.
    pub id: u64,
    /// The datanode address.
    pub addr: String,
    /// The read capacity units during this period. When built from a heartbeat,
    /// this is the sum of the `rcus` of `region_stats`.
    pub rcus: i64,
    /// The write capacity units during this period. When built from a heartbeat,
    /// this is the sum of the `wcus` of `region_stats`.
    pub wcus: i64,
    /// How many regions on this node (the length of `region_stats` when built
    /// from a heartbeat).
    pub region_num: u64,
    /// The region stats of the datanode.
    pub region_stats: Vec<RegionStat>,
    /// The topic stats of the datanode.
    pub topic_stats: Vec<TopicStat>,
    /// The node epoch, used to check whether the node has restarted or been redeployed.
    pub node_epoch: u64,
    /// The datanode workloads.
    pub datanode_workloads: DatanodeWorkloads,
    /// The GC statistics of the datanode, or `None` if the heartbeat carried none.
    pub gc_stat: Option<GcStat>,
}
71
/// The statistics of a region.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RegionStat {
    /// The region id.
    pub id: RegionId,
    /// The read capacity units during this period
    pub rcus: i64,
    /// The write capacity units during this period
    pub wcus: i64,
    /// Approximate disk bytes of this region, including sst, index, manifest and wal
    pub approximate_bytes: u64,
    /// The engine name.
    pub engine: String,
    /// The region role.
    pub role: RegionRole,
    /// The number of rows
    pub num_rows: u64,
    /// The size of the memtable in bytes.
    pub memtable_size: u64,
    /// The size of the manifest in bytes.
    pub manifest_size: u64,
    /// The size of the SST data files in bytes.
    pub sst_size: u64,
    /// The number of SST data files.
    pub sst_num: u64,
    /// The size of the SST index files in bytes.
    pub index_size: u64,
    /// The manifest info of the region.
    pub region_manifest: RegionManifestInfo,
    /// The total bytes written to the region since it was opened.
    pub written_bytes: u64,
    /// The latest entry id of topic used by data.
    /// **Only used by remote WAL prune.**
    pub data_topic_latest_entry_id: u64,
    /// The latest entry id of topic used by metadata.
    /// **Only used by remote WAL prune.**
    /// In mito engine, this is the same as `data_topic_latest_entry_id`.
    pub metadata_topic_latest_entry_id: u64,
}
111
/// The statistics of a topic, as reported in a datanode heartbeat.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicStat {
    /// The topic name.
    pub topic: String,
    /// The latest entry id of the topic.
    pub latest_entry_id: u64,
    /// The total size in bytes of records appended to the topic.
    pub record_size: u64,
    /// The total number of records appended to the topic.
    pub record_num: u64,
}
123
/// Trait for reporting statistics about topics.
pub trait TopicStatsReporter: Send + Sync {
    /// Returns a list of topic statistics that can be reported.
    ///
    /// Takes `&mut self`, so implementations may update internal state while
    /// producing the report.
    fn reportable_topics(&mut self) -> Vec<TopicStat>;
}
129
/// Manifest information of a region, one variant per engine kind.
///
/// Mirrors `store_api::region_engine::RegionManifestInfo` field by field (see the
/// `From` conversion below).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum RegionManifestInfo {
    /// Manifest info of a mito region.
    Mito {
        manifest_version: u64,
        flushed_entry_id: u64,
    },
    /// Manifest info of a metric region, with separate manifest versions and
    /// flushed entry ids for its data and metadata parts.
    Metric {
        data_manifest_version: u64,
        data_flushed_entry_id: u64,
        metadata_manifest_version: u64,
        metadata_flushed_entry_id: u64,
    },
}
143
144impl Stat {
145    #[inline]
146    pub fn is_empty(&self) -> bool {
147        self.region_stats.is_empty()
148    }
149
150    pub fn stat_key(&self) -> DatanodeStatKey {
151        DatanodeStatKey { node_id: self.id }
152    }
153
154    /// Returns a tuple array containing [RegionId] and [RegionRole].
155    pub fn regions(&self) -> Vec<(RegionId, RegionRole)> {
156        self.region_stats.iter().map(|s| (s.id, s.role)).collect()
157    }
158
159    /// Returns all table ids in the region stats.
160    pub fn table_ids(&self) -> HashSet<TableId> {
161        self.region_stats.iter().map(|s| s.id.table_id()).collect()
162    }
163
164    /// Retains the active region stats and updates the rcus, wcus, and region_num.
165    pub fn retain_active_region_stats(&mut self, inactive_region_ids: &HashSet<RegionId>) {
166        if inactive_region_ids.is_empty() {
167            return;
168        }
169
170        self.region_stats
171            .retain(|r| !inactive_region_ids.contains(&r.id));
172        self.rcus = self.region_stats.iter().map(|s| s.rcus).sum();
173        self.wcus = self.region_stats.iter().map(|s| s.wcus).sum();
174        self.region_num = self.region_stats.len() as u64;
175    }
176
177    pub fn memory_size(&self) -> usize {
178        // timestamp_millis, rcus, wcus
179        std::mem::size_of::<i64>() * 3 +
180        // id, region_num, node_epoch
181        std::mem::size_of::<u64>() * 3 +
182        // addr
183        std::mem::size_of::<String>() + self.addr.capacity() +
184        // region_stats
185        self.region_stats.iter().map(|s| s.memory_size()).sum::<usize>()
186    }
187}
188
189impl RegionStat {
190    pub fn memory_size(&self) -> usize {
191        // role
192        std::mem::size_of::<RegionRole>() +
193        // id
194        std::mem::size_of::<RegionId>() +
195        // rcus, wcus, approximate_bytes, num_rows
196        std::mem::size_of::<i64>() * 4 +
197        // memtable_size, manifest_size, sst_size, sst_num, index_size
198        std::mem::size_of::<u64>() * 5 +
199        // engine
200        std::mem::size_of::<String>() + self.engine.capacity() +
201        // region_manifest
202        self.region_manifest.memory_size()
203    }
204}
205
206impl RegionManifestInfo {
207    pub fn memory_size(&self) -> usize {
208        match self {
209            RegionManifestInfo::Mito { .. } => std::mem::size_of::<u64>() * 2,
210            RegionManifestInfo::Metric { .. } => std::mem::size_of::<u64>() * 4,
211        }
212    }
213}
214
215impl TryFrom<&HeartbeatRequest> for Stat {
216    type Error = Option<RequestHeader>;
217
218    fn try_from(value: &HeartbeatRequest) -> std::result::Result<Self, Self::Error> {
219        let HeartbeatRequest {
220            header,
221            peer,
222            region_stats,
223            node_epoch,
224            node_workloads,
225            topic_stats,
226            extensions,
227            ..
228        } = value;
229
230        match (header, peer) {
231            (Some(header), Some(peer)) => {
232                let region_stats = region_stats
233                    .iter()
234                    .map(RegionStat::from)
235                    .collect::<Vec<_>>();
236                let topic_stats = topic_stats.iter().map(TopicStat::from).collect::<Vec<_>>();
237
238                let datanode_workloads = get_datanode_workloads(node_workloads.as_ref());
239
240                let gc_stat = GcStat::from_extensions(extensions).map_err(|err| {
241                    common_telemetry::error!(
242                        "Failed to deserialize GcStat from extensions: {}",
243                        err
244                    );
245                    header.clone()
246                })?;
247                Ok(Self {
248                    timestamp_millis: time_util::current_time_millis(),
249                    // datanode id
250                    id: peer.id,
251                    // datanode address
252                    addr: peer.addr.clone(),
253                    rcus: region_stats.iter().map(|s| s.rcus).sum(),
254                    wcus: region_stats.iter().map(|s| s.wcus).sum(),
255                    region_num: region_stats.len() as u64,
256                    region_stats,
257                    topic_stats,
258                    node_epoch: *node_epoch,
259                    datanode_workloads,
260                    gc_stat,
261                })
262            }
263            (header, _) => Err(header.clone()),
264        }
265    }
266}
267
268impl From<store_api::region_engine::RegionManifestInfo> for RegionManifestInfo {
269    fn from(value: store_api::region_engine::RegionManifestInfo) -> Self {
270        match value {
271            store_api::region_engine::RegionManifestInfo::Mito {
272                manifest_version,
273                flushed_entry_id,
274            } => RegionManifestInfo::Mito {
275                manifest_version,
276                flushed_entry_id,
277            },
278            store_api::region_engine::RegionManifestInfo::Metric {
279                data_manifest_version,
280                data_flushed_entry_id,
281                metadata_manifest_version,
282                metadata_flushed_entry_id,
283            } => RegionManifestInfo::Metric {
284                data_manifest_version,
285                data_flushed_entry_id,
286                metadata_manifest_version,
287                metadata_flushed_entry_id,
288            },
289        }
290    }
291}
292
293impl From<&api::v1::meta::RegionStat> for RegionStat {
294    fn from(value: &api::v1::meta::RegionStat) -> Self {
295        let region_stat = value
296            .extensions
297            .get(REGION_STATISTIC_KEY)
298            .and_then(|value| RegionStatistic::deserialize_from_slice(value))
299            .unwrap_or_default();
300
301        Self {
302            id: RegionId::from_u64(value.region_id),
303            rcus: value.rcus,
304            wcus: value.wcus,
305            approximate_bytes: value.approximate_bytes as u64,
306            engine: value.engine.clone(),
307            role: RegionRole::from(value.role()),
308            num_rows: region_stat.num_rows,
309            memtable_size: region_stat.memtable_size,
310            manifest_size: region_stat.manifest_size,
311            sst_size: region_stat.sst_size,
312            sst_num: region_stat.sst_num,
313            index_size: region_stat.index_size,
314            region_manifest: region_stat.manifest.into(),
315            written_bytes: region_stat.written_bytes,
316            data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
317            metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
318        }
319    }
320}
321
322impl From<&api::v1::meta::TopicStat> for TopicStat {
323    fn from(value: &api::v1::meta::TopicStat) -> Self {
324        Self {
325            topic: value.topic_name.clone(),
326            latest_entry_id: value.latest_entry_id,
327            record_size: value.record_size,
328            record_num: value.record_num,
329        }
330    }
331}
332
/// GC statistics of a datanode, carried as JSON in heartbeat extensions under
/// [`GcStat::GC_STAT_KEY`].
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct GcStat {
    /// Number of GC tasks currently running on the datanode.
    pub running_gc_tasks: u32,
    /// The maximum number of concurrent GC tasks the datanode can handle.
    pub gc_concurrency: u32,
}
340
341impl GcStat {
342    pub const GC_STAT_KEY: &str = "__gc_stat";
343
344    pub fn new(running_gc_tasks: u32, gc_concurrency: u32) -> Self {
345        Self {
346            running_gc_tasks,
347            gc_concurrency,
348        }
349    }
350
351    pub fn into_extensions(&self, extensions: &mut std::collections::HashMap<String, Vec<u8>>) {
352        let bytes = serde_json::to_vec(self).unwrap_or_default();
353        extensions.insert(Self::GC_STAT_KEY.to_string(), bytes);
354    }
355
356    pub fn from_extensions(
357        extensions: &std::collections::HashMap<String, Vec<u8>>,
358    ) -> Result<Option<Self>> {
359        extensions
360            .get(Self::GC_STAT_KEY)
361            .map(|bytes| {
362                serde_json::from_slice(bytes).with_context(|_| DeserializeFromJsonSnafu {
363                    input: String::from_utf8_lossy(bytes).to_string(),
364                })
365            })
366            .transpose()
367    }
368}
369
/// The key of the datanode stat in the memory store.
///
/// The format is `__meta_datanode_stat-0-{node_id}`. It is encoded via
/// `From<DatanodeStatKey> for Vec<u8>` and decoded via `FromStr` or
/// `TryFrom<Vec<u8>>`.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub struct DatanodeStatKey {
    /// The datanode id.
    pub node_id: u64,
}
377
378impl DatanodeStatKey {
379    /// The key prefix.
380    pub fn prefix_key() -> Vec<u8> {
381        // todo(hl): remove cluster id in prefix
382        format!("{DATANODE_STAT_PREFIX}-0-").into_bytes()
383    }
384}
385
386impl From<DatanodeStatKey> for Vec<u8> {
387    fn from(value: DatanodeStatKey) -> Self {
388        // todo(hl): remove cluster id in prefix
389        format!("{}-0-{}", DATANODE_STAT_PREFIX, value.node_id).into_bytes()
390    }
391}
392
393impl FromStr for DatanodeStatKey {
394    type Err = error::Error;
395
396    fn from_str(key: &str) -> Result<Self> {
397        let caps = DATANODE_STAT_KEY_PATTERN
398            .captures(key)
399            .context(error::InvalidStatKeySnafu { key })?;
400
401        ensure!(caps.len() == 3, error::InvalidStatKeySnafu { key });
402        let node_id = caps[2].to_string();
403        let node_id: u64 = node_id.parse().context(error::ParseNumSnafu {
404            err_msg: format!("invalid node_id: {node_id}"),
405        })?;
406
407        Ok(Self { node_id })
408    }
409}
410
411impl TryFrom<Vec<u8>> for DatanodeStatKey {
412    type Error = error::Error;
413
414    fn try_from(bytes: Vec<u8>) -> Result<Self> {
415        String::from_utf8(bytes)
416            .context(error::FromUtf8Snafu {
417                name: "DatanodeStatKey",
418            })
419            .map(|x| x.parse())?
420    }
421}
422
/// The value of the datanode stat in the memory store.
///
/// Serialized transparently as a JSON array of [Stat]s. The last element is
/// treated as the latest stat.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(transparent)]
pub struct DatanodeStatValue {
    /// The recorded stats, oldest first.
    pub stats: Vec<Stat>,
}
429
430impl DatanodeStatValue {
431    /// Get the latest number of regions.
432    pub fn region_num(&self) -> Option<u64> {
433        self.stats.last().map(|x| x.region_num)
434    }
435
436    /// Get the latest node addr.
437    pub fn node_addr(&self) -> Option<String> {
438        self.stats.last().map(|x| x.addr.clone())
439    }
440}
441
442impl TryFrom<DatanodeStatValue> for Vec<u8> {
443    type Error = error::Error;
444
445    fn try_from(stats: DatanodeStatValue) -> Result<Self> {
446        Ok(serde_json::to_string(&stats)
447            .context(error::SerializeToJsonSnafu {
448                input: format!("{stats:?}"),
449            })?
450            .into_bytes())
451    }
452}
453
454impl FromStr for DatanodeStatValue {
455    type Err = error::Error;
456
457    fn from_str(value: &str) -> Result<Self> {
458        serde_json::from_str(value).context(error::DeserializeFromJsonSnafu { input: value })
459    }
460}
461
462impl TryFrom<Vec<u8>> for DatanodeStatValue {
463    type Error = error::Error;
464
465    fn try_from(value: Vec<u8>) -> Result<Self> {
466        String::from_utf8(value)
467            .context(error::FromUtf8Snafu {
468                name: "DatanodeStatValue",
469            })
470            .map(|x| x.parse())?
471    }
472}
473
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_stat_key() {
        let stat = Stat {
            id: 101,
            region_num: 10,
            ..Default::default()
        };

        assert_eq!(101, stat.stat_key().node_id);
    }

    #[test]
    fn test_stat_val_round_trip() {
        let original = DatanodeStatValue {
            stats: vec![Stat {
                id: 101,
                region_num: 100,
                ..Default::default()
            }],
        };

        let encoded: Vec<u8> = original.try_into().unwrap();
        let decoded = DatanodeStatValue::try_from(encoded).unwrap();

        assert_eq!(1, decoded.stats.len());
        let stat = &decoded.stats[0];
        assert_eq!(101, stat.id);
        assert_eq!(100, stat.region_num);
    }

    #[test]
    fn test_get_addr_from_stat_val() {
        let empty = DatanodeStatValue { stats: vec![] };
        assert!(empty.node_addr().is_none());

        let stat_val = DatanodeStatValue {
            stats: ["1", "2", "3"]
                .iter()
                .map(|addr| Stat {
                    addr: addr.to_string(),
                    ..Default::default()
                })
                .collect(),
        };
        assert_eq!(Some("3".to_string()), stat_val.node_addr());
    }

    #[test]
    fn test_get_region_num_from_stat_val() {
        let empty = DatanodeStatValue { stats: vec![] };
        assert!(empty.region_num().is_none());

        // A latest stat with zero regions must still be reported as `Some(0)`.
        let single_zero = DatanodeStatValue {
            stats: vec![Stat {
                region_num: 0,
                ..Default::default()
            }],
        };
        assert_eq!(Some(0), single_zero.region_num());

        let stat_val = DatanodeStatValue {
            stats: [1u64, 0, 2]
                .iter()
                .map(|&region_num| Stat {
                    region_num,
                    ..Default::default()
                })
                .collect(),
        };
        assert_eq!(Some(2), stat_val.region_num());
    }
}