Skip to main content

meta_srv/handler/
persist_stats_handler.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::time::{Duration, Instant};
16
17use api::v1::meta::{HeartbeatRequest, Role};
18use api::v1::value::ValueData;
19use api::v1::{ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows, Value};
20use client::DEFAULT_CATALOG_NAME;
21use client::inserter::{Context as InserterContext, Inserter};
22use common_catalog::consts::DEFAULT_PRIVATE_SCHEMA_NAME;
23use common_macro::{Schema, ToRow};
24use common_meta::DatanodeId;
25use common_meta::datanode::RegionStat;
26use common_telemetry::warn;
27use dashmap::DashMap;
28use store_api::region_engine::RegionRole;
29use store_api::storage::RegionId;
30
31use crate::error::Result;
32use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
33use crate::metasrv::Context;
34
35/// The handler to persist stats.
36pub struct PersistStatsHandler {
37    inserter: Box<dyn Inserter>,
38    last_persisted_region_stats: DashMap<RegionId, PersistedRegionStat>,
39    last_persisted_time: DashMap<DatanodeId, Instant>,
40    persist_interval: Duration,
41}
42
43/// The name of the table to persist region stats.
44const META_REGION_STATS_HISTORY_TABLE_NAME: &str = "region_statistics_history";
45/// The default context to persist region stats.
46const DEFAULT_CONTEXT: InserterContext = InserterContext {
47    catalog: DEFAULT_CATALOG_NAME,
48    schema: DEFAULT_PRIVATE_SCHEMA_NAME,
49};
50
51#[derive(Debug, Clone, Copy, PartialEq, Eq)]
52struct PersistedRegionStat {
53    region_id: RegionId,
54    written_bytes: u64,
55}
56
57impl From<&RegionStat> for PersistedRegionStat {
58    fn from(stat: &RegionStat) -> Self {
59        Self {
60            region_id: stat.id,
61            written_bytes: stat.written_bytes,
62        }
63    }
64}
65
/// One row of the `region_statistics_history` table.
///
/// Presumably the `ToRow`/`Schema` derives emit columns in field order — confirm against
/// `common_macro` before reordering or renaming fields.
#[derive(ToRow, Schema)]
struct PersistRegionStat<'a> {
    // Table id extracted from the region id.
    table_id: u32,
    // Full region id as its `u64` representation.
    region_id: u64,
    // Region number extracted from the region id.
    region_number: u32,
    manifest_size: u64,
    // Datanode that reported this stat.
    datanode_id: u64,
    #[col(datatype = "string")]
    engine: &'a str,
    num_rows: u64,
    sst_num: u64,
    sst_size: u64,
    // Growth of `written_bytes` since the previously persisted snapshot; 0 when there is
    // no snapshot or the counter went backwards.
    write_bytes_delta: u64,
    // `persist` fills this with the heartbeat timestamp aligned to the persist interval.
    #[col(
        // This col name is for the information schema table, so we don't touch it
        name = "greptime_timestamp",
        semantic = "Timestamp",
        datatype = "TimestampMillisecond"
    )]
    timestamp_millis: i64,
}
87
88/// Compute the region stat to persist.
89fn compute_persist_region_stat(
90    region_stat: &RegionStat,
91    datanode_id: DatanodeId,
92    timestamp_millis: i64,
93    persisted_region_stat: Option<PersistedRegionStat>,
94) -> PersistRegionStat<'_> {
95    let write_bytes_delta = persisted_region_stat
96        .and_then(|persisted_region_stat| {
97            region_stat
98                .written_bytes
99                .checked_sub(persisted_region_stat.written_bytes)
100        })
101        .unwrap_or_default();
102
103    PersistRegionStat {
104        table_id: region_stat.id.table_id(),
105        region_id: region_stat.id.as_u64(),
106        region_number: region_stat.id.region_number(),
107        manifest_size: region_stat.manifest_size,
108        datanode_id,
109        engine: region_stat.engine.as_str(),
110        num_rows: region_stat.num_rows,
111        sst_num: region_stat.sst_num,
112        sst_size: region_stat.sst_size,
113        write_bytes_delta,
114        timestamp_millis,
115    }
116}
117
118fn to_persisted_if_leader(
119    region_stat: &RegionStat,
120    last_persisted_region_stats: &DashMap<RegionId, PersistedRegionStat>,
121    datanode_id: DatanodeId,
122    timestamp_millis: i64,
123) -> Option<(Row, PersistedRegionStat)> {
124    if matches!(
125        region_stat.role,
126        RegionRole::Leader | RegionRole::StagingLeader
127    ) {
128        let persisted_region_stat = last_persisted_region_stats.get(&region_stat.id).map(|s| *s);
129        Some((
130            compute_persist_region_stat(
131                region_stat,
132                datanode_id,
133                timestamp_millis,
134                persisted_region_stat,
135            )
136            .to_row(),
137            PersistedRegionStat::from(region_stat),
138        ))
139    } else {
140        None
141    }
142}
143
/// Aligns `ts` (milliseconds) down to the start of the `interval`-sized bucket that
/// contains it, i.e. the largest multiple of `interval` that is `<= ts`.
///
/// Negative timestamps are floored as well (e.g. `-1` with a 10 ms interval becomes
/// `-10`) instead of being truncated towards zero. If the floored value would fall
/// below `i64::MIN`, the result saturates to `i64::MIN`. Intervals longer than
/// `i64::MAX` milliseconds are treated as `i64::MAX` milliseconds.
///
/// # Panics
/// Panics if `interval` as milliseconds is zero.
fn align_ts(ts: i64, interval: Duration) -> i64 {
    let interval_millis = interval.as_millis();
    assert!(interval_millis != 0, "interval must be greater than zero");
    // `as i64` would silently wrap for huge intervals (possibly to 0 or a negative
    // number); saturate instead so the arithmetic below stays well-defined.
    let interval_millis = i64::try_from(interval_millis).unwrap_or(i64::MAX);
    // `rem_euclid` is always in `[0, interval_millis)`, so this floors for any sign.
    ts.saturating_sub(ts.rem_euclid(interval_millis))
}
155
156impl PersistStatsHandler {
157    /// Creates a new [`PersistStatsHandler`].
158    pub fn new(inserter: Box<dyn Inserter>, mut persist_interval: Duration) -> Self {
159        if persist_interval < Duration::from_mins(10) {
160            warn!("persist_interval is less than 10 minutes, set to 10 minutes");
161            persist_interval = Duration::from_mins(10);
162        }
163
164        Self {
165            inserter,
166            last_persisted_region_stats: DashMap::new(),
167            last_persisted_time: DashMap::new(),
168            persist_interval,
169        }
170    }
171
172    fn should_persist(&self, datanode_id: DatanodeId) -> bool {
173        let Some(last_persisted_time) = self.last_persisted_time.get(&datanode_id) else {
174            return true;
175        };
176
177        last_persisted_time.elapsed() >= self.persist_interval
178    }
179
180    async fn persist(
181        &self,
182        timestamp_millis: i64,
183        datanode_id: DatanodeId,
184        region_stats: &[RegionStat],
185    ) {
186        // Safety: persist_interval is greater than zero.
187        let aligned_ts = align_ts(timestamp_millis, self.persist_interval);
188        let (rows, incoming_region_stats): (Vec<_>, Vec<_>) = region_stats
189            .iter()
190            .flat_map(|region_stat| {
191                to_persisted_if_leader(
192                    region_stat,
193                    &self.last_persisted_region_stats,
194                    datanode_id,
195                    aligned_ts,
196                )
197            })
198            .unzip();
199
200        if rows.is_empty() {
201            return;
202        }
203
204        if let Err(err) = self
205            .inserter
206            .insert_rows(
207                &DEFAULT_CONTEXT,
208                RowInsertRequests {
209                    inserts: vec![RowInsertRequest {
210                        table_name: META_REGION_STATS_HISTORY_TABLE_NAME.to_string(),
211                        rows: Some(Rows {
212                            schema: PersistRegionStat::schema(),
213                            rows,
214                        }),
215                    }],
216                },
217            )
218            .await
219        {
220            warn!(
221                "Failed to persist region stats, datanode_id: {}, error: {:?}",
222                datanode_id, err
223            );
224            return;
225        }
226
227        self.last_persisted_time.insert(datanode_id, Instant::now());
228        for s in incoming_region_stats {
229            self.last_persisted_region_stats.insert(s.region_id, s);
230        }
231    }
232}
233
234#[async_trait::async_trait]
235impl HeartbeatHandler for PersistStatsHandler {
236    fn is_acceptable(&self, role: Role) -> bool {
237        role == Role::Datanode
238    }
239
240    async fn handle(
241        &self,
242        _req: &HeartbeatRequest,
243        _: &mut Context,
244        acc: &mut HeartbeatAccumulator,
245    ) -> Result<HandleControl> {
246        let Some(current_stat) = acc.stat.as_ref() else {
247            return Ok(HandleControl::Continue);
248        };
249
250        if !self.should_persist(current_stat.id) {
251            return Ok(HandleControl::Continue);
252        }
253
254        self.persist(
255            current_stat.timestamp_millis,
256            current_stat.id,
257            &current_stat.region_stats,
258        )
259        .await;
260
261        Ok(HandleControl::Continue)
262    }
263}
264
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use client::inserter::{Context as InserterContext, InsertOptions};
    use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;

    use super::*;
    use crate::handler::test_utils::TestEnv;

    /// Builds a leader `RegionStat` with fixed counters; only the region id,
    /// `written_bytes` and engine vary between tests.
    fn create_test_region_stat(
        table_id: u32,
        region_number: u32,
        written_bytes: u64,
        engine: &str,
    ) -> RegionStat {
        let region_id = RegionId::new(table_id, region_number);
        RegionStat {
            id: region_id,
            rcus: 100,
            wcus: 200,
            approximate_bytes: 1024,
            engine: engine.to_string(),
            role: RegionRole::Leader,
            num_rows: 1000,
            memtable_size: 512,
            manifest_size: 256,
            sst_size: 2048,
            sst_num: 5,
            index_size: 128,
            region_manifest: RegionManifestInfo::Mito {
                manifest_version: 1,
                flushed_entry_id: 100,
                file_removed_cnt: 0,
            },
            written_bytes,
            data_topic_latest_entry_id: 200,
            metadata_topic_latest_entry_id: 200,
        }
    }

    #[test]
    fn test_compute_persist_region_stat_with_no_persisted_stat() {
        let region_stat = create_test_region_stat(1, 1, 1000, "mito");
        let datanode_id = 123;
        let timestamp_millis = 1640995200000; // 2022-01-01 00:00:00 UTC
        let result = compute_persist_region_stat(&region_stat, datanode_id, timestamp_millis, None);
        assert_eq!(result.table_id, 1);
        assert_eq!(result.region_id, region_stat.id.as_u64());
        assert_eq!(result.region_number, 1);
        assert_eq!(result.manifest_size, 256);
        assert_eq!(result.datanode_id, datanode_id);
        assert_eq!(result.engine, "mito");
        assert_eq!(result.num_rows, 1000);
        assert_eq!(result.sst_num, 5);
        assert_eq!(result.sst_size, 2048);
        assert_eq!(result.write_bytes_delta, 0); // No previous stat, so delta is 0
        assert_eq!(result.timestamp_millis, timestamp_millis);
    }

    #[test]
    fn test_compute_persist_region_stat_with_persisted_stat_increase() {
        let region_stat = create_test_region_stat(2, 3, 1500, "mito");
        let datanode_id = 456;
        let timestamp_millis = 1640995260000; // 2022-01-01 00:01:00 UTC
        let persisted_stat = PersistedRegionStat {
            region_id: region_stat.id,
            written_bytes: 1000, // Previous write bytes
        };
        let result = compute_persist_region_stat(
            &region_stat,
            datanode_id,
            timestamp_millis,
            Some(persisted_stat),
        );
        assert_eq!(result.table_id, 2);
        assert_eq!(result.region_id, region_stat.id.as_u64());
        assert_eq!(result.region_number, 3);
        assert_eq!(result.manifest_size, 256);
        assert_eq!(result.datanode_id, datanode_id);
        assert_eq!(result.engine, "mito");
        assert_eq!(result.num_rows, 1000);
        assert_eq!(result.sst_num, 5);
        assert_eq!(result.sst_size, 2048);
        assert_eq!(result.write_bytes_delta, 500); // 1500 - 1000 = 500
        assert_eq!(result.timestamp_millis, timestamp_millis);
    }

    #[test]
    fn test_compute_persist_region_stat_with_persisted_stat_decrease() {
        let region_stat = create_test_region_stat(3, 5, 800, "mito");
        let datanode_id = 789;
        let timestamp_millis = 1640995320000; // 2022-01-01 00:02:00 UTC
        let persisted_stat = PersistedRegionStat {
            region_id: region_stat.id,
            written_bytes: 1200, // Previous write bytes (higher than current)
        };
        let result = compute_persist_region_stat(
            &region_stat,
            datanode_id,
            timestamp_millis,
            Some(persisted_stat),
        );
        assert_eq!(result.table_id, 3);
        assert_eq!(result.region_id, region_stat.id.as_u64());
        assert_eq!(result.region_number, 5);
        assert_eq!(result.manifest_size, 256);
        assert_eq!(result.datanode_id, datanode_id);
        assert_eq!(result.engine, "mito");
        assert_eq!(result.num_rows, 1000);
        assert_eq!(result.sst_num, 5);
        assert_eq!(result.sst_size, 2048);
        assert_eq!(result.write_bytes_delta, 0); // 800 - 1200 would be negative, so 0
        assert_eq!(result.timestamp_millis, timestamp_millis);
    }

    #[test]
    fn test_compute_persist_region_stat_with_persisted_stat_equal() {
        let region_stat = create_test_region_stat(4, 7, 2000, "mito");
        let datanode_id = 101;
        let timestamp_millis = 1640995380000; // 2022-01-01 00:03:00 UTC
        let persisted_stat = PersistedRegionStat {
            region_id: region_stat.id,
            written_bytes: 2000, // Same as current write bytes
        };
        let result = compute_persist_region_stat(
            &region_stat,
            datanode_id,
            timestamp_millis,
            Some(persisted_stat),
        );
        assert_eq!(result.table_id, 4);
        assert_eq!(result.region_id, region_stat.id.as_u64());
        assert_eq!(result.region_number, 7);
        assert_eq!(result.manifest_size, 256);
        assert_eq!(result.datanode_id, datanode_id);
        assert_eq!(result.engine, "mito");
        assert_eq!(result.num_rows, 1000);
        assert_eq!(result.sst_num, 5);
        assert_eq!(result.sst_size, 2048);
        assert_eq!(result.write_bytes_delta, 0); // 2000 - 2000 = 0
        assert_eq!(result.timestamp_millis, timestamp_millis);
    }

    #[test]
    fn test_compute_persist_region_stat_with_overflow_protection() {
        let region_stat = create_test_region_stat(8, 15, 500, "mito");
        let datanode_id = 505;
        let timestamp_millis = 1640995620000; // 2022-01-01 00:07:00 UTC
        let persisted_stat = PersistedRegionStat {
            region_id: region_stat.id,
            written_bytes: 1000, // Higher than current, would cause underflow
        };
        let result = compute_persist_region_stat(
            &region_stat,
            datanode_id,
            timestamp_millis,
            Some(persisted_stat),
        );
        assert_eq!(result.table_id, 8);
        assert_eq!(result.region_id, region_stat.id.as_u64());
        assert_eq!(result.region_number, 15);
        assert_eq!(result.manifest_size, 256);
        assert_eq!(result.datanode_id, datanode_id);
        assert_eq!(result.engine, "mito");
        assert_eq!(result.num_rows, 1000);
        assert_eq!(result.sst_num, 5);
        assert_eq!(result.sst_size, 2048);
        assert_eq!(result.write_bytes_delta, 0); // Underflow is clamped to 0
        assert_eq!(result.timestamp_millis, timestamp_millis);
    }

    #[test]
    fn test_to_persisted_if_leader_skips_non_leader() {
        let mut region_stat = create_test_region_stat(9, 1, 1000, "mito");
        region_stat.role = RegionRole::Follower;
        let last_persisted_region_stats = DashMap::new();
        assert!(
            to_persisted_if_leader(
                &region_stat,
                &last_persisted_region_stats,
                1,
                1640995200000
            )
            .is_none()
        );
    }

    #[test]
    fn test_align_ts() {
        let interval = Duration::from_secs(600);
        // Already aligned to a 10 minute boundary.
        assert_eq!(align_ts(1640995200000, interval), 1640995200000);
        // 2022-01-01 00:01:00 and 00:09:59.999 UTC fall into the 00:00:00 bucket.
        assert_eq!(align_ts(1640995260000, interval), 1640995200000);
        assert_eq!(align_ts(1640995799999, interval), 1640995200000);
        // 00:10:00 starts the next bucket.
        assert_eq!(align_ts(1640995800000, interval), 1640995800000);
    }

    #[test]
    #[should_panic(expected = "interval must be greater than zero")]
    fn test_align_ts_zero_interval() {
        let _ = align_ts(1640995200000, Duration::from_micros(999));
    }

    /// Inserter that records every request instead of writing anywhere.
    struct MockInserter {
        requests: Arc<Mutex<Vec<api::v1::RowInsertRequest>>>,
    }

    #[async_trait::async_trait]
    impl Inserter for MockInserter {
        async fn insert_rows(
            &self,
            _context: &InserterContext<'_>,
            requests: api::v1::RowInsertRequests,
        ) -> client::error::Result<()> {
            self.requests.lock().unwrap().extend(requests.inserts);

            Ok(())
        }

        fn set_options(&mut self, _options: &InsertOptions) {}
    }

    #[tokio::test]
    async fn test_not_persist_region_stats() {
        let env = TestEnv::new();
        let mut ctx = env.ctx();

        let requests = Arc::new(Mutex::new(vec![]));
        let inserter = MockInserter {
            requests: requests.clone(),
        };
        // Intervals below 10 minutes are clamped to 10 minutes by `new`.
        let handler = PersistStatsHandler::new(Box::new(inserter), Duration::from_secs(10));
        let mut acc = HeartbeatAccumulator {
            stat: Some(Stat {
                id: 1,
                timestamp_millis: 1640995200000,
                region_stats: vec![create_test_region_stat(1, 1, 1000, "mito")],
                ..Default::default()
            }),
            ..Default::default()
        };
        // A persist "just happened", so the interval has not elapsed yet.
        handler.last_persisted_time.insert(1, Instant::now());
        // Do not persist
        handler
            .handle(&HeartbeatRequest::default(), &mut ctx, &mut acc)
            .await
            .unwrap();
        assert!(requests.lock().unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_persist_region_stats() {
        let env = TestEnv::new();
        let mut ctx = env.ctx();
        let requests = Arc::new(Mutex::new(vec![]));
        let inserter = MockInserter {
            requests: requests.clone(),
        };

        // Intervals below 10 minutes are clamped to 10 minutes by `new`.
        let handler = PersistStatsHandler::new(Box::new(inserter), Duration::from_secs(10));

        let region_stat = create_test_region_stat(1, 1, 1000, "mito");
        // 2022-01-01 00:01:00 UTC: deliberately not aligned to the persist interval.
        let timestamp_millis = 1640995260000;
        let datanode_id = 1;
        let region_id = RegionId::new(1, 1);
        let mut acc = HeartbeatAccumulator {
            stat: Some(Stat {
                id: datanode_id,
                timestamp_millis,
                region_stats: vec![region_stat.clone()],
                ..Default::default()
            }),
            ..Default::default()
        };

        handler.last_persisted_region_stats.insert(
            region_id,
            PersistedRegionStat {
                region_id,
                written_bytes: 500,
            },
        );
        // `persist` stamps rows with the aligned timestamp, so build the expected row
        // the same way instead of relying on `timestamp_millis` happening to be aligned.
        let aligned_ts = align_ts(timestamp_millis, handler.persist_interval);
        assert_ne!(aligned_ts, timestamp_millis);
        let (expected_row, expected_persisted_region_stat) = to_persisted_if_leader(
            &region_stat,
            &handler.last_persisted_region_stats,
            datanode_id,
            aligned_ts,
        )
        .unwrap();
        let before_insert_time = Instant::now();
        // Persist
        handler
            .handle(&HeartbeatRequest::default(), &mut ctx, &mut acc)
            .await
            .unwrap();
        let request = {
            let mut requests = requests.lock().unwrap();
            assert_eq!(requests.len(), 1);
            requests.pop().unwrap()
        };
        assert_eq!(
            request.table_name,
            META_REGION_STATS_HISTORY_TABLE_NAME.to_string()
        );
        assert_eq!(request.rows.unwrap().rows, vec![expected_row]);

        // Check last persisted time. `Instant::now()` is monotonic but may return the
        // same value twice on coarse clocks, so `>=` is the strongest safe comparison.
        let last_persisted_time = *handler.last_persisted_time.get(&datanode_id).unwrap();
        assert!(last_persisted_time >= before_insert_time);

        // Check last persisted region stats
        assert_eq!(
            handler
                .last_persisted_region_stats
                .get(&region_id)
                .unwrap()
                .value(),
            &expected_persisted_region_stat
        );
    }
}