meta_srv/handler/
persist_stats_handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::{Duration, Instant};
16
17use api::v1::meta::{HeartbeatRequest, Role};
18use api::v1::value::ValueData;
19use api::v1::{ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows, Value};
20use client::inserter::{Context as InserterContext, Inserter};
21use client::DEFAULT_CATALOG_NAME;
22use common_catalog::consts::DEFAULT_PRIVATE_SCHEMA_NAME;
23use common_macro::{Schema, ToRow};
24use common_meta::datanode::RegionStat;
25use common_meta::DatanodeId;
26use common_telemetry::warn;
27use dashmap::DashMap;
28use store_api::region_engine::RegionRole;
29use store_api::storage::RegionId;
30
31use crate::error::Result;
32use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
33use crate::metasrv::Context;
34
/// The handler to persist stats.
///
/// On datanode heartbeats it writes one row per leader region into the
/// `region_statistics_history` table, at most once per `persist_interval` for each datanode.
pub struct PersistStatsHandler {
    /// Sink used to write the region stat rows.
    inserter: Box<dyn Inserter>,
    /// Snapshot of each region taken at its last successful persist; it is the baseline for the
    /// next `write_bytes_delta`.
    ///
    /// NOTE(review): entries are only ever inserted in this file, never removed, so regions that
    /// are dropped stay in the map for the lifetime of the handler.
    last_persisted_region_stats: DashMap<RegionId, PersistedRegionStat>,
    /// When stats of each datanode were last persisted successfully.
    last_persisted_time: DashMap<DatanodeId, Instant>,
    /// Minimum time between two persists of the same datanode (clamped to at least 10 minutes).
    persist_interval: Duration,
}
42
/// The name of the table to persist region stats.
///
/// Each row written to it follows the [`PersistRegionStat`] schema.
const META_REGION_STATS_HISTORY_TABLE_NAME: &str = "region_statistics_history";
/// The default context to persist region stats.
///
/// Rows go to the default catalog and the private schema
/// (`DEFAULT_CATALOG_NAME` / `DEFAULT_PRIVATE_SCHEMA_NAME`).
const DEFAULT_CONTEXT: InserterContext = InserterContext {
    catalog: DEFAULT_CATALOG_NAME,
    schema: DEFAULT_PRIVATE_SCHEMA_NAME,
};
50
/// The part of a [`RegionStat`] remembered after it has been persisted successfully.
///
/// It serves as the baseline for the next `write_bytes_delta` of the same region.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PersistedRegionStat {
    /// The region this snapshot belongs to.
    region_id: RegionId,
    /// The region's `written_bytes` counter at the time of the snapshot.
    written_bytes: u64,
}
56
57impl From<&RegionStat> for PersistedRegionStat {
58    fn from(stat: &RegionStat) -> Self {
59        Self {
60            region_id: stat.id,
61            written_bytes: stat.written_bytes,
62        }
63    }
64}
65
// One row of the region statistics history table.
//
// NOTE(review): the `ToRow`/`Schema` derives presumably emit columns in field order, so the field
// order here should be treated as part of the table layout — confirm before reordering.
#[derive(ToRow, Schema)]
struct PersistRegionStat<'a> {
    // Table id component of the region id.
    table_id: u32,
    // The full region id as a `u64`.
    region_id: u64,
    // Region number component of the region id.
    region_number: u32,
    manifest_size: u64,
    // Datanode that reported this stat.
    datanode_id: u64,
    // Engine name, stored as a string column.
    #[col(datatype = "string")]
    engine: &'a str,
    num_rows: u64,
    sst_num: u64,
    sst_size: u64,
    // Bytes written since the region's previous persisted snapshot; 0 when there is none or
    // when the counter is smaller than the previous one.
    write_bytes_delta: u64,
    // Stored in the `greptime_timestamp` column, marked with the `Timestamp` semantic type.
    #[col(
        name = "greptime_timestamp",
        semantic = "Timestamp",
        datatype = "TimestampMillisecond"
    )]
    timestamp_millis: i64,
}
86
87/// Compute the region stat to persist.
88fn compute_persist_region_stat(
89    region_stat: &RegionStat,
90    datanode_id: DatanodeId,
91    timestamp_millis: i64,
92    persisted_region_stat: Option<PersistedRegionStat>,
93) -> PersistRegionStat {
94    let write_bytes_delta = persisted_region_stat
95        .and_then(|persisted_region_stat| {
96            region_stat
97                .written_bytes
98                .checked_sub(persisted_region_stat.written_bytes)
99        })
100        .unwrap_or_default();
101
102    PersistRegionStat {
103        table_id: region_stat.id.table_id(),
104        region_id: region_stat.id.as_u64(),
105        region_number: region_stat.id.region_number(),
106        manifest_size: region_stat.manifest_size,
107        datanode_id,
108        engine: region_stat.engine.as_str(),
109        num_rows: region_stat.num_rows,
110        sst_num: region_stat.sst_num,
111        sst_size: region_stat.sst_size,
112        write_bytes_delta,
113        timestamp_millis,
114    }
115}
116
117fn to_persisted_if_leader(
118    region_stat: &RegionStat,
119    last_persisted_region_stats: &DashMap<RegionId, PersistedRegionStat>,
120    datanode_id: DatanodeId,
121    timestamp_millis: i64,
122) -> Option<(Row, PersistedRegionStat)> {
123    if matches!(region_stat.role, RegionRole::Leader) {
124        let persisted_region_stat = last_persisted_region_stats.get(&region_stat.id).map(|s| *s);
125        Some((
126            compute_persist_region_stat(
127                region_stat,
128                datanode_id,
129                timestamp_millis,
130                persisted_region_stat,
131            )
132            .to_row(),
133            PersistedRegionStat::from(region_stat),
134        ))
135    } else {
136        None
137    }
138}
139
/// Aligns `ts` (milliseconds) down to the start of the `interval`-sized bucket containing it,
/// i.e. the largest multiple of `interval` that is `<= ts`.
///
/// For non-negative timestamps this is plain truncating division. Negative timestamps are
/// floored as well (e.g. `-1` aligns to `-interval`) rather than being rounded toward zero.
///
/// If the interval's millisecond count does not fit in an `i64`, it is treated as `i64::MAX`.
/// A result that would fall below `i64::MIN` saturates.
///
/// # Panics
/// Panics if `interval` as milliseconds is zero.
fn align_ts(ts: i64, interval: Duration) -> i64 {
    let interval_millis = interval.as_millis();
    assert!(interval_millis != 0, "interval must be greater than zero");
    // `as_millis` returns a `u128`; an `as i64` cast could wrap to a zero or negative divisor.
    let interval_millis = i64::try_from(interval_millis).unwrap_or(i64::MAX);
    // `div_euclid` floors for a positive divisor, so negative timestamps align downwards too.
    ts.div_euclid(interval_millis)
        .saturating_mul(interval_millis)
}
151
152impl PersistStatsHandler {
153    /// Creates a new [`PersistStatsHandler`].
154    pub fn new(inserter: Box<dyn Inserter>, mut persist_interval: Duration) -> Self {
155        if persist_interval < Duration::from_mins(10) {
156            warn!("persist_interval is less than 10 minutes, set to 10 minutes");
157            persist_interval = Duration::from_mins(10);
158        }
159
160        Self {
161            inserter,
162            last_persisted_region_stats: DashMap::new(),
163            last_persisted_time: DashMap::new(),
164            persist_interval,
165        }
166    }
167
168    fn should_persist(&self, datanode_id: DatanodeId) -> bool {
169        let Some(last_persisted_time) = self.last_persisted_time.get(&datanode_id) else {
170            return true;
171        };
172
173        last_persisted_time.elapsed() >= self.persist_interval
174    }
175
176    async fn persist(
177        &self,
178        timestamp_millis: i64,
179        datanode_id: DatanodeId,
180        region_stats: &[RegionStat],
181    ) {
182        // Safety: persist_interval is greater than zero.
183        let aligned_ts = align_ts(timestamp_millis, self.persist_interval);
184        let (rows, incoming_region_stats): (Vec<_>, Vec<_>) = region_stats
185            .iter()
186            .flat_map(|region_stat| {
187                to_persisted_if_leader(
188                    region_stat,
189                    &self.last_persisted_region_stats,
190                    datanode_id,
191                    aligned_ts,
192                )
193            })
194            .unzip();
195
196        if rows.is_empty() {
197            return;
198        }
199
200        if let Err(err) = self
201            .inserter
202            .insert_rows(
203                &DEFAULT_CONTEXT,
204                RowInsertRequests {
205                    inserts: vec![RowInsertRequest {
206                        table_name: META_REGION_STATS_HISTORY_TABLE_NAME.to_string(),
207                        rows: Some(Rows {
208                            schema: PersistRegionStat::schema(),
209                            rows,
210                        }),
211                    }],
212                },
213            )
214            .await
215        {
216            warn!(
217                "Failed to persist region stats, datanode_id: {}, error: {:?}",
218                datanode_id, err
219            );
220            return;
221        }
222
223        self.last_persisted_time.insert(datanode_id, Instant::now());
224        for s in incoming_region_stats {
225            self.last_persisted_region_stats.insert(s.region_id, s);
226        }
227    }
228}
229
230#[async_trait::async_trait]
231impl HeartbeatHandler for PersistStatsHandler {
232    fn is_acceptable(&self, role: Role) -> bool {
233        role == Role::Datanode
234    }
235
236    async fn handle(
237        &self,
238        _req: &HeartbeatRequest,
239        _: &mut Context,
240        acc: &mut HeartbeatAccumulator,
241    ) -> Result<HandleControl> {
242        let Some(current_stat) = acc.stat.as_ref() else {
243            return Ok(HandleControl::Continue);
244        };
245
246        if !self.should_persist(current_stat.id) {
247            return Ok(HandleControl::Continue);
248        }
249
250        self.persist(
251            current_stat.timestamp_millis,
252            current_stat.id,
253            &current_stat.region_stats,
254        )
255        .await;
256
257        Ok(HandleControl::Continue)
258    }
259}
260
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use client::inserter::{Context as InserterContext, InsertOptions};
    use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;

    use super::*;
    use crate::handler::test_utils::TestEnv;

    /// Builds a leader [`RegionStat`] for region `(table_id, region_number)`.
    ///
    /// Every field other than `written_bytes` and `engine` has a fixed value.
    fn create_test_region_stat(
        table_id: u32,
        region_number: u32,
        written_bytes: u64,
        engine: &str,
    ) -> RegionStat {
        let region_id = RegionId::new(table_id, region_number);
        RegionStat {
            id: region_id,
            rcus: 100,
            wcus: 200,
            approximate_bytes: 1024,
            engine: engine.to_string(),
            role: RegionRole::Leader,
            num_rows: 1000,
            memtable_size: 512,
            manifest_size: 256,
            sst_size: 2048,
            sst_num: 5,
            index_size: 128,
            region_manifest: RegionManifestInfo::Mito {
                manifest_version: 1,
                flushed_entry_id: 100,
            },
            written_bytes,
            data_topic_latest_entry_id: 200,
            metadata_topic_latest_entry_id: 200,
        }
    }

    #[test]
    fn test_compute_persist_region_stat_with_no_persisted_stat() {
        let region_stat = create_test_region_stat(1, 1, 1000, "mito");
        let datanode_id = 123;
        let timestamp_millis = 1640995200000; // 2022-01-01 00:00:00 UTC
        let result = compute_persist_region_stat(&region_stat, datanode_id, timestamp_millis, None);
        assert_eq!(result.table_id, 1);
        assert_eq!(result.region_id, region_stat.id.as_u64());
        assert_eq!(result.region_number, 1);
        assert_eq!(result.manifest_size, 256);
        assert_eq!(result.datanode_id, datanode_id);
        assert_eq!(result.engine, "mito");
        assert_eq!(result.num_rows, 1000);
        assert_eq!(result.sst_num, 5);
        assert_eq!(result.sst_size, 2048);
        assert_eq!(result.write_bytes_delta, 0); // No previous stat, so delta is 0
        assert_eq!(result.timestamp_millis, timestamp_millis);
    }

    #[test]
    fn test_compute_persist_region_stat_with_persisted_stat_increase() {
        let region_stat = create_test_region_stat(2, 3, 1500, "mito");
        let datanode_id = 456;
        let timestamp_millis = 1640995260000; // 2022-01-01 00:01:00 UTC
        let persisted_stat = PersistedRegionStat {
            region_id: region_stat.id,
            written_bytes: 1000, // Previous write bytes
        };
        let result = compute_persist_region_stat(
            &region_stat,
            datanode_id,
            timestamp_millis,
            Some(persisted_stat),
        );
        assert_eq!(result.table_id, 2);
        assert_eq!(result.region_id, region_stat.id.as_u64());
        assert_eq!(result.region_number, 3);
        assert_eq!(result.manifest_size, 256);
        assert_eq!(result.datanode_id, datanode_id);
        assert_eq!(result.engine, "mito");
        assert_eq!(result.num_rows, 1000);
        assert_eq!(result.sst_num, 5);
        assert_eq!(result.sst_size, 2048);
        assert_eq!(result.write_bytes_delta, 500); // 1500 - 1000 = 500
        assert_eq!(result.timestamp_millis, timestamp_millis);
    }

    #[test]
    fn test_compute_persist_region_stat_with_persisted_stat_decrease() {
        let region_stat = create_test_region_stat(3, 5, 800, "mito");
        let datanode_id = 789;
        let timestamp_millis = 1640995320000; // 2022-01-01 00:02:00 UTC
        let persisted_stat = PersistedRegionStat {
            region_id: region_stat.id,
            written_bytes: 1200, // Previous write bytes (higher than current)
        };
        let result = compute_persist_region_stat(
            &region_stat,
            datanode_id,
            timestamp_millis,
            Some(persisted_stat),
        );
        assert_eq!(result.table_id, 3);
        assert_eq!(result.region_id, region_stat.id.as_u64());
        assert_eq!(result.region_number, 5);
        assert_eq!(result.manifest_size, 256);
        assert_eq!(result.datanode_id, datanode_id);
        assert_eq!(result.engine, "mito");
        assert_eq!(result.num_rows, 1000);
        assert_eq!(result.sst_num, 5);
        assert_eq!(result.sst_size, 2048);
        assert_eq!(result.write_bytes_delta, 0); // 800 - 1200 would be negative, so 0 due to checked_sub
        assert_eq!(result.timestamp_millis, timestamp_millis);
    }

    #[test]
    fn test_compute_persist_region_stat_with_persisted_stat_equal() {
        let region_stat = create_test_region_stat(4, 7, 2000, "mito");
        let datanode_id = 101;
        let timestamp_millis = 1640995380000; // 2022-01-01 00:03:00 UTC
        let persisted_stat = PersistedRegionStat {
            region_id: region_stat.id,
            written_bytes: 2000, // Same as current write bytes
        };
        let result = compute_persist_region_stat(
            &region_stat,
            datanode_id,
            timestamp_millis,
            Some(persisted_stat),
        );
        assert_eq!(result.table_id, 4);
        assert_eq!(result.region_id, region_stat.id.as_u64());
        assert_eq!(result.region_number, 7);
        assert_eq!(result.manifest_size, 256);
        assert_eq!(result.datanode_id, datanode_id);
        assert_eq!(result.engine, "mito");
        assert_eq!(result.num_rows, 1000);
        assert_eq!(result.sst_num, 5);
        assert_eq!(result.sst_size, 2048);
        assert_eq!(result.write_bytes_delta, 0); // 2000 - 2000 = 0
        assert_eq!(result.timestamp_millis, timestamp_millis);
    }

    #[test]
    fn test_compute_persist_region_stat_with_overflow_protection() {
        let region_stat = create_test_region_stat(8, 15, 500, "mito");
        let datanode_id = 505;
        let timestamp_millis = 1640995620000; // 2022-01-01 00:07:00 UTC
        let persisted_stat = PersistedRegionStat {
            region_id: region_stat.id,
            written_bytes: 1000, // Higher than current, would cause underflow
        };
        let result = compute_persist_region_stat(
            &region_stat,
            datanode_id,
            timestamp_millis,
            Some(persisted_stat),
        );
        assert_eq!(result.table_id, 8);
        assert_eq!(result.region_id, region_stat.id.as_u64());
        assert_eq!(result.region_number, 15);
        assert_eq!(result.manifest_size, 256);
        assert_eq!(result.datanode_id, datanode_id);
        assert_eq!(result.engine, "mito");
        assert_eq!(result.num_rows, 1000);
        assert_eq!(result.sst_num, 5);
        assert_eq!(result.sst_size, 2048);
        assert_eq!(result.write_bytes_delta, 0); // checked_sub returns None, so default to 0
        assert_eq!(result.timestamp_millis, timestamp_millis);
    }

    #[test]
    fn test_to_persisted_if_leader_skips_follower() {
        let mut region_stat = create_test_region_stat(1, 1, 1000, "mito");
        region_stat.role = RegionRole::Follower;
        let last_persisted_region_stats = DashMap::new();

        assert!(to_persisted_if_leader(
            &region_stat,
            &last_persisted_region_stats,
            1,
            1640995200000,
        )
        .is_none());
    }

    /// An [`Inserter`] that records every insert request and always succeeds.
    struct MockInserter {
        requests: Arc<Mutex<Vec<api::v1::RowInsertRequest>>>,
    }

    #[async_trait::async_trait]
    impl Inserter for MockInserter {
        async fn insert_rows(
            &self,
            _context: &InserterContext<'_>,
            requests: api::v1::RowInsertRequests,
        ) -> client::error::Result<()> {
            self.requests.lock().unwrap().extend(requests.inserts);

            Ok(())
        }

        fn set_options(&mut self, _options: &InsertOptions) {}
    }

    #[tokio::test]
    async fn test_not_persist_region_stats() {
        let env = TestEnv::new();
        let mut ctx = env.ctx();

        let requests = Arc::new(Mutex::new(vec![]));
        let inserter = MockInserter {
            requests: requests.clone(),
        };
        let handler = PersistStatsHandler::new(Box::new(inserter), Duration::from_secs(10));
        let mut acc = HeartbeatAccumulator {
            stat: Some(Stat {
                id: 1,
                timestamp_millis: 1640995200000,
                region_stats: vec![create_test_region_stat(1, 1, 1000, "mito")],
                ..Default::default()
            }),
            ..Default::default()
        };
        handler.last_persisted_time.insert(1, Instant::now());
        // Do not persist: the datanode was persisted just now and the interval is 10 minutes.
        handler
            .handle(&HeartbeatRequest::default(), &mut ctx, &mut acc)
            .await
            .unwrap();
        assert!(requests.lock().unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_persist_region_stats() {
        let env = TestEnv::new();
        let mut ctx = env.ctx();
        let requests = Arc::new(Mutex::new(vec![]));
        let inserter = MockInserter {
            requests: requests.clone(),
        };

        let handler = PersistStatsHandler::new(Box::new(inserter), Duration::from_secs(10));

        let region_stat = create_test_region_stat(1, 1, 1000, "mito");
        // Already a multiple of the 10-minute persist interval, so alignment leaves it unchanged.
        let timestamp_millis = 1640995200000;
        let datanode_id = 1;
        let region_id = RegionId::new(1, 1);
        let mut acc = HeartbeatAccumulator {
            stat: Some(Stat {
                id: datanode_id,
                timestamp_millis,
                region_stats: vec![region_stat.clone()],
                ..Default::default()
            }),
            ..Default::default()
        };

        handler.last_persisted_region_stats.insert(
            region_id,
            PersistedRegionStat {
                region_id,
                written_bytes: 500,
            },
        );
        let (expected_row, expected_persisted_region_stat) = to_persisted_if_leader(
            &region_stat,
            &handler.last_persisted_region_stats,
            datanode_id,
            timestamp_millis,
        )
        .unwrap();
        let before_insert_time = Instant::now();
        // Persist
        handler
            .handle(&HeartbeatRequest::default(), &mut ctx, &mut acc)
            .await
            .unwrap();
        let request = {
            let mut requests = requests.lock().unwrap();
            assert_eq!(requests.len(), 1);
            requests.pop().unwrap()
        };
        assert_eq!(
            request.table_name,
            META_REGION_STATS_HISTORY_TABLE_NAME.to_string()
        );
        assert_eq!(request.rows.unwrap().rows, vec![expected_row]);

        // Check last persisted time. `Instant` is only guaranteed to be non-decreasing, and two
        // close `Instant::now()` calls can return the same value, so compare with `>=`.
        let last_persisted_time = *handler.last_persisted_time.get(&datanode_id).unwrap();
        assert!(last_persisted_time >= before_insert_time);

        // Check last persisted region stats
        assert_eq!(
            handler
                .last_persisted_region_stats
                .get(&region_id)
                .unwrap()
                .value(),
            &expected_persisted_region_stat
        );
    }
}