1use std::collections::HashMap;
18use std::sync::Arc;
19
20use common_telemetry::{debug, error, warn};
21use store_api::region_request::RegionBuildIndexRequest;
22use store_api::storage::{FileId, RegionId};
23use tokio::sync::mpsc;
24
25use crate::cache::CacheStrategy;
26use crate::error::Result;
27use crate::region::MitoRegionRef;
28use crate::request::{
29 BuildIndexRequest, IndexBuildFailed, IndexBuildFinished, IndexBuildStopped, OptionOutputTx,
30};
31use crate::sst::file::{FileHandle, RegionFileId};
32use crate::sst::index::{
33 IndexBuildOutcome, IndexBuildTask, IndexBuildType, IndexerBuilderImpl, ResultMpscSender,
34};
35use crate::sst::parquet::WriteOptions;
36use crate::worker::RegionWorkerLoop;
37
38impl<S> RegionWorkerLoop<S> {
39 pub(crate) fn new_index_build_task(
40 &self,
41 region: &MitoRegionRef,
42 file: FileHandle,
43 build_type: IndexBuildType,
44 result_sender: ResultMpscSender,
45 ) -> IndexBuildTask {
46 let version = region.version();
47 let access_layer = region.access_layer.clone();
48
49 let puffin_manager = if let Some(write_cache) = self.cache_manager.write_cache() {
50 write_cache.build_puffin_manager()
51 } else {
52 access_layer.build_puffin_manager()
53 };
54
55 let intermediate_manager = if let Some(write_cache) = self.cache_manager.write_cache() {
56 write_cache.intermediate_manager().clone()
57 } else {
58 access_layer.intermediate_manager().clone()
59 };
60
61 let indexer_builder_ref = Arc::new(IndexerBuilderImpl {
62 build_type: build_type.clone(),
63 metadata: version.metadata.clone(),
64 inverted_index_config: self.config.inverted_index.clone(),
65 fulltext_index_config: self.config.fulltext_index.clone(),
66 bloom_filter_index_config: self.config.bloom_filter_index.clone(),
67 index_options: version.options.index_options.clone(),
68 row_group_size: WriteOptions::default().row_group_size,
69 intermediate_manager,
70 puffin_manager,
71 });
72
73 IndexBuildTask {
74 file_meta: file.meta_ref().clone(),
75 reason: build_type,
76 access_layer: access_layer.clone(),
77 listener: self.listener.clone(),
78 manifest_ctx: region.manifest_ctx.clone(),
79 write_cache: self.cache_manager.write_cache().cloned(),
80 file_purger: file.file_purger(),
81 request_sender: self.sender.clone(),
82 indexer_builder: indexer_builder_ref.clone(),
83 result_sender,
84 }
85 }
86
87 pub(crate) async fn handle_build_index_request(
90 &mut self,
91 region_id: RegionId,
92 _req: RegionBuildIndexRequest,
93 sender: OptionOutputTx,
94 ) {
95 self.handle_rebuild_index(
96 BuildIndexRequest {
97 region_id,
98 build_type: IndexBuildType::Manual,
99 file_metas: Vec::new(),
100 },
101 sender,
102 )
103 .await;
104 }
105
106 pub(crate) async fn handle_rebuild_index(
107 &mut self,
108 request: BuildIndexRequest,
109 mut sender: OptionOutputTx,
110 ) {
111 let region_id = request.region_id;
112 let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
113 return;
114 };
115
116 let version_control = region.version_control.clone();
117 let version = version_control.current().version;
118
119 let all_files: HashMap<FileId, FileHandle> = version
120 .ssts
121 .levels()
122 .iter()
123 .flat_map(|level| level.files.iter())
124 .filter(|(_, handle)| !handle.is_deleted() && !handle.compacting())
125 .map(|(id, handle)| (*id, handle.clone()))
126 .collect();
127
128 let build_tasks = if request.file_metas.is_empty() {
129 all_files.values().cloned().collect::<Vec<_>>()
133 } else {
134 request
135 .file_metas
136 .iter()
137 .filter_map(|meta| all_files.get(&meta.file_id).cloned())
138 .collect::<Vec<_>>()
139 };
140
141 if build_tasks.is_empty() {
142 debug!(
143 "No files need to build index for region {}, request: {:?}",
144 region_id, request
145 );
146 sender.send(Ok(0));
147 return;
148 }
149
150 let num_tasks = build_tasks.len();
151 let (tx, mut rx) = mpsc::channel::<Result<IndexBuildOutcome>>(num_tasks);
152
153 for file_handle in build_tasks {
154 debug!(
155 "Scheduling index build for region {}, file_id {}",
156 region_id,
157 file_handle.meta_ref().file_id
158 );
159
160 if region.should_abort_index() {
161 warn!(
162 "Region {} is in state {:?}, abort index rebuild process for file_id {}",
163 region_id,
164 region.state(),
165 file_handle.meta_ref().file_id
166 );
167 break;
168 }
169
170 let task = self.new_index_build_task(
171 ®ion,
172 file_handle.clone(),
173 request.build_type.clone(),
174 tx.clone(),
175 );
176 let _ = self
177 .index_build_scheduler
178 .schedule_build(®ion.version_control, task)
179 .await;
180 }
181 common_runtime::spawn_global(async move {
183 for _ in 0..num_tasks {
184 if let Some(Err(e)) = rx.recv().await {
185 warn!(e; "Index build task failed for region: {}", region_id);
186 sender.send(Err(e));
187 return;
188 }
189 }
190 sender.send(Ok(0));
191 });
192 }
193
194 pub(crate) async fn handle_index_build_finished(
195 &mut self,
196 region_id: RegionId,
197 request: IndexBuildFinished,
198 ) {
199 let region = match self.regions.get_region(region_id) {
200 Some(region) => region,
201 None => {
202 warn!(
203 "Region not found for index build finished, region_id: {}",
204 region_id
205 );
206 return;
207 }
208 };
209
210 let cache_strategy = CacheStrategy::EnableAll(self.cache_manager.clone());
212 for file_meta in &request.edit.files_to_add {
213 let region_file_id = RegionFileId::new(region_id, file_meta.file_id);
214 cache_strategy.evict_puffin_cache(region_file_id).await;
215 }
216
217 region.version_control.apply_edit(
218 Some(request.edit.clone()),
219 &[],
220 region.file_purger.clone(),
221 );
222
223 for file_meta in &request.edit.files_to_add {
224 self.listener
225 .on_index_build_finish(RegionFileId::new(region_id, file_meta.file_id))
226 .await;
227 }
228 }
229
230 pub(crate) async fn handle_index_build_failed(
231 &mut self,
232 region_id: RegionId,
233 request: IndexBuildFailed,
234 ) {
235 error!(request.err; "Index build failed for region: {}", region_id);
236 self.index_build_scheduler
237 .on_failure(region_id, request.err.clone())
238 .await;
239 }
240
241 pub(crate) async fn handle_index_build_stopped(
242 &mut self,
243 region_id: RegionId,
244 request: IndexBuildStopped,
245 ) {
246 let Some(region) = self.regions.get_region(region_id) else {
247 warn!(
248 "Region not found for index build stopped, region_id: {}",
249 region_id
250 );
251 return;
252 };
253 self.index_build_scheduler.on_task_stopped(
254 region_id,
255 request.file_id,
256 ®ion.version_control,
257 );
258 }
259}