Skip to main content

metric_engine/
repeated_task.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, RwLock};
16use std::time::Instant;
17
18use common_runtime::TaskFunction;
19use common_telemetry::{debug, error};
20use mito2::engine::MitoEngine;
21use store_api::region_engine::{RegionEngine, RegionRole};
22use store_api::region_request::{RegionFlushRequest, RegionRequest};
23
24use crate::engine::MetricEngineState;
25use crate::error::{Error, Result};
26use crate::utils;
27
28/// Task to flush metadata regions.
29///
30/// This task is used to send flush requests to the metadata regions
31/// periodically.
32pub(crate) struct FlushMetadataRegionTask {
33    pub(crate) state: Arc<RwLock<MetricEngineState>>,
34    pub(crate) mito: MitoEngine,
35}
36
37#[async_trait::async_trait]
38impl TaskFunction<Error> for FlushMetadataRegionTask {
39    fn name(&self) -> &str {
40        "FlushMetadataRegionTask"
41    }
42
43    async fn call(&mut self) -> Result<()> {
44        let region_ids = {
45            let state = self.state.read().unwrap();
46            state
47                .physical_region_states()
48                .keys()
49                .cloned()
50                .collect::<Vec<_>>()
51        };
52
53        let num_region = region_ids.len();
54        let now = Instant::now();
55        for region_id in region_ids {
56            let Some(role) = self.mito.role(region_id) else {
57                continue;
58            };
59            if role == RegionRole::Follower {
60                continue;
61            }
62            let metadata_region_id = utils::to_metadata_region_id(region_id);
63            if let Err(e) = self
64                .mito
65                .handle_request(
66                    metadata_region_id,
67                    RegionRequest::Flush(RegionFlushRequest::default()),
68                )
69                .await
70            {
71                error!(e; "Failed to flush metadata region {}", metadata_region_id);
72            }
73        }
74        debug!(
75            "Flushed {} metadata regions, elapsed: {:?}",
76            num_region,
77            now.elapsed()
78        );
79
80        Ok(())
81    }
82}
83
#[cfg(test)]
mod tests {
    use std::assert_matches;
    use std::time::Duration;

    use store_api::region_engine::{RegionEngine, RegionManifestInfo};

    use crate::config::{DEFAULT_FLUSH_METADATA_REGION_INTERVAL, EngineConfig};
    use crate::test_util::TestEnv;

    /// Builds a test environment whose engine config sets
    /// `flush_metadata_region_interval` to `interval` and leaves every other
    /// option at its default.
    async fn env_with_flush_interval(prefix: &str, interval: Duration) -> TestEnv {
        let config = EngineConfig {
            flush_metadata_region_interval: interval,
            ..Default::default()
        };
        TestEnv::with_prefix_and_config(prefix, config).await
    }

    /// With a 10ms interval, the metadata region is expected to have been
    /// flushed once the wait is over.
    #[tokio::test]
    async fn test_flush_metadata_region_task() {
        let env = env_with_flush_interval(
            "test_flush_metadata_region_task",
            Duration::from_millis(10),
        )
        .await;
        env.init_metric_region().await;
        let metric_engine = env.metric();
        // Give the flush task time to run.
        tokio::time::sleep(Duration::from_millis(500)).await;
        let region_id = env.default_physical_region_id();
        let manifest = metric_engine.region_statistic(region_id).unwrap().manifest;

        assert_matches!(
            manifest,
            RegionManifestInfo::Metric {
                metadata_manifest_version: 1,
                metadata_flushed_entry_id: 1,
                ..
            }
        );
    }

    /// With a 60s interval, the metadata region must still be unflushed
    /// after a short wait.
    #[tokio::test]
    async fn test_flush_metadata_region_task_with_long_interval() {
        let env = env_with_flush_interval(
            "test_flush_metadata_region_task_with_long_interval",
            Duration::from_secs(60),
        )
        .await;
        env.init_metric_region().await;
        let metric_engine = env.metric();
        // The wait is far shorter than the interval, so no flush should happen.
        tokio::time::sleep(Duration::from_millis(200)).await;
        let region_id = env.default_physical_region_id();
        let manifest = metric_engine.region_statistic(region_id).unwrap().manifest;

        assert_matches!(
            manifest,
            RegionManifestInfo::Metric {
                metadata_manifest_version: 0,
                metadata_flushed_entry_id: 0,
                ..
            }
        );
    }

    /// A zero flush interval is expected to be replaced by
    /// `DEFAULT_FLUSH_METADATA_REGION_INTERVAL` in the engine's config.
    #[tokio::test]
    async fn test_flush_metadata_region_sanitize() {
        let env = env_with_flush_interval(
            "test_flush_metadata_region_sanitize",
            Duration::from_secs(0),
        )
        .await;
        let metric_engine = env.metric();
        let effective_config = metric_engine.config();
        assert_eq!(
            effective_config.flush_metadata_region_interval,
            DEFAULT_FLUSH_METADATA_REGION_INTERVAL
        );
    }
}