common_meta/
datanode.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::str::FromStr;
17
18use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, RequestHeader};
19use common_time::util as time_util;
20use lazy_static::lazy_static;
21use regex::Regex;
22use serde::{Deserialize, Serialize};
23use snafu::{ensure, OptionExt, ResultExt};
24use store_api::region_engine::{RegionRole, RegionStatistic};
25use store_api::storage::RegionId;
26use table::metadata::TableId;
27
28use crate::error;
29use crate::error::Result;
30use crate::heartbeat::utils::get_datanode_workloads;
31
/// Key prefix of datanode lease keys; see `DATANODE_LEASE_KEY_PATTERN` for the full key shape.
pub(crate) const DATANODE_LEASE_PREFIX: &str = "__meta_datanode_lease";
/// Key prefix of inactive-region keys; see `INACTIVE_REGION_KEY_PATTERN` for the full key shape.
const INACTIVE_REGION_PREFIX: &str = "__meta_inactive_region";

/// Key prefix of datanode stat keys; see [`DatanodeStatKey`] for the full key format.
const DATANODE_STAT_PREFIX: &str = "__meta_datanode_stat";

/// Name of the entry in the `extensions` map of a heartbeat region stat that holds the
/// serialized [`RegionStatistic`].
pub const REGION_STATISTIC_KEY: &str = "__region_statistic";
38
// Anchored key patterns. Each one is built from a constant prefix, so the `unwrap()`
// can only fail if the pattern text itself is edited into something invalid.
lazy_static! {
    /// Matches a whole datanode lease key: the lease prefix followed by two
    /// `-`-separated decimal numbers, each captured as a group.
    // NOTE(review): the two groups are presumably the cluster id and the node id,
    // mirroring the stat key layout — confirm against the lease key implementation.
    pub(crate) static ref DATANODE_LEASE_KEY_PATTERN: Regex =
        Regex::new(&format!("^{DATANODE_LEASE_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap();
    /// Matches a whole datanode stat key: the stat prefix followed by two
    /// `-`-separated decimal numbers. Keys produced in this file always write `0`
    /// in the first slot; the second group is parsed as the node id.
    static ref DATANODE_STAT_KEY_PATTERN: Regex =
        Regex::new(&format!("^{DATANODE_STAT_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap();
    /// Matches a whole inactive-region key: the inactive-region prefix followed by
    /// three `-`-separated decimal numbers, each captured as a group.
    // NOTE(review): the meaning of the three groups is not visible here — confirm
    // against the inactive-region key implementation.
    static ref INACTIVE_REGION_KEY_PATTERN: Regex = Regex::new(&format!(
        "^{INACTIVE_REGION_PREFIX}-([0-9]+)-([0-9]+)-([0-9]+)$"
    ))
    .unwrap();
}
49
/// A snapshot of the statistics of one datanode, built from a single heartbeat request.
///
/// Snapshots are collected in a [`DatanodeStatValue`]; the matching storage key is the
/// [`DatanodeStatKey`] returned by [`Stat::stat_key`].
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Stat {
    /// The time at which this snapshot was built, in milliseconds.
    pub timestamp_millis: i64,
    /// The datanode Id.
    pub id: u64,
    /// The datanode address.
    pub addr: String,
    /// The read capacity units during this period
    pub rcus: i64,
    /// The write capacity units during this period
    pub wcus: i64,
    /// How many regions on this node
    pub region_num: u64,
    /// The region stats of the datanode.
    pub region_stats: Vec<RegionStat>,
    /// The topic stats of the datanode.
    pub topic_stats: Vec<TopicStat>,
    /// The node epoch is used to check whether the node has restarted or redeployed.
    pub node_epoch: u64,
    /// The datanode workloads.
    pub datanode_workloads: DatanodeWorkloads,
}
75
/// The statistics of a region.
///
/// Built from a heartbeat region stat; the size and entry-id fields come from the
/// [`RegionStatistic`] carried in its `extensions` map and fall back to that type's
/// defaults when the entry is missing or cannot be deserialized.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegionStat {
    /// The region_id.
    pub id: RegionId,
    /// The read capacity units during this period
    pub rcus: i64,
    /// The write capacity units during this period
    pub wcus: i64,
    /// Approximate disk bytes of this region, including sst, index, manifest and wal
    pub approximate_bytes: u64,
    /// The engine name.
    pub engine: String,
    /// The region role.
    pub role: RegionRole,
    /// The number of rows
    pub num_rows: u64,
    /// The size of the memtable in bytes.
    pub memtable_size: u64,
    /// The size of the manifest in bytes.
    pub manifest_size: u64,
    /// The size of the SST data files in bytes.
    pub sst_size: u64,
    /// The number of SST data files.
    pub sst_num: u64,
    /// The size of the SST index files in bytes.
    pub index_size: u64,
    /// The manifest info of the region.
    pub region_manifest: RegionManifestInfo,
    /// The write bytes.
    pub write_bytes: u64,
    /// The latest entry id of topic used by data.
    /// **Only used by remote WAL prune.**
    pub data_topic_latest_entry_id: u64,
    /// The latest entry id of topic used by metadata.
    /// **Only used by remote WAL prune.**
    /// In mito engine, this is the same as `data_topic_latest_entry_id`.
    pub metadata_topic_latest_entry_id: u64,
}
115
/// The statistics of a topic, as carried by the `topic_stats` of a heartbeat request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicStat {
    /// The topic name.
    pub topic: String,
    /// The latest entry id of the topic.
    pub latest_entry_id: u64,
    /// The total size in bytes of records appended to the topic.
    pub record_size: u64,
    /// The total number of records appended to the topic.
    pub record_num: u64,
}
127
/// Trait for reporting statistics about topics.
///
/// Implementations must be `Send + Sync`.
pub trait TopicStatsReporter: Send + Sync {
    /// Returns a list of topic statistics that can be reported.
    ///
    /// Takes `&mut self`, so an implementation may update its own state while
    /// producing the list.
    fn reportable_topics(&mut self) -> Vec<TopicStat>;
}
133
/// The manifest info of a region.
///
/// Mirrors `store_api::region_engine::RegionManifestInfo` variant for variant and
/// field for field, adding `Serialize`/`Deserialize`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum RegionManifestInfo {
    /// A single manifest version together with a single flushed entry id.
    // NOTE(review): presumably reported by mito engine regions — confirm.
    Mito {
        manifest_version: u64,
        flushed_entry_id: u64,
    },
    /// Separate manifest versions and flushed entry ids for data and for metadata.
    // NOTE(review): presumably reported by metric engine regions — confirm.
    Metric {
        data_manifest_version: u64,
        data_flushed_entry_id: u64,
        metadata_manifest_version: u64,
        metadata_flushed_entry_id: u64,
    },
}
147
148impl Stat {
149    #[inline]
150    pub fn is_empty(&self) -> bool {
151        self.region_stats.is_empty()
152    }
153
154    pub fn stat_key(&self) -> DatanodeStatKey {
155        DatanodeStatKey { node_id: self.id }
156    }
157
158    /// Returns a tuple array containing [RegionId] and [RegionRole].
159    pub fn regions(&self) -> Vec<(RegionId, RegionRole)> {
160        self.region_stats.iter().map(|s| (s.id, s.role)).collect()
161    }
162
163    /// Returns all table ids in the region stats.
164    pub fn table_ids(&self) -> HashSet<TableId> {
165        self.region_stats.iter().map(|s| s.id.table_id()).collect()
166    }
167
168    /// Retains the active region stats and updates the rcus, wcus, and region_num.
169    pub fn retain_active_region_stats(&mut self, inactive_region_ids: &HashSet<RegionId>) {
170        if inactive_region_ids.is_empty() {
171            return;
172        }
173
174        self.region_stats
175            .retain(|r| !inactive_region_ids.contains(&r.id));
176        self.rcus = self.region_stats.iter().map(|s| s.rcus).sum();
177        self.wcus = self.region_stats.iter().map(|s| s.wcus).sum();
178        self.region_num = self.region_stats.len() as u64;
179    }
180
181    pub fn memory_size(&self) -> usize {
182        // timestamp_millis, rcus, wcus
183        std::mem::size_of::<i64>() * 3 +
184        // id, region_num, node_epoch
185        std::mem::size_of::<u64>() * 3 +
186        // addr
187        std::mem::size_of::<String>() + self.addr.capacity() +
188        // region_stats
189        self.region_stats.iter().map(|s| s.memory_size()).sum::<usize>()
190    }
191}
192
193impl RegionStat {
194    pub fn memory_size(&self) -> usize {
195        // role
196        std::mem::size_of::<RegionRole>() +
197        // id
198        std::mem::size_of::<RegionId>() +
199        // rcus, wcus, approximate_bytes, num_rows
200        std::mem::size_of::<i64>() * 4 +
201        // memtable_size, manifest_size, sst_size, sst_num, index_size
202        std::mem::size_of::<u64>() * 5 +
203        // engine
204        std::mem::size_of::<String>() + self.engine.capacity() +
205        // region_manifest
206        self.region_manifest.memory_size()
207    }
208}
209
210impl RegionManifestInfo {
211    pub fn memory_size(&self) -> usize {
212        match self {
213            RegionManifestInfo::Mito { .. } => std::mem::size_of::<u64>() * 2,
214            RegionManifestInfo::Metric { .. } => std::mem::size_of::<u64>() * 4,
215        }
216    }
217}
218
219impl TryFrom<&HeartbeatRequest> for Stat {
220    type Error = Option<RequestHeader>;
221
222    fn try_from(value: &HeartbeatRequest) -> std::result::Result<Self, Self::Error> {
223        let HeartbeatRequest {
224            header,
225            peer,
226            region_stats,
227            node_epoch,
228            node_workloads,
229            topic_stats,
230            ..
231        } = value;
232
233        match (header, peer) {
234            (Some(_header), Some(peer)) => {
235                let region_stats = region_stats
236                    .iter()
237                    .map(RegionStat::from)
238                    .collect::<Vec<_>>();
239                let topic_stats = topic_stats.iter().map(TopicStat::from).collect::<Vec<_>>();
240
241                let datanode_workloads = get_datanode_workloads(node_workloads.as_ref());
242                Ok(Self {
243                    timestamp_millis: time_util::current_time_millis(),
244                    // datanode id
245                    id: peer.id,
246                    // datanode address
247                    addr: peer.addr.clone(),
248                    rcus: region_stats.iter().map(|s| s.rcus).sum(),
249                    wcus: region_stats.iter().map(|s| s.wcus).sum(),
250                    region_num: region_stats.len() as u64,
251                    region_stats,
252                    topic_stats,
253                    node_epoch: *node_epoch,
254                    datanode_workloads,
255                })
256            }
257            (header, _) => Err(header.clone()),
258        }
259    }
260}
261
262impl From<store_api::region_engine::RegionManifestInfo> for RegionManifestInfo {
263    fn from(value: store_api::region_engine::RegionManifestInfo) -> Self {
264        match value {
265            store_api::region_engine::RegionManifestInfo::Mito {
266                manifest_version,
267                flushed_entry_id,
268            } => RegionManifestInfo::Mito {
269                manifest_version,
270                flushed_entry_id,
271            },
272            store_api::region_engine::RegionManifestInfo::Metric {
273                data_manifest_version,
274                data_flushed_entry_id,
275                metadata_manifest_version,
276                metadata_flushed_entry_id,
277            } => RegionManifestInfo::Metric {
278                data_manifest_version,
279                data_flushed_entry_id,
280                metadata_manifest_version,
281                metadata_flushed_entry_id,
282            },
283        }
284    }
285}
286
287impl From<&api::v1::meta::RegionStat> for RegionStat {
288    fn from(value: &api::v1::meta::RegionStat) -> Self {
289        let region_stat = value
290            .extensions
291            .get(REGION_STATISTIC_KEY)
292            .and_then(|value| RegionStatistic::deserialize_from_slice(value))
293            .unwrap_or_default();
294
295        Self {
296            id: RegionId::from_u64(value.region_id),
297            rcus: value.rcus,
298            wcus: value.wcus,
299            approximate_bytes: value.approximate_bytes as u64,
300            engine: value.engine.to_string(),
301            role: RegionRole::from(value.role()),
302            num_rows: region_stat.num_rows,
303            memtable_size: region_stat.memtable_size,
304            manifest_size: region_stat.manifest_size,
305            sst_size: region_stat.sst_size,
306            sst_num: region_stat.sst_num,
307            index_size: region_stat.index_size,
308            region_manifest: region_stat.manifest.into(),
309            write_bytes: region_stat.write_bytes,
310            data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
311            metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
312        }
313    }
314}
315
316impl From<&api::v1::meta::TopicStat> for TopicStat {
317    fn from(value: &api::v1::meta::TopicStat) -> Self {
318        Self {
319            topic: value.topic_name.clone(),
320            latest_entry_id: value.latest_entry_id,
321            record_size: value.record_size,
322            record_num: value.record_num,
323        }
324    }
325}
326
/// The key of the datanode stat in the memory store.
///
/// The format is `__meta_datanode_stat-0-{node_id}`.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub struct DatanodeStatKey {
    /// The id of the datanode the stat belongs to.
    pub node_id: u64,
}
334
335impl DatanodeStatKey {
336    /// The key prefix.
337    pub fn prefix_key() -> Vec<u8> {
338        // todo(hl): remove cluster id in prefix
339        format!("{DATANODE_STAT_PREFIX}-0-").into_bytes()
340    }
341}
342
343impl From<DatanodeStatKey> for Vec<u8> {
344    fn from(value: DatanodeStatKey) -> Self {
345        // todo(hl): remove cluster id in prefix
346        format!("{}-0-{}", DATANODE_STAT_PREFIX, value.node_id).into_bytes()
347    }
348}
349
350impl FromStr for DatanodeStatKey {
351    type Err = error::Error;
352
353    fn from_str(key: &str) -> Result<Self> {
354        let caps = DATANODE_STAT_KEY_PATTERN
355            .captures(key)
356            .context(error::InvalidStatKeySnafu { key })?;
357
358        ensure!(caps.len() == 3, error::InvalidStatKeySnafu { key });
359        let node_id = caps[2].to_string();
360        let node_id: u64 = node_id.parse().context(error::ParseNumSnafu {
361            err_msg: format!("invalid node_id: {node_id}"),
362        })?;
363
364        Ok(Self { node_id })
365    }
366}
367
368impl TryFrom<Vec<u8>> for DatanodeStatKey {
369    type Error = error::Error;
370
371    fn try_from(bytes: Vec<u8>) -> Result<Self> {
372        String::from_utf8(bytes)
373            .context(error::FromUtf8Snafu {
374                name: "DatanodeStatKey",
375            })
376            .map(|x| x.parse())?
377    }
378}
379
/// The value of the datanode stat in the memory store.
///
/// Because of `#[serde(transparent)]`, the value is (de)serialized exactly as its
/// `stats` list, without a wrapping object.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(transparent)]
pub struct DatanodeStatValue {
    /// The collected stats; the accessors of this type treat the last element as the
    /// latest one.
    pub stats: Vec<Stat>,
}
386
387impl DatanodeStatValue {
388    /// Get the latest number of regions.
389    pub fn region_num(&self) -> Option<u64> {
390        self.stats.last().map(|x| x.region_num)
391    }
392
393    /// Get the latest node addr.
394    pub fn node_addr(&self) -> Option<String> {
395        self.stats.last().map(|x| x.addr.clone())
396    }
397}
398
399impl TryFrom<DatanodeStatValue> for Vec<u8> {
400    type Error = error::Error;
401
402    fn try_from(stats: DatanodeStatValue) -> Result<Self> {
403        Ok(serde_json::to_string(&stats)
404            .context(error::SerializeToJsonSnafu {
405                input: format!("{stats:?}"),
406            })?
407            .into_bytes())
408    }
409}
410
411impl FromStr for DatanodeStatValue {
412    type Err = error::Error;
413
414    fn from_str(value: &str) -> Result<Self> {
415        serde_json::from_str(value).context(error::DeserializeFromJsonSnafu { input: value })
416    }
417}
418
419impl TryFrom<Vec<u8>> for DatanodeStatValue {
420    type Error = error::Error;
421
422    fn try_from(value: Vec<u8>) -> Result<Self> {
423        String::from_utf8(value)
424            .context(error::FromUtf8Snafu {
425                name: "DatanodeStatValue",
426            })
427            .map(|x| x.parse())?
428    }
429}
430
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an otherwise-default stat with the given address.
    fn stat_with_addr(addr: &str) -> Stat {
        Stat {
            addr: addr.to_string(),
            ..Default::default()
        }
    }

    /// Builds an otherwise-default stat with the given region count.
    fn stat_with_region_num(region_num: u64) -> Stat {
        Stat {
            region_num,
            ..Default::default()
        }
    }

    #[test]
    fn test_stat_key() {
        let stat = Stat {
            id: 101,
            region_num: 10,
            ..Default::default()
        };

        let key = stat.stat_key();

        assert_eq!(101, key.node_id);
    }

    #[test]
    fn test_stat_val_round_trip() {
        let original = DatanodeStatValue {
            stats: vec![Stat {
                id: 101,
                region_num: 100,
                ..Default::default()
            }],
        };

        // Encode to bytes and decode again; the single stat must survive unchanged.
        let encoded: Vec<u8> = original.try_into().unwrap();
        let decoded: DatanodeStatValue = encoded.try_into().unwrap();

        assert_eq!(1, decoded.stats.len());

        let stat = decoded.stats.first().unwrap();
        assert_eq!(101, stat.id);
        assert_eq!(100, stat.region_num);
    }

    #[test]
    fn test_get_addr_from_stat_val() {
        let empty = DatanodeStatValue { stats: vec![] };
        assert!(empty.node_addr().is_none());

        // With several stats, the address of the last one wins.
        let stat_val = DatanodeStatValue {
            stats: vec![stat_with_addr("1"), stat_with_addr("2"), stat_with_addr("3")],
        };
        let addr = stat_val.node_addr().unwrap();
        assert_eq!("3", addr);
    }

    #[test]
    fn test_get_region_num_from_stat_val() {
        let empty = DatanodeStatValue { stats: vec![] };
        assert!(empty.region_num().is_none());

        // A region count of zero is still a value, not an absent one.
        let single_zero = DatanodeStatValue {
            stats: vec![stat_with_region_num(0)],
        };
        assert_eq!(Some(0), single_zero.region_num());

        // With several stats, the region count of the last one wins.
        let stat_val = DatanodeStatValue {
            stats: vec![
                stat_with_region_num(1),
                stat_with_region_num(0),
                stat_with_region_num(2),
            ],
        };
        let region_num = stat_val.region_num().unwrap();
        assert_eq!(2, region_num);
    }
}