1use std::collections::HashMap;
18use std::sync::Arc;
19
20use common_telemetry::{debug, error, warn};
21use store_api::region_request::RegionBuildIndexRequest;
22use store_api::storage::{FileId, RegionId};
23use tokio::sync::mpsc;
24
25use crate::cache::CacheStrategy;
26use crate::error::Result;
27use crate::region::MitoRegionRef;
28use crate::request::{
29 BuildIndexRequest, IndexBuildFailed, IndexBuildFinished, IndexBuildStopped, OptionOutputTx,
30};
31use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId};
32use crate::sst::index::{
33 IndexBuildOutcome, IndexBuildTask, IndexBuildType, IndexerBuilderImpl, ResultMpscSender,
34};
35use crate::sst::parquet::WriteOptions;
36use crate::worker::RegionWorkerLoop;
37
38impl<S> RegionWorkerLoop<S> {
39 pub(crate) fn new_index_build_task(
40 &self,
41 region: &MitoRegionRef,
42 file: FileHandle,
43 build_type: IndexBuildType,
44 result_sender: ResultMpscSender,
45 ) -> IndexBuildTask {
46 let version = region.version();
47 let access_layer = region.access_layer.clone();
48
49 let puffin_manager = if let Some(write_cache) = self.cache_manager.write_cache() {
50 write_cache.build_puffin_manager()
51 } else {
52 access_layer.build_puffin_manager()
53 };
54
55 let intermediate_manager = if let Some(write_cache) = self.cache_manager.write_cache() {
56 write_cache.intermediate_manager().clone()
57 } else {
58 access_layer.intermediate_manager().clone()
59 };
60
61 let indexer_builder_ref = Arc::new(IndexerBuilderImpl {
62 build_type: build_type.clone(),
63 metadata: version.metadata.clone(),
64 inverted_index_config: self.config.inverted_index.clone(),
65 fulltext_index_config: self.config.fulltext_index.clone(),
66 bloom_filter_index_config: self.config.bloom_filter_index.clone(),
67 index_options: version.options.index_options.clone(),
68 row_group_size: WriteOptions::default().row_group_size,
69 intermediate_manager,
70 puffin_manager,
71 write_cache_enabled: self.cache_manager.write_cache().is_some(),
72 });
73
74 IndexBuildTask {
75 file: file.clone(),
76 file_meta: file.meta_ref().clone(),
77 reason: build_type,
78 access_layer: access_layer.clone(),
79 listener: self.listener.clone(),
80 manifest_ctx: region.manifest_ctx.clone(),
81 write_cache: self.cache_manager.write_cache().cloned(),
82 file_purger: file.file_purger(),
83 request_sender: self.sender.clone(),
84 indexer_builder: indexer_builder_ref.clone(),
85 result_sender,
86 }
87 }
88
89 pub(crate) async fn handle_build_index_request(
91 &mut self,
92 region_id: RegionId,
93 _req: RegionBuildIndexRequest,
94 sender: OptionOutputTx,
95 ) {
96 self.handle_rebuild_index(
97 BuildIndexRequest {
98 region_id,
99 build_type: IndexBuildType::Manual,
100 file_metas: Vec::new(),
101 },
102 sender,
103 )
104 .await;
105 }
106
107 pub(crate) async fn handle_rebuild_index(
108 &mut self,
109 request: BuildIndexRequest,
110 mut sender: OptionOutputTx,
111 ) {
112 let region_id = request.region_id;
113 let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
114 return;
115 };
116
117 let version_control = region.version_control.clone();
118 let version = version_control.current().version;
119
120 let all_files: HashMap<FileId, FileHandle> = version
121 .ssts
122 .levels()
123 .iter()
124 .flat_map(|level| level.files.iter())
125 .filter(|(_, handle)| !handle.is_deleted() && !handle.compacting())
126 .map(|(id, handle)| (*id, handle.clone()))
127 .collect();
128
129 let build_tasks = if request.file_metas.is_empty() {
130 all_files
132 .values()
133 .filter(|file| {
134 !file
135 .meta_ref()
136 .is_index_consistent_with_region(&version.metadata.column_metadatas)
137 })
138 .cloned()
139 .collect::<Vec<_>>()
140 } else {
141 request
142 .file_metas
143 .iter()
144 .filter_map(|meta| all_files.get(&meta.file_id).cloned())
145 .collect::<Vec<_>>()
146 };
147
148 if build_tasks.is_empty() {
149 debug!(
150 "No files need to build index for region {}, request: {:?}",
151 region_id, request
152 );
153 sender.send(Ok(0));
154 return;
155 }
156
157 let num_tasks = build_tasks.len();
158 let (tx, mut rx) = mpsc::channel::<Result<IndexBuildOutcome>>(num_tasks);
159
160 for file_handle in build_tasks {
161 debug!(
162 "Scheduling index build for region {}, file_id {}",
163 region_id,
164 file_handle.meta_ref().file_id
165 );
166
167 if region.should_abort_index() {
168 warn!(
169 "Region {} is in state {:?}, abort index rebuild process for file_id {}",
170 region_id,
171 region.state(),
172 file_handle.meta_ref().file_id
173 );
174 break;
175 }
176
177 let task = self.new_index_build_task(
178 ®ion,
179 file_handle.clone(),
180 request.build_type.clone(),
181 tx.clone(),
182 );
183 let _ = self
184 .index_build_scheduler
185 .schedule_build(®ion.version_control, task)
186 .await;
187 }
188 common_runtime::spawn_global(async move {
190 for _ in 0..num_tasks {
191 if let Some(Err(e)) = rx.recv().await {
192 warn!(e; "Index build task failed for region: {}", region_id);
193 sender.send(Err(e));
194 return;
195 }
196 }
197 sender.send(Ok(0));
198 });
199 }
200
201 pub(crate) async fn handle_index_build_finished(
202 &mut self,
203 region_id: RegionId,
204 request: IndexBuildFinished,
205 ) {
206 let region = match self.regions.get_region(region_id) {
207 Some(region) => region,
208 None => {
209 warn!(
210 "Region not found for index build finished, region_id: {}",
211 region_id
212 );
213 return;
214 }
215 };
216
217 let cache_strategy = CacheStrategy::EnableAll(self.cache_manager.clone());
219 for file_meta in &request.edit.files_to_add {
220 let region_file_id = RegionFileId::new(region_id, file_meta.file_id);
221 let index_id = RegionIndexId::new(region_file_id, file_meta.index_version);
222 cache_strategy.evict_puffin_cache(index_id).await;
223 }
224
225 region.version_control.apply_edit(
226 Some(request.edit.clone()),
227 &[],
228 region.file_purger.clone(),
229 );
230
231 for file_meta in &request.edit.files_to_add {
232 self.listener
233 .on_index_build_finish(RegionFileId::new(region_id, file_meta.file_id))
234 .await;
235 }
236 }
237
238 pub(crate) async fn handle_index_build_failed(
239 &mut self,
240 region_id: RegionId,
241 request: IndexBuildFailed,
242 ) {
243 error!(request.err; "Index build failed for region: {}", region_id);
244 self.index_build_scheduler
245 .on_failure(region_id, request.err.clone())
246 .await;
247 }
248
249 pub(crate) async fn handle_index_build_stopped(
250 &mut self,
251 region_id: RegionId,
252 request: IndexBuildStopped,
253 ) {
254 let Some(region) = self.regions.get_region(region_id) else {
255 warn!(
256 "Region not found for index build stopped, region_id: {}",
257 region_id
258 );
259 return;
260 };
261 self.index_build_scheduler.on_task_stopped(
262 region_id,
263 request.file_id,
264 ®ion.version_control,
265 );
266 }
267}