1use std::collections::HashMap;
18use std::sync::Arc;
19
20use common_telemetry::{debug, error, warn};
21use store_api::region_request::RegionBuildIndexRequest;
22use store_api::storage::{FileId, RegionId};
23use tokio::sync::mpsc;
24
25use crate::cache::CacheStrategy;
26use crate::error::Result;
27use crate::region::MitoRegionRef;
28use crate::request::{
29 BuildIndexRequest, IndexBuildFailed, IndexBuildFinished, IndexBuildStopped, OptionOutputTx,
30};
31use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId};
32use crate::sst::index::{
33 IndexBuildOutcome, IndexBuildTask, IndexBuildType, IndexerBuilderImpl, ResultMpscSender,
34};
35use crate::sst::parquet::WriteOptions;
36use crate::worker::RegionWorkerLoop;
37
38impl<S> RegionWorkerLoop<S> {
39 pub(crate) fn new_index_build_task(
40 &self,
41 region: &MitoRegionRef,
42 file: FileHandle,
43 build_type: IndexBuildType,
44 result_sender: ResultMpscSender,
45 ) -> IndexBuildTask {
46 let version = region.version();
47 let access_layer = region.access_layer.clone();
48
49 let puffin_manager = if let Some(write_cache) = self.cache_manager.write_cache() {
50 write_cache.build_puffin_manager()
51 } else {
52 access_layer.build_puffin_manager()
53 };
54
55 let intermediate_manager = if let Some(write_cache) = self.cache_manager.write_cache() {
56 write_cache.intermediate_manager().clone()
57 } else {
58 access_layer.intermediate_manager().clone()
59 };
60
61 let indexer_builder_ref = Arc::new(IndexerBuilderImpl {
62 build_type: build_type.clone(),
63 metadata: version.metadata.clone(),
64 inverted_index_config: self.config.inverted_index.clone(),
65 fulltext_index_config: self.config.fulltext_index.clone(),
66 bloom_filter_index_config: self.config.bloom_filter_index.clone(),
67 #[cfg(feature = "vector_index")]
68 vector_index_config: self.config.vector_index.clone(),
69 index_options: version.options.index_options.clone(),
70 row_group_size: WriteOptions::default().row_group_size,
71 intermediate_manager,
72 puffin_manager,
73 write_cache_enabled: self.cache_manager.write_cache().is_some(),
74 });
75
76 IndexBuildTask {
77 file: file.clone(),
78 file_meta: file.meta_ref().clone(),
79 reason: build_type,
80 access_layer: access_layer.clone(),
81 listener: self.listener.clone(),
82 manifest_ctx: region.manifest_ctx.clone(),
83 write_cache: self.cache_manager.write_cache().cloned(),
84 file_purger: file.file_purger(),
85 request_sender: self.sender.clone(),
86 indexer_builder: indexer_builder_ref.clone(),
87 result_sender,
88 }
89 }
90
91 pub(crate) async fn handle_build_index_request(
93 &mut self,
94 region_id: RegionId,
95 _req: RegionBuildIndexRequest,
96 sender: OptionOutputTx,
97 ) {
98 self.handle_rebuild_index(
99 BuildIndexRequest {
100 region_id,
101 build_type: IndexBuildType::Manual,
102 file_metas: Vec::new(),
103 },
104 sender,
105 )
106 .await;
107 }
108
109 pub(crate) async fn handle_rebuild_index(
110 &mut self,
111 request: BuildIndexRequest,
112 mut sender: OptionOutputTx,
113 ) {
114 let region_id = request.region_id;
115 let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
116 return;
117 };
118
119 let version_control = region.version_control.clone();
120 let version = version_control.current().version;
121
122 let all_files: HashMap<FileId, FileHandle> = version
123 .ssts
124 .levels()
125 .iter()
126 .flat_map(|level| level.files.iter())
127 .filter(|(_, handle)| !handle.is_deleted() && !handle.compacting())
128 .map(|(id, handle)| (*id, handle.clone()))
129 .collect();
130
131 let build_tasks = if request.file_metas.is_empty() {
132 all_files
134 .values()
135 .filter(|file| {
136 !file
137 .meta_ref()
138 .is_index_consistent_with_region(&version.metadata.column_metadatas)
139 })
140 .cloned()
141 .collect::<Vec<_>>()
142 } else {
143 request
144 .file_metas
145 .iter()
146 .filter_map(|meta| all_files.get(&meta.file_id).cloned())
147 .collect::<Vec<_>>()
148 };
149
150 if build_tasks.is_empty() {
151 debug!(
152 "No files need to build index for region {}, request: {:?}",
153 region_id, request
154 );
155 sender.send(Ok(0));
156 return;
157 }
158
159 let num_tasks = build_tasks.len();
160 let (tx, mut rx) = mpsc::channel::<Result<IndexBuildOutcome>>(num_tasks);
161
162 for file_handle in build_tasks {
163 debug!(
164 "Scheduling index build for region {}, file_id {}",
165 region_id,
166 file_handle.meta_ref().file_id
167 );
168
169 if region.should_abort_index() {
170 warn!(
171 "Region {} is in state {:?}, abort index rebuild process for file_id {}",
172 region_id,
173 region.state(),
174 file_handle.meta_ref().file_id
175 );
176 break;
177 }
178
179 let task = self.new_index_build_task(
180 ®ion,
181 file_handle.clone(),
182 request.build_type.clone(),
183 tx.clone(),
184 );
185 let _ = self
186 .index_build_scheduler
187 .schedule_build(®ion.version_control, task)
188 .await;
189 }
190 common_runtime::spawn_global(async move {
192 for _ in 0..num_tasks {
193 if let Some(Err(e)) = rx.recv().await {
194 warn!(e; "Index build task failed for region: {}", region_id);
195 sender.send(Err(e));
196 return;
197 }
198 }
199 sender.send(Ok(0));
200 });
201 }
202
203 pub(crate) async fn handle_index_build_finished(
204 &mut self,
205 region_id: RegionId,
206 request: IndexBuildFinished,
207 ) {
208 let region = match self.regions.get_region(region_id) {
209 Some(region) => region,
210 None => {
211 warn!(
212 "Region not found for index build finished, region_id: {}",
213 region_id
214 );
215 return;
216 }
217 };
218
219 let cache_strategy = CacheStrategy::EnableAll(self.cache_manager.clone());
221 for file_meta in &request.edit.files_to_add {
222 let region_file_id = RegionFileId::new(region_id, file_meta.file_id);
223 let index_id = RegionIndexId::new(region_file_id, file_meta.index_version);
224 cache_strategy.evict_puffin_cache(index_id).await;
225 }
226
227 region.version_control.apply_edit(
228 Some(request.edit.clone()),
229 &[],
230 region.file_purger.clone(),
231 );
232
233 for file_meta in &request.edit.files_to_add {
234 self.listener
235 .on_index_build_finish(RegionFileId::new(region_id, file_meta.file_id))
236 .await;
237 }
238 }
239
240 pub(crate) async fn handle_index_build_failed(
241 &mut self,
242 region_id: RegionId,
243 request: IndexBuildFailed,
244 ) {
245 error!(request.err; "Index build failed for region: {}", region_id);
246 self.index_build_scheduler
247 .on_failure(region_id, request.err.clone())
248 .await;
249 }
250
251 pub(crate) async fn handle_index_build_stopped(
252 &mut self,
253 region_id: RegionId,
254 request: IndexBuildStopped,
255 ) {
256 let Some(region) = self.regions.get_region(region_id) else {
257 warn!(
258 "Region not found for index build stopped, region_id: {}",
259 region_id
260 );
261 return;
262 };
263 self.index_build_scheduler.on_task_stopped(
264 region_id,
265 request.file_id,
266 ®ion.version_control,
267 );
268 }
269}