metric_engine/
repeated_task.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, RwLock};
16use std::time::Instant;
17
18use common_runtime::TaskFunction;
19use common_telemetry::{debug, error};
20use mito2::engine::MitoEngine;
21use store_api::region_engine::{RegionEngine, RegionRole};
22use store_api::region_request::{RegionFlushRequest, RegionRequest};
23
24use crate::engine::MetricEngineState;
25use crate::error::{Error, Result};
26use crate::utils;
27
28/// Task to flush metadata regions.
29///
30/// This task is used to send flush requests to the metadata regions
31/// periodically.
32pub(crate) struct FlushMetadataRegionTask {
33    pub(crate) state: Arc<RwLock<MetricEngineState>>,
34    pub(crate) mito: MitoEngine,
35}
36
37#[async_trait::async_trait]
38impl TaskFunction<Error> for FlushMetadataRegionTask {
39    fn name(&self) -> &str {
40        "FlushMetadataRegionTask"
41    }
42
43    async fn call(&mut self) -> Result<()> {
44        let region_ids = {
45            let state = self.state.read().unwrap();
46            state
47                .physical_region_states()
48                .keys()
49                .cloned()
50                .collect::<Vec<_>>()
51        };
52
53        let num_region = region_ids.len();
54        let now = Instant::now();
55        for region_id in region_ids {
56            let Some(role) = self.mito.role(region_id) else {
57                continue;
58            };
59            if role == RegionRole::Follower {
60                continue;
61            }
62            let metadata_region_id = utils::to_metadata_region_id(region_id);
63            if let Err(e) = self
64                .mito
65                .handle_request(
66                    metadata_region_id,
67                    RegionRequest::Flush(RegionFlushRequest {
68                        row_group_size: None,
69                    }),
70                )
71                .await
72            {
73                error!(e; "Failed to flush metadata region {}", metadata_region_id);
74            }
75        }
76        debug!(
77            "Flushed {} metadata regions, elapsed: {:?}",
78            num_region,
79            now.elapsed()
80        );
81
82        Ok(())
83    }
84}
85
86#[cfg(test)]
87mod tests {
88    use std::assert_matches::assert_matches;
89    use std::time::Duration;
90
91    use store_api::region_engine::{RegionEngine, RegionManifestInfo};
92
93    use crate::config::{EngineConfig, DEFAULT_FLUSH_METADATA_REGION_INTERVAL};
94    use crate::test_util::TestEnv;
95
96    #[tokio::test]
97    async fn test_flush_metadata_region_task() {
98        let env = TestEnv::with_prefix_and_config(
99            "test_flush_metadata_region_task",
100            EngineConfig {
101                flush_metadata_region_interval: Duration::from_millis(100),
102                ..Default::default()
103            },
104        )
105        .await;
106        env.init_metric_region().await;
107        let engine = env.metric();
108        // Wait for flush task run
109        tokio::time::sleep(Duration::from_millis(200)).await;
110        let physical_region_id = env.default_physical_region_id();
111        let stat = engine.region_statistic(physical_region_id).unwrap();
112
113        assert_matches!(
114            stat.manifest,
115            RegionManifestInfo::Metric {
116                metadata_manifest_version: 1,
117                metadata_flushed_entry_id: 1,
118                ..
119            }
120        )
121    }
122
123    #[tokio::test]
124    async fn test_flush_metadata_region_task_with_long_interval() {
125        let env = TestEnv::with_prefix_and_config(
126            "test_flush_metadata_region_task_with_long_interval",
127            EngineConfig {
128                flush_metadata_region_interval: Duration::from_secs(60),
129                ..Default::default()
130            },
131        )
132        .await;
133        env.init_metric_region().await;
134        let engine = env.metric();
135        // Wait for flush task run, should not flush metadata region
136        tokio::time::sleep(Duration::from_millis(200)).await;
137        let physical_region_id = env.default_physical_region_id();
138        let stat = engine.region_statistic(physical_region_id).unwrap();
139
140        assert_matches!(
141            stat.manifest,
142            RegionManifestInfo::Metric {
143                metadata_manifest_version: 0,
144                metadata_flushed_entry_id: 0,
145                ..
146            }
147        )
148    }
149
150    #[tokio::test]
151    async fn test_flush_metadata_region_sanitize() {
152        let env = TestEnv::with_prefix_and_config(
153            "test_flush_metadata_region_sanitize",
154            EngineConfig {
155                flush_metadata_region_interval: Duration::from_secs(0),
156                ..Default::default()
157            },
158        )
159        .await;
160        let metric = env.metric();
161        let config = metric.config();
162        assert_eq!(
163            config.flush_metadata_region_interval,
164            DEFAULT_FLUSH_METADATA_REGION_INTERVAL
165        );
166    }
167}