metric_engine/
repeated_task.rs1use std::sync::{Arc, RwLock};
16use std::time::Instant;
17
18use common_runtime::TaskFunction;
19use common_telemetry::{debug, error};
20use mito2::engine::MitoEngine;
21use store_api::region_engine::{RegionEngine, RegionRole};
22use store_api::region_request::{RegionFlushRequest, RegionRequest};
23
24use crate::engine::MetricEngineState;
25use crate::error::{Error, Result};
26use crate::utils;
27
28pub(crate) struct FlushMetadataRegionTask {
33 pub(crate) state: Arc<RwLock<MetricEngineState>>,
34 pub(crate) mito: MitoEngine,
35}
36
37#[async_trait::async_trait]
38impl TaskFunction<Error> for FlushMetadataRegionTask {
39 fn name(&self) -> &str {
40 "FlushMetadataRegionTask"
41 }
42
43 async fn call(&mut self) -> Result<()> {
44 let region_ids = {
45 let state = self.state.read().unwrap();
46 state
47 .physical_region_states()
48 .keys()
49 .cloned()
50 .collect::<Vec<_>>()
51 };
52
53 let num_region = region_ids.len();
54 let now = Instant::now();
55 for region_id in region_ids {
56 let Some(role) = self.mito.role(region_id) else {
57 continue;
58 };
59 if role == RegionRole::Follower {
60 continue;
61 }
62 let metadata_region_id = utils::to_metadata_region_id(region_id);
63 if let Err(e) = self
64 .mito
65 .handle_request(
66 metadata_region_id,
67 RegionRequest::Flush(RegionFlushRequest::default()),
68 )
69 .await
70 {
71 error!(e; "Failed to flush metadata region {}", metadata_region_id);
72 }
73 }
74 debug!(
75 "Flushed {} metadata regions, elapsed: {:?}",
76 num_region,
77 now.elapsed()
78 );
79
80 Ok(())
81 }
82}
83
#[cfg(test)]
mod tests {
    use std::assert_matches;
    use std::time::Duration;

    use store_api::region_engine::{RegionEngine, RegionManifestInfo};

    use crate::config::{DEFAULT_FLUSH_METADATA_REGION_INTERVAL, EngineConfig};
    use crate::test_util::TestEnv;

    /// Builds an otherwise-default engine config with the given metadata
    /// region flush interval.
    fn config_with_flush_interval(interval: Duration) -> EngineConfig {
        EngineConfig {
            flush_metadata_region_interval: interval,
            ..Default::default()
        }
    }

    /// With a 10 ms flush interval, the region statistic read after a 500 ms
    /// wait must report metadata manifest version 1 and flushed entry id 1.
    #[tokio::test]
    async fn test_flush_metadata_region_task() {
        let config = config_with_flush_interval(Duration::from_millis(10));
        let env = TestEnv::with_prefix_and_config("test_flush_metadata_region_task", config).await;
        env.init_metric_region().await;
        let metric_engine = env.metric();
        tokio::time::sleep(Duration::from_millis(500)).await;

        let region_stat = metric_engine
            .region_statistic(env.default_physical_region_id())
            .unwrap();
        assert_matches!(
            region_stat.manifest,
            RegionManifestInfo::Metric {
                metadata_manifest_version: 1,
                metadata_flushed_entry_id: 1,
                ..
            }
        );
    }

    /// With a 60 s flush interval, the region statistic read after a 200 ms
    /// wait must still report metadata manifest version 0 and flushed entry
    /// id 0.
    #[tokio::test]
    async fn test_flush_metadata_region_task_with_long_interval() {
        let config = config_with_flush_interval(Duration::from_secs(60));
        let env = TestEnv::with_prefix_and_config(
            "test_flush_metadata_region_task_with_long_interval",
            config,
        )
        .await;
        env.init_metric_region().await;
        let metric_engine = env.metric();
        tokio::time::sleep(Duration::from_millis(200)).await;

        let region_stat = metric_engine
            .region_statistic(env.default_physical_region_id())
            .unwrap();
        assert_matches!(
            region_stat.manifest,
            RegionManifestInfo::Metric {
                metadata_manifest_version: 0,
                metadata_flushed_entry_id: 0,
                ..
            }
        );
    }

    /// A zero flush interval in the supplied config must show up as
    /// `DEFAULT_FLUSH_METADATA_REGION_INTERVAL` in the engine's config.
    #[tokio::test]
    async fn test_flush_metadata_region_sanitize() {
        let config = config_with_flush_interval(Duration::ZERO);
        let env =
            TestEnv::with_prefix_and_config("test_flush_metadata_region_sanitize", config).await;
        let metric_engine = env.metric();
        assert_eq!(
            metric_engine.config().flush_metadata_region_interval,
            DEFAULT_FLUSH_METADATA_REGION_INTERVAL
        );
    }
}