common_meta/
datanode.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::str::FromStr;
17
18use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, RequestHeader};
19use common_time::util as time_util;
20use lazy_static::lazy_static;
21use regex::Regex;
22use serde::{Deserialize, Serialize};
23use snafu::{ensure, OptionExt, ResultExt};
24use store_api::region_engine::{RegionRole, RegionStatistic};
25use store_api::storage::RegionId;
26use table::metadata::TableId;
27
28use crate::error;
29use crate::error::Result;
30use crate::heartbeat::utils::get_datanode_workloads;
31
/// Key prefix of datanode leases.
pub(crate) const DATANODE_LEASE_PREFIX: &str = "__meta_datanode_lease";
/// Key prefix of datanode stats; see [`DatanodeStatKey`] for the full key format.
const DATANODE_STAT_PREFIX: &str = "__meta_datanode_stat";
/// Key prefix of inactive regions.
const INACTIVE_REGION_PREFIX: &str = "__meta_inactive_region";

/// Key of the per-region heartbeat stat extension that carries a serialized
/// `RegionStatistic`.
pub const REGION_STATISTIC_KEY: &str = "__region_statistic";
38
39lazy_static! {
40    pub(crate) static ref DATANODE_LEASE_KEY_PATTERN: Regex =
41        Regex::new(&format!("^{DATANODE_LEASE_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap();
42    static ref DATANODE_STAT_KEY_PATTERN: Regex =
43        Regex::new(&format!("^{DATANODE_STAT_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap();
44    static ref INACTIVE_REGION_KEY_PATTERN: Regex = Regex::new(&format!(
45        "^{INACTIVE_REGION_PREFIX}-([0-9]+)-([0-9]+)-([0-9]+)$"
46    ))
47    .unwrap();
48}
49
/// A snapshot of one datanode's statistics, typically built from a single heartbeat
/// (see the `TryFrom<&HeartbeatRequest>` impl).
///
/// Stats are kept inside a [`DatanodeStatValue`] stored under the key returned by
/// [`Stat::stat_key`], whose format is `__meta_datanode_stat-0-{node_id}`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Stat {
    /// When this stat was built, in milliseconds; set from
    /// `time_util::current_time_millis()` when converted from a heartbeat.
    pub timestamp_millis: i64,
    /// The datanode Id.
    pub id: u64,
    /// The datanode address.
    pub addr: String,
    /// The read capacity units during this period (sum of the per-region `rcus`).
    pub rcus: i64,
    /// The write capacity units during this period (sum of the per-region `wcus`).
    pub wcus: i64,
    /// How many regions on this node
    pub region_num: u64,
    /// Per-region statistics reported by the datanode.
    pub region_stats: Vec<RegionStat>,
    /// The node epoch is used to check whether the node has restarted or redeployed.
    pub node_epoch: u64,
    /// The datanode workloads.
    pub datanode_workloads: DatanodeWorkloads,
}
72
/// The statistics of a region.
///
/// Built from the protobuf region stat in a datanode heartbeat; the detailed sizes,
/// manifest info and topic entry ids come from the [`REGION_STATISTIC_KEY`] extension.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegionStat {
    /// The region_id.
    pub id: RegionId,
    /// The read capacity units during this period
    pub rcus: i64,
    /// The write capacity units during this period
    pub wcus: i64,
    /// Approximate disk bytes of this region, including sst, index, manifest and wal
    pub approximate_bytes: u64,
    /// The engine name.
    pub engine: String,
    /// The region role.
    pub role: RegionRole,
    /// The number of rows
    pub num_rows: u64,
    /// The size of the memtable in bytes.
    pub memtable_size: u64,
    /// The size of the manifest in bytes.
    pub manifest_size: u64,
    /// The size of the SST data files in bytes.
    pub sst_size: u64,
    /// The size of the SST index files in bytes.
    pub index_size: u64,
    /// The manifest info of the region.
    pub region_manifest: RegionManifestInfo,
    /// The latest entry id of topic used by data.
    /// **Only used by remote WAL prune.**
    pub data_topic_latest_entry_id: u64,
    /// The latest entry id of topic used by metadata.
    /// **Only used by remote WAL prune.**
    /// In mito engine, this is the same as `data_topic_latest_entry_id`.
    pub metadata_topic_latest_entry_id: u64,
}
108
/// Manifest information of a region, one variant per engine kind.
///
/// Mirrors `store_api::region_engine::RegionManifestInfo` (see the `From` impl) and
/// additionally derives serde `Serialize`/`Deserialize`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum RegionManifestInfo {
    /// Manifest info reported for a mito engine region.
    Mito {
        manifest_version: u64,
        flushed_entry_id: u64,
    },
    /// Manifest info reported for a metric engine region; it carries separate manifest
    /// versions and flushed entry ids for its data and metadata parts.
    Metric {
        data_manifest_version: u64,
        data_flushed_entry_id: u64,
        metadata_manifest_version: u64,
        metadata_flushed_entry_id: u64,
    },
}
122
123impl Stat {
124    #[inline]
125    pub fn is_empty(&self) -> bool {
126        self.region_stats.is_empty()
127    }
128
129    pub fn stat_key(&self) -> DatanodeStatKey {
130        DatanodeStatKey { node_id: self.id }
131    }
132
133    /// Returns a tuple array containing [RegionId] and [RegionRole].
134    pub fn regions(&self) -> Vec<(RegionId, RegionRole)> {
135        self.region_stats.iter().map(|s| (s.id, s.role)).collect()
136    }
137
138    /// Returns all table ids in the region stats.
139    pub fn table_ids(&self) -> HashSet<TableId> {
140        self.region_stats.iter().map(|s| s.id.table_id()).collect()
141    }
142
143    /// Retains the active region stats and updates the rcus, wcus, and region_num.
144    pub fn retain_active_region_stats(&mut self, inactive_region_ids: &HashSet<RegionId>) {
145        if inactive_region_ids.is_empty() {
146            return;
147        }
148
149        self.region_stats
150            .retain(|r| !inactive_region_ids.contains(&r.id));
151        self.rcus = self.region_stats.iter().map(|s| s.rcus).sum();
152        self.wcus = self.region_stats.iter().map(|s| s.wcus).sum();
153        self.region_num = self.region_stats.len() as u64;
154    }
155
156    pub fn memory_size(&self) -> usize {
157        // timestamp_millis, rcus, wcus
158        std::mem::size_of::<i64>() * 3 +
159        // id, region_num, node_epoch
160        std::mem::size_of::<u64>() * 3 +
161        // addr
162        std::mem::size_of::<String>() + self.addr.capacity() +
163        // region_stats
164        self.region_stats.iter().map(|s| s.memory_size()).sum::<usize>()
165    }
166}
167
168impl RegionStat {
169    pub fn memory_size(&self) -> usize {
170        // role
171        std::mem::size_of::<RegionRole>() +
172        // id
173        std::mem::size_of::<RegionId>() +
174        // rcus, wcus, approximate_bytes, num_rows
175        std::mem::size_of::<i64>() * 4 +
176        // memtable_size, manifest_size, sst_size, index_size
177        std::mem::size_of::<u64>() * 4 +
178        // engine
179        std::mem::size_of::<String>() + self.engine.capacity() +
180        // region_manifest
181        self.region_manifest.memory_size()
182    }
183}
184
185impl RegionManifestInfo {
186    pub fn memory_size(&self) -> usize {
187        match self {
188            RegionManifestInfo::Mito { .. } => std::mem::size_of::<u64>() * 2,
189            RegionManifestInfo::Metric { .. } => std::mem::size_of::<u64>() * 4,
190        }
191    }
192}
193
194impl TryFrom<&HeartbeatRequest> for Stat {
195    type Error = Option<RequestHeader>;
196
197    fn try_from(value: &HeartbeatRequest) -> std::result::Result<Self, Self::Error> {
198        let HeartbeatRequest {
199            header,
200            peer,
201            region_stats,
202            node_epoch,
203            node_workloads,
204            ..
205        } = value;
206
207        match (header, peer) {
208            (Some(_header), Some(peer)) => {
209                let region_stats = region_stats
210                    .iter()
211                    .map(RegionStat::from)
212                    .collect::<Vec<_>>();
213
214                let datanode_workloads = get_datanode_workloads(node_workloads.as_ref());
215                Ok(Self {
216                    timestamp_millis: time_util::current_time_millis(),
217                    // datanode id
218                    id: peer.id,
219                    // datanode address
220                    addr: peer.addr.clone(),
221                    rcus: region_stats.iter().map(|s| s.rcus).sum(),
222                    wcus: region_stats.iter().map(|s| s.wcus).sum(),
223                    region_num: region_stats.len() as u64,
224                    region_stats,
225                    node_epoch: *node_epoch,
226                    datanode_workloads,
227                })
228            }
229            (header, _) => Err(header.clone()),
230        }
231    }
232}
233
234impl From<store_api::region_engine::RegionManifestInfo> for RegionManifestInfo {
235    fn from(value: store_api::region_engine::RegionManifestInfo) -> Self {
236        match value {
237            store_api::region_engine::RegionManifestInfo::Mito {
238                manifest_version,
239                flushed_entry_id,
240            } => RegionManifestInfo::Mito {
241                manifest_version,
242                flushed_entry_id,
243            },
244            store_api::region_engine::RegionManifestInfo::Metric {
245                data_manifest_version,
246                data_flushed_entry_id,
247                metadata_manifest_version,
248                metadata_flushed_entry_id,
249            } => RegionManifestInfo::Metric {
250                data_manifest_version,
251                data_flushed_entry_id,
252                metadata_manifest_version,
253                metadata_flushed_entry_id,
254            },
255        }
256    }
257}
258
259impl From<&api::v1::meta::RegionStat> for RegionStat {
260    fn from(value: &api::v1::meta::RegionStat) -> Self {
261        let region_stat = value
262            .extensions
263            .get(REGION_STATISTIC_KEY)
264            .and_then(|value| RegionStatistic::deserialize_from_slice(value))
265            .unwrap_or_default();
266
267        Self {
268            id: RegionId::from_u64(value.region_id),
269            rcus: value.rcus,
270            wcus: value.wcus,
271            approximate_bytes: value.approximate_bytes as u64,
272            engine: value.engine.to_string(),
273            role: RegionRole::from(value.role()),
274            num_rows: region_stat.num_rows,
275            memtable_size: region_stat.memtable_size,
276            manifest_size: region_stat.manifest_size,
277            sst_size: region_stat.sst_size,
278            index_size: region_stat.index_size,
279            region_manifest: region_stat.manifest.into(),
280            data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
281            metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
282        }
283    }
284}
285
/// The key of the datanode stat in the memory store.
///
/// The format is `__meta_datanode_stat-0-{node_id}`. The `0` segment is a legacy
/// cluster-id placeholder and is always written as `0`.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub struct DatanodeStatKey {
    /// The datanode id.
    pub node_id: u64,
}
293
294impl DatanodeStatKey {
295    /// The key prefix.
296    pub fn prefix_key() -> Vec<u8> {
297        // todo(hl): remove cluster id in prefix
298        format!("{DATANODE_STAT_PREFIX}-0-").into_bytes()
299    }
300}
301
302impl From<DatanodeStatKey> for Vec<u8> {
303    fn from(value: DatanodeStatKey) -> Self {
304        // todo(hl): remove cluster id in prefix
305        format!("{}-0-{}", DATANODE_STAT_PREFIX, value.node_id).into_bytes()
306    }
307}
308
309impl FromStr for DatanodeStatKey {
310    type Err = error::Error;
311
312    fn from_str(key: &str) -> Result<Self> {
313        let caps = DATANODE_STAT_KEY_PATTERN
314            .captures(key)
315            .context(error::InvalidStatKeySnafu { key })?;
316
317        ensure!(caps.len() == 3, error::InvalidStatKeySnafu { key });
318        let node_id = caps[2].to_string();
319        let node_id: u64 = node_id.parse().context(error::ParseNumSnafu {
320            err_msg: format!("invalid node_id: {node_id}"),
321        })?;
322
323        Ok(Self { node_id })
324    }
325}
326
327impl TryFrom<Vec<u8>> for DatanodeStatKey {
328    type Error = error::Error;
329
330    fn try_from(bytes: Vec<u8>) -> Result<Self> {
331        String::from_utf8(bytes)
332            .context(error::FromUtf8Snafu {
333                name: "DatanodeStatKey",
334            })
335            .map(|x| x.parse())?
336    }
337}
338
/// The value of the datanode stat in the memory store.
///
/// Serialized transparently, i.e. as a JSON array of [`Stat`]s. The accessors treat the
/// last element as the latest stat.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(transparent)]
pub struct DatanodeStatValue {
    /// The collected stats; the last element is treated as the most recent one.
    pub stats: Vec<Stat>,
}
345
346impl DatanodeStatValue {
347    /// Get the latest number of regions.
348    pub fn region_num(&self) -> Option<u64> {
349        self.stats.last().map(|x| x.region_num)
350    }
351
352    /// Get the latest node addr.
353    pub fn node_addr(&self) -> Option<String> {
354        self.stats.last().map(|x| x.addr.clone())
355    }
356}
357
358impl TryFrom<DatanodeStatValue> for Vec<u8> {
359    type Error = error::Error;
360
361    fn try_from(stats: DatanodeStatValue) -> Result<Self> {
362        Ok(serde_json::to_string(&stats)
363            .context(error::SerializeToJsonSnafu {
364                input: format!("{stats:?}"),
365            })?
366            .into_bytes())
367    }
368}
369
370impl FromStr for DatanodeStatValue {
371    type Err = error::Error;
372
373    fn from_str(value: &str) -> Result<Self> {
374        serde_json::from_str(value).context(error::DeserializeFromJsonSnafu { input: value })
375    }
376}
377
378impl TryFrom<Vec<u8>> for DatanodeStatValue {
379    type Error = error::Error;
380
381    fn try_from(value: Vec<u8>) -> Result<Self> {
382        String::from_utf8(value)
383            .context(error::FromUtf8Snafu {
384                name: "DatanodeStatValue",
385            })
386            .map(|x| x.parse())?
387    }
388}
389
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_stat_key() {
        let stat = Stat {
            id: 101,
            region_num: 10,
            ..Default::default()
        };

        let stat_key = stat.stat_key();

        assert_eq!(101, stat_key.node_id);
    }

    #[test]
    fn test_stat_key_round_trip() {
        let key = DatanodeStatKey { node_id: 42 };

        let bytes: Vec<u8> = key.into();
        assert_eq!(b"__meta_datanode_stat-0-42".to_vec(), bytes);
        // Every encoded key must fall under the shared prefix.
        assert!(bytes.starts_with(&DatanodeStatKey::prefix_key()));

        let decoded = DatanodeStatKey::try_from(bytes).unwrap();
        assert_eq!(key, decoded);
    }

    #[test]
    fn test_invalid_stat_key() {
        let malformed = [
            "",
            "__meta_datanode_stat-0-",
            "__meta_datanode_stat-0-abc",
            "__meta_datanode_stat-0-42-1",
            "__meta_datanode_lease-0-42",
        ];
        for key in malformed {
            assert!(key.parse::<DatanodeStatKey>().is_err(), "accepted: {key:?}");
        }

        // Matches the key pattern but overflows u64.
        assert!("__meta_datanode_stat-0-18446744073709551616"
            .parse::<DatanodeStatKey>()
            .is_err());

        // Not valid UTF-8.
        let invalid_utf8: Vec<u8> = vec![0xff, 0xfe];
        assert!(DatanodeStatKey::try_from(invalid_utf8).is_err());
    }

    #[test]
    fn test_stat_val_round_trip() {
        let stat = Stat {
            id: 101,
            region_num: 100,
            ..Default::default()
        };

        let stat_val = DatanodeStatValue { stats: vec![stat] };

        let bytes: Vec<u8> = stat_val.try_into().unwrap();
        let stat_val: DatanodeStatValue = bytes.try_into().unwrap();
        let stats = stat_val.stats;

        assert_eq!(1, stats.len());

        let stat = stats.first().unwrap();
        assert_eq!(101, stat.id);
        assert_eq!(100, stat.region_num);
    }

    #[test]
    fn test_get_addr_from_stat_val() {
        let empty = DatanodeStatValue { stats: vec![] };
        let addr = empty.node_addr();
        assert!(addr.is_none());

        let stat_val = DatanodeStatValue {
            stats: vec![
                Stat {
                    addr: "1".to_string(),
                    ..Default::default()
                },
                Stat {
                    addr: "2".to_string(),
                    ..Default::default()
                },
                Stat {
                    addr: "3".to_string(),
                    ..Default::default()
                },
            ],
        };
        let addr = stat_val.node_addr().unwrap();
        assert_eq!("3", addr);
    }

    #[test]
    fn test_get_region_num_from_stat_val() {
        let empty = DatanodeStatValue { stats: vec![] };
        let region_num = empty.region_num();
        assert!(region_num.is_none());

        // A single stat reporting zero regions still yields `Some(0)`, not `None`.
        let single = DatanodeStatValue {
            stats: vec![Stat {
                region_num: 0,
                ..Default::default()
            }],
        };
        assert_eq!(Some(0), single.region_num());

        let stat_val = DatanodeStatValue {
            stats: vec![
                Stat {
                    region_num: 1,
                    ..Default::default()
                },
                Stat {
                    region_num: 0,
                    ..Default::default()
                },
                Stat {
                    region_num: 2,
                    ..Default::default()
                },
            ],
        };
        let region_num = stat_val.region_num().unwrap();
        assert_eq!(2, region_num);
    }
}