metric_engine/
repeated_task.rs1use std::sync::{Arc, RwLock};
16use std::time::Instant;
17
18use common_runtime::TaskFunction;
19use common_telemetry::{debug, error};
20use mito2::engine::MitoEngine;
21use store_api::region_engine::{RegionEngine, RegionRole};
22use store_api::region_request::{RegionFlushRequest, RegionRequest};
23
24use crate::engine::MetricEngineState;
25use crate::error::{Error, Result};
26use crate::utils;
27
28pub(crate) struct FlushMetadataRegionTask {
33 pub(crate) state: Arc<RwLock<MetricEngineState>>,
34 pub(crate) mito: MitoEngine,
35}
36
37#[async_trait::async_trait]
38impl TaskFunction<Error> for FlushMetadataRegionTask {
39 fn name(&self) -> &str {
40 "FlushMetadataRegionTask"
41 }
42
43 async fn call(&mut self) -> Result<()> {
44 let region_ids = {
45 let state = self.state.read().unwrap();
46 state
47 .physical_region_states()
48 .keys()
49 .cloned()
50 .collect::<Vec<_>>()
51 };
52
53 let num_region = region_ids.len();
54 let now = Instant::now();
55 for region_id in region_ids {
56 let Some(role) = self.mito.role(region_id) else {
57 continue;
58 };
59 if role == RegionRole::Follower {
60 continue;
61 }
62 let metadata_region_id = utils::to_metadata_region_id(region_id);
63 if let Err(e) = self
64 .mito
65 .handle_request(
66 metadata_region_id,
67 RegionRequest::Flush(RegionFlushRequest {
68 row_group_size: None,
69 }),
70 )
71 .await
72 {
73 error!(e; "Failed to flush metadata region {}", metadata_region_id);
74 }
75 }
76 debug!(
77 "Flushed {} metadata regions, elapsed: {:?}",
78 num_region,
79 now.elapsed()
80 );
81
82 Ok(())
83 }
84}
85
86#[cfg(test)]
87mod tests {
88 use std::assert_matches::assert_matches;
89 use std::time::Duration;
90
91 use store_api::region_engine::{RegionEngine, RegionManifestInfo};
92
93 use crate::config::{EngineConfig, DEFAULT_FLUSH_METADATA_REGION_INTERVAL};
94 use crate::test_util::TestEnv;
95
96 #[tokio::test]
97 async fn test_flush_metadata_region_task() {
98 let env = TestEnv::with_prefix_and_config(
99 "test_flush_metadata_region_task",
100 EngineConfig {
101 flush_metadata_region_interval: Duration::from_millis(100),
102 ..Default::default()
103 },
104 )
105 .await;
106 env.init_metric_region().await;
107 let engine = env.metric();
108 tokio::time::sleep(Duration::from_millis(200)).await;
110 let physical_region_id = env.default_physical_region_id();
111 let stat = engine.region_statistic(physical_region_id).unwrap();
112
113 assert_matches!(
114 stat.manifest,
115 RegionManifestInfo::Metric {
116 metadata_manifest_version: 1,
117 metadata_flushed_entry_id: 1,
118 ..
119 }
120 )
121 }
122
123 #[tokio::test]
124 async fn test_flush_metadata_region_task_with_long_interval() {
125 let env = TestEnv::with_prefix_and_config(
126 "test_flush_metadata_region_task_with_long_interval",
127 EngineConfig {
128 flush_metadata_region_interval: Duration::from_secs(60),
129 ..Default::default()
130 },
131 )
132 .await;
133 env.init_metric_region().await;
134 let engine = env.metric();
135 tokio::time::sleep(Duration::from_millis(200)).await;
137 let physical_region_id = env.default_physical_region_id();
138 let stat = engine.region_statistic(physical_region_id).unwrap();
139
140 assert_matches!(
141 stat.manifest,
142 RegionManifestInfo::Metric {
143 metadata_manifest_version: 0,
144 metadata_flushed_entry_id: 0,
145 ..
146 }
147 )
148 }
149
150 #[tokio::test]
151 async fn test_flush_metadata_region_sanitize() {
152 let env = TestEnv::with_prefix_and_config(
153 "test_flush_metadata_region_sanitize",
154 EngineConfig {
155 flush_metadata_region_interval: Duration::from_secs(0),
156 ..Default::default()
157 },
158 )
159 .await;
160 let metric = env.metric();
161 let config = metric.config();
162 assert_eq!(
163 config.flush_metadata_region_interval,
164 DEFAULT_FLUSH_METADATA_REGION_INTERVAL
165 );
166 }
167}