common_meta/
datanode.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::str::FromStr;
17
18use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, RequestHeader};
19use common_time::util as time_util;
20use lazy_static::lazy_static;
21use regex::Regex;
22use serde::{Deserialize, Serialize};
23use snafu::{OptionExt, ResultExt, ensure};
24use store_api::region_engine::{RegionRole, RegionStatistic};
25use store_api::storage::RegionId;
26use table::metadata::TableId;
27
28use crate::error;
29use crate::error::Result;
30use crate::heartbeat::utils::get_datanode_workloads;
31
/// Prefix of the storage keys of datanode stats.
///
/// It is used both to build [`DATANODE_STAT_KEY_PATTERN`] and to encode a
/// `DatanodeStatKey` into bytes.
const DATANODE_STAT_PREFIX: &str = "__meta_datanode_stat";

/// Key of the `extensions` entry of a heartbeat region stat that carries the
/// serialized `RegionStatistic`.
pub const REGION_STATISTIC_KEY: &str = "__region_statistic";

// The patterns below are compiled once on first use. Every pattern is a constant
// string, so the `unwrap` calls can only fail if the pattern itself is malformed.
lazy_static! {
    // Matches `__meta_datanode_lease-{digits}-{digits}`.
    pub(crate) static ref DATANODE_LEASE_KEY_PATTERN: Regex =
        Regex::new("^__meta_datanode_lease-([0-9]+)-([0-9]+)$").unwrap();
    // Matches `__meta_datanode_stat-{digits}-{digits}`; the second capture group is
    // the node id parsed by `DatanodeStatKey::from_str`.
    static ref DATANODE_STAT_KEY_PATTERN: Regex =
        Regex::new(&format!("^{DATANODE_STAT_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap();
    // Matches `__meta_inactive_region-{digits}-{digits}-{digits}`.
    static ref INACTIVE_REGION_KEY_PATTERN: Regex =
        Regex::new("^__meta_inactive_region-([0-9]+)-([0-9]+)-([0-9]+)$").unwrap();
}
44
/// A statistics snapshot of a single datanode.
///
/// It is built from a `HeartbeatRequest` (see the `TryFrom<&HeartbeatRequest>` impl).
/// The storage key of a stat is returned by [`Stat::stat_key`]; once encoded, the key
/// has the format `__meta_datanode_stat-0-{node_id}`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Stat {
    /// The time, in milliseconds, at which this stat was built from a heartbeat.
    pub timestamp_millis: i64,
    /// The datanode Id.
    pub id: u64,
    /// The datanode address.
    pub addr: String,
    /// The read capacity units during this period
    pub rcus: i64,
    /// The write capacity units during this period
    pub wcus: i64,
    /// How many regions on this node
    pub region_num: u64,
    /// The region stats of the datanode.
    pub region_stats: Vec<RegionStat>,
    /// The topic stats of the datanode.
    pub topic_stats: Vec<TopicStat>,
    /// The node epoch is used to check whether the node has restarted or redeployed.
    pub node_epoch: u64,
    /// The datanode workloads.
    pub datanode_workloads: DatanodeWorkloads,
}
70
/// The statistics of a region.
///
/// Converted from the heartbeat's `api::v1::meta::RegionStat`; the fields from
/// `num_rows` onwards are taken from the `RegionStatistic` carried in the heartbeat
/// extensions (or its default value when that extension is missing or unreadable).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegionStat {
    /// The region_id.
    pub id: RegionId,
    /// The read capacity units during this period
    pub rcus: i64,
    /// The write capacity units during this period
    pub wcus: i64,
    /// Approximate disk bytes of this region, including sst, index, manifest and wal
    pub approximate_bytes: u64,
    /// The engine name.
    pub engine: String,
    /// The region role.
    pub role: RegionRole,
    /// The number of rows
    pub num_rows: u64,
    /// The size of the memtable in bytes.
    pub memtable_size: u64,
    /// The size of the manifest in bytes.
    pub manifest_size: u64,
    /// The size of the SST data files in bytes.
    pub sst_size: u64,
    /// The number of SST data files.
    pub sst_num: u64,
    /// The size of the SST index files in bytes.
    pub index_size: u64,
    /// The manifest info of the region.
    pub region_manifest: RegionManifestInfo,
    /// The total bytes written of the region since region opened.
    pub written_bytes: u64,
    /// The latest entry id of topic used by data.
    /// **Only used by remote WAL prune.**
    pub data_topic_latest_entry_id: u64,
    /// The latest entry id of topic used by metadata.
    /// **Only used by remote WAL prune.**
    /// In mito engine, this is the same as `data_topic_latest_entry_id`.
    pub metadata_topic_latest_entry_id: u64,
}
110
/// The statistics of a topic, as reported in a heartbeat.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicStat {
    /// The topic name.
    pub topic: String,
    /// The latest entry id of the topic.
    pub latest_entry_id: u64,
    /// The total size in bytes of records appended to the topic.
    pub record_size: u64,
    /// The total number of records appended to the topic.
    pub record_num: u64,
}
122
/// Trait for reporting statistics about topics.
///
/// Implementations must be `Send + Sync`.
pub trait TopicStatsReporter: Send + Sync {
    /// Returns a list of topic statistics that can be reported.
    ///
    /// Takes `&mut self`, so an implementation may update its own state while
    /// producing the list.
    fn reportable_topics(&mut self) -> Vec<TopicStat>;
}
128
/// The manifest info of a region.
///
/// The variants and their fields mirror `store_api::region_engine::RegionManifestInfo`,
/// from which this type is converted field by field.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum RegionManifestInfo {
    /// Manifest info with a single manifest version / flushed entry id pair.
    Mito {
        manifest_version: u64,
        flushed_entry_id: u64,
    },
    /// Manifest info with separate manifest version / flushed entry id pairs for
    /// data and for metadata.
    Metric {
        data_manifest_version: u64,
        data_flushed_entry_id: u64,
        metadata_manifest_version: u64,
        metadata_flushed_entry_id: u64,
    },
}
142
143impl Stat {
144    #[inline]
145    pub fn is_empty(&self) -> bool {
146        self.region_stats.is_empty()
147    }
148
149    pub fn stat_key(&self) -> DatanodeStatKey {
150        DatanodeStatKey { node_id: self.id }
151    }
152
153    /// Returns a tuple array containing [RegionId] and [RegionRole].
154    pub fn regions(&self) -> Vec<(RegionId, RegionRole)> {
155        self.region_stats.iter().map(|s| (s.id, s.role)).collect()
156    }
157
158    /// Returns all table ids in the region stats.
159    pub fn table_ids(&self) -> HashSet<TableId> {
160        self.region_stats.iter().map(|s| s.id.table_id()).collect()
161    }
162
163    /// Retains the active region stats and updates the rcus, wcus, and region_num.
164    pub fn retain_active_region_stats(&mut self, inactive_region_ids: &HashSet<RegionId>) {
165        if inactive_region_ids.is_empty() {
166            return;
167        }
168
169        self.region_stats
170            .retain(|r| !inactive_region_ids.contains(&r.id));
171        self.rcus = self.region_stats.iter().map(|s| s.rcus).sum();
172        self.wcus = self.region_stats.iter().map(|s| s.wcus).sum();
173        self.region_num = self.region_stats.len() as u64;
174    }
175
176    pub fn memory_size(&self) -> usize {
177        // timestamp_millis, rcus, wcus
178        std::mem::size_of::<i64>() * 3 +
179        // id, region_num, node_epoch
180        std::mem::size_of::<u64>() * 3 +
181        // addr
182        std::mem::size_of::<String>() + self.addr.capacity() +
183        // region_stats
184        self.region_stats.iter().map(|s| s.memory_size()).sum::<usize>()
185    }
186}
187
188impl RegionStat {
189    pub fn memory_size(&self) -> usize {
190        // role
191        std::mem::size_of::<RegionRole>() +
192        // id
193        std::mem::size_of::<RegionId>() +
194        // rcus, wcus, approximate_bytes, num_rows
195        std::mem::size_of::<i64>() * 4 +
196        // memtable_size, manifest_size, sst_size, sst_num, index_size
197        std::mem::size_of::<u64>() * 5 +
198        // engine
199        std::mem::size_of::<String>() + self.engine.capacity() +
200        // region_manifest
201        self.region_manifest.memory_size()
202    }
203}
204
205impl RegionManifestInfo {
206    pub fn memory_size(&self) -> usize {
207        match self {
208            RegionManifestInfo::Mito { .. } => std::mem::size_of::<u64>() * 2,
209            RegionManifestInfo::Metric { .. } => std::mem::size_of::<u64>() * 4,
210        }
211    }
212}
213
214impl TryFrom<&HeartbeatRequest> for Stat {
215    type Error = Option<RequestHeader>;
216
217    fn try_from(value: &HeartbeatRequest) -> std::result::Result<Self, Self::Error> {
218        let HeartbeatRequest {
219            header,
220            peer,
221            region_stats,
222            node_epoch,
223            node_workloads,
224            topic_stats,
225            ..
226        } = value;
227
228        match (header, peer) {
229            (Some(_header), Some(peer)) => {
230                let region_stats = region_stats
231                    .iter()
232                    .map(RegionStat::from)
233                    .collect::<Vec<_>>();
234                let topic_stats = topic_stats.iter().map(TopicStat::from).collect::<Vec<_>>();
235
236                let datanode_workloads = get_datanode_workloads(node_workloads.as_ref());
237                Ok(Self {
238                    timestamp_millis: time_util::current_time_millis(),
239                    // datanode id
240                    id: peer.id,
241                    // datanode address
242                    addr: peer.addr.clone(),
243                    rcus: region_stats.iter().map(|s| s.rcus).sum(),
244                    wcus: region_stats.iter().map(|s| s.wcus).sum(),
245                    region_num: region_stats.len() as u64,
246                    region_stats,
247                    topic_stats,
248                    node_epoch: *node_epoch,
249                    datanode_workloads,
250                })
251            }
252            (header, _) => Err(header.clone()),
253        }
254    }
255}
256
257impl From<store_api::region_engine::RegionManifestInfo> for RegionManifestInfo {
258    fn from(value: store_api::region_engine::RegionManifestInfo) -> Self {
259        match value {
260            store_api::region_engine::RegionManifestInfo::Mito {
261                manifest_version,
262                flushed_entry_id,
263            } => RegionManifestInfo::Mito {
264                manifest_version,
265                flushed_entry_id,
266            },
267            store_api::region_engine::RegionManifestInfo::Metric {
268                data_manifest_version,
269                data_flushed_entry_id,
270                metadata_manifest_version,
271                metadata_flushed_entry_id,
272            } => RegionManifestInfo::Metric {
273                data_manifest_version,
274                data_flushed_entry_id,
275                metadata_manifest_version,
276                metadata_flushed_entry_id,
277            },
278        }
279    }
280}
281
282impl From<&api::v1::meta::RegionStat> for RegionStat {
283    fn from(value: &api::v1::meta::RegionStat) -> Self {
284        let region_stat = value
285            .extensions
286            .get(REGION_STATISTIC_KEY)
287            .and_then(|value| RegionStatistic::deserialize_from_slice(value))
288            .unwrap_or_default();
289
290        Self {
291            id: RegionId::from_u64(value.region_id),
292            rcus: value.rcus,
293            wcus: value.wcus,
294            approximate_bytes: value.approximate_bytes as u64,
295            engine: value.engine.clone(),
296            role: RegionRole::from(value.role()),
297            num_rows: region_stat.num_rows,
298            memtable_size: region_stat.memtable_size,
299            manifest_size: region_stat.manifest_size,
300            sst_size: region_stat.sst_size,
301            sst_num: region_stat.sst_num,
302            index_size: region_stat.index_size,
303            region_manifest: region_stat.manifest.into(),
304            written_bytes: region_stat.written_bytes,
305            data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
306            metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
307        }
308    }
309}
310
311impl From<&api::v1::meta::TopicStat> for TopicStat {
312    fn from(value: &api::v1::meta::TopicStat) -> Self {
313        Self {
314            topic: value.topic_name.clone(),
315            latest_entry_id: value.latest_entry_id,
316            record_size: value.record_size,
317            record_num: value.record_num,
318        }
319    }
320}
321
/// The key of the datanode stat in the memory store.
///
/// The format is `__meta_datanode_stat-0-{node_id}`.
///
/// The key can be encoded into bytes with `Vec::<u8>::from` and decoded again with
/// `FromStr` or `TryFrom<Vec<u8>>`.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub struct DatanodeStatKey {
    /// The id of the datanode the stat belongs to.
    pub node_id: u64,
}
329
330impl DatanodeStatKey {
331    /// The key prefix.
332    pub fn prefix_key() -> Vec<u8> {
333        // todo(hl): remove cluster id in prefix
334        format!("{DATANODE_STAT_PREFIX}-0-").into_bytes()
335    }
336}
337
338impl From<DatanodeStatKey> for Vec<u8> {
339    fn from(value: DatanodeStatKey) -> Self {
340        // todo(hl): remove cluster id in prefix
341        format!("{}-0-{}", DATANODE_STAT_PREFIX, value.node_id).into_bytes()
342    }
343}
344
345impl FromStr for DatanodeStatKey {
346    type Err = error::Error;
347
348    fn from_str(key: &str) -> Result<Self> {
349        let caps = DATANODE_STAT_KEY_PATTERN
350            .captures(key)
351            .context(error::InvalidStatKeySnafu { key })?;
352
353        ensure!(caps.len() == 3, error::InvalidStatKeySnafu { key });
354        let node_id = caps[2].to_string();
355        let node_id: u64 = node_id.parse().context(error::ParseNumSnafu {
356            err_msg: format!("invalid node_id: {node_id}"),
357        })?;
358
359        Ok(Self { node_id })
360    }
361}
362
363impl TryFrom<Vec<u8>> for DatanodeStatKey {
364    type Error = error::Error;
365
366    fn try_from(bytes: Vec<u8>) -> Result<Self> {
367        String::from_utf8(bytes)
368            .context(error::FromUtf8Snafu {
369                name: "DatanodeStatKey",
370            })
371            .map(|x| x.parse())?
372    }
373}
374
/// The value of the datanode stat in the memory store.
///
/// Because of `#[serde(transparent)]`, the value is (de)serialized exactly like its
/// `stats` field, without a wrapping object.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(transparent)]
pub struct DatanodeStatValue {
    /// The collected stats; the accessors of this type treat the last element as the
    /// latest one.
    pub stats: Vec<Stat>,
}
381
382impl DatanodeStatValue {
383    /// Get the latest number of regions.
384    pub fn region_num(&self) -> Option<u64> {
385        self.stats.last().map(|x| x.region_num)
386    }
387
388    /// Get the latest node addr.
389    pub fn node_addr(&self) -> Option<String> {
390        self.stats.last().map(|x| x.addr.clone())
391    }
392}
393
394impl TryFrom<DatanodeStatValue> for Vec<u8> {
395    type Error = error::Error;
396
397    fn try_from(stats: DatanodeStatValue) -> Result<Self> {
398        Ok(serde_json::to_string(&stats)
399            .context(error::SerializeToJsonSnafu {
400                input: format!("{stats:?}"),
401            })?
402            .into_bytes())
403    }
404}
405
406impl FromStr for DatanodeStatValue {
407    type Err = error::Error;
408
409    fn from_str(value: &str) -> Result<Self> {
410        serde_json::from_str(value).context(error::DeserializeFromJsonSnafu { input: value })
411    }
412}
413
414impl TryFrom<Vec<u8>> for DatanodeStatValue {
415    type Error = error::Error;
416
417    fn try_from(value: Vec<u8>) -> Result<Self> {
418        String::from_utf8(value)
419            .context(error::FromUtf8Snafu {
420                name: "DatanodeStatValue",
421            })
422            .map(|x| x.parse())?
423    }
424}
425
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_stat_key() {
        let stat = Stat {
            id: 101,
            region_num: 10,
            ..Default::default()
        };

        let stat_key = stat.stat_key();

        assert_eq!(101, stat_key.node_id);
    }

    /// A key must survive the bytes round trip and start with the shared prefix.
    #[test]
    fn test_stat_key_round_trip() {
        let key = DatanodeStatKey { node_id: 101 };

        let bytes: Vec<u8> = key.into();
        assert_eq!(b"__meta_datanode_stat-0-101".to_vec(), bytes);
        assert!(bytes.starts_with(&DatanodeStatKey::prefix_key()));

        let parsed = DatanodeStatKey::try_from(bytes).unwrap();
        assert_eq!(key, parsed);
    }

    /// Malformed keys must be rejected instead of panicking.
    #[test]
    fn test_invalid_stat_key() {
        // Not a stat key at all.
        assert!("invalid".parse::<DatanodeStatKey>().is_err());
        // Missing node id.
        assert!("__meta_datanode_stat-0-".parse::<DatanodeStatKey>().is_err());
        // Non-numeric node id.
        assert!("__meta_datanode_stat-0-abc".parse::<DatanodeStatKey>().is_err());
        // Node id that does not fit into a u64.
        assert!(
            "__meta_datanode_stat-0-18446744073709551616"
                .parse::<DatanodeStatKey>()
                .is_err()
        );
        // Bytes that are not valid UTF-8.
        assert!(DatanodeStatKey::try_from(vec![0xff, 0xfe]).is_err());
    }

    #[test]
    fn test_stat_val_round_trip() {
        let stat = Stat {
            id: 101,
            region_num: 100,
            ..Default::default()
        };

        let stat_val = DatanodeStatValue { stats: vec![stat] };

        let bytes: Vec<u8> = stat_val.try_into().unwrap();
        let stat_val: DatanodeStatValue = bytes.try_into().unwrap();
        let stats = stat_val.stats;

        assert_eq!(1, stats.len());

        let stat = stats.first().unwrap();
        assert_eq!(101, stat.id);
        assert_eq!(100, stat.region_num);
    }

    #[test]
    fn test_get_addr_from_stat_val() {
        let empty = DatanodeStatValue { stats: vec![] };
        let addr = empty.node_addr();
        assert!(addr.is_none());

        let stat_val = DatanodeStatValue {
            stats: vec![
                Stat {
                    addr: "1".to_string(),
                    ..Default::default()
                },
                Stat {
                    addr: "2".to_string(),
                    ..Default::default()
                },
                Stat {
                    addr: "3".to_string(),
                    ..Default::default()
                },
            ],
        };
        // The address of the last (latest) stat wins.
        let addr = stat_val.node_addr().unwrap();
        assert_eq!("3", addr);
    }

    #[test]
    fn test_get_region_num_from_stat_val() {
        let empty = DatanodeStatValue { stats: vec![] };
        let region_num = empty.region_num();
        assert!(region_num.is_none());

        // A single stat with zero regions still yields `Some(0)`.
        let single = DatanodeStatValue {
            stats: vec![Stat {
                region_num: 0,
                ..Default::default()
            }],
        };
        let region_num = single.region_num();
        assert_eq!(Some(0), region_num);

        let stat_val = DatanodeStatValue {
            stats: vec![
                Stat {
                    region_num: 1,
                    ..Default::default()
                },
                Stat {
                    region_num: 0,
                    ..Default::default()
                },
                Stat {
                    region_num: 2,
                    ..Default::default()
                },
            ],
        };
        // The region number of the last (latest) stat wins.
        let region_num = stat_val.region_num().unwrap();
        assert_eq!(2, region_num);
    }
}