mito2/worker/handle_rebuild_index.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Handles index build requests.
16
17use std::collections::HashMap;
18use std::sync::Arc;
19
20use common_telemetry::{debug, error, warn};
21use store_api::region_request::RegionBuildIndexRequest;
22use store_api::storage::{FileId, RegionId};
23use tokio::sync::mpsc;
24
25use crate::cache::CacheStrategy;
26use crate::error::Result;
27use crate::region::MitoRegionRef;
28use crate::request::{
29    BuildIndexRequest, IndexBuildFailed, IndexBuildFinished, IndexBuildStopped, OptionOutputTx,
30};
31use crate::sst::file::{FileHandle, RegionFileId};
32use crate::sst::index::{
33    IndexBuildOutcome, IndexBuildTask, IndexBuildType, IndexerBuilderImpl, ResultMpscSender,
34};
35use crate::sst::parquet::WriteOptions;
36use crate::worker::RegionWorkerLoop;
37
38impl<S> RegionWorkerLoop<S> {
39    pub(crate) fn new_index_build_task(
40        &self,
41        region: &MitoRegionRef,
42        file: FileHandle,
43        build_type: IndexBuildType,
44        result_sender: ResultMpscSender,
45    ) -> IndexBuildTask {
46        let version = region.version();
47        let access_layer = region.access_layer.clone();
48
49        let puffin_manager = if let Some(write_cache) = self.cache_manager.write_cache() {
50            write_cache.build_puffin_manager()
51        } else {
52            access_layer.build_puffin_manager()
53        };
54
55        let intermediate_manager = if let Some(write_cache) = self.cache_manager.write_cache() {
56            write_cache.intermediate_manager().clone()
57        } else {
58            access_layer.intermediate_manager().clone()
59        };
60
61        let indexer_builder_ref = Arc::new(IndexerBuilderImpl {
62            build_type: build_type.clone(),
63            metadata: version.metadata.clone(),
64            inverted_index_config: self.config.inverted_index.clone(),
65            fulltext_index_config: self.config.fulltext_index.clone(),
66            bloom_filter_index_config: self.config.bloom_filter_index.clone(),
67            index_options: version.options.index_options.clone(),
68            row_group_size: WriteOptions::default().row_group_size,
69            intermediate_manager,
70            puffin_manager,
71        });
72
73        IndexBuildTask {
74            file_meta: file.meta_ref().clone(),
75            reason: build_type,
76            access_layer: access_layer.clone(),
77            listener: self.listener.clone(),
78            manifest_ctx: region.manifest_ctx.clone(),
79            write_cache: self.cache_manager.write_cache().cloned(),
80            file_purger: file.file_purger(),
81            request_sender: self.sender.clone(),
82            indexer_builder: indexer_builder_ref.clone(),
83            result_sender,
84        }
85    }
86
87    /// Handles manual build index requests.
88    /// TODO(SNC123): Support admin function of manual index building later.
89    pub(crate) async fn handle_build_index_request(
90        &mut self,
91        region_id: RegionId,
92        _req: RegionBuildIndexRequest,
93        sender: OptionOutputTx,
94    ) {
95        self.handle_rebuild_index(
96            BuildIndexRequest {
97                region_id,
98                build_type: IndexBuildType::Manual,
99                file_metas: Vec::new(),
100            },
101            sender,
102        )
103        .await;
104    }
105
106    pub(crate) async fn handle_rebuild_index(
107        &mut self,
108        request: BuildIndexRequest,
109        mut sender: OptionOutputTx,
110    ) {
111        let region_id = request.region_id;
112        let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
113            return;
114        };
115
116        let version_control = region.version_control.clone();
117        let version = version_control.current().version;
118
119        let all_files: HashMap<FileId, FileHandle> = version
120            .ssts
121            .levels()
122            .iter()
123            .flat_map(|level| level.files.iter())
124            .filter(|(_, handle)| !handle.is_deleted() && !handle.compacting())
125            .map(|(id, handle)| (*id, handle.clone()))
126            .collect();
127
128        let build_tasks = if request.file_metas.is_empty() {
129            // NOTE: Currently, rebuilding the index will reconstruct the index for all
130            // files in the region, which is a simplified approach and is not yet available for
131            // production use; further optimization is required.
132            all_files.values().cloned().collect::<Vec<_>>()
133        } else {
134            request
135                .file_metas
136                .iter()
137                .filter_map(|meta| all_files.get(&meta.file_id).cloned())
138                .collect::<Vec<_>>()
139        };
140
141        if build_tasks.is_empty() {
142            debug!(
143                "No files need to build index for region {}, request: {:?}",
144                region_id, request
145            );
146            sender.send(Ok(0));
147            return;
148        }
149
150        let num_tasks = build_tasks.len();
151        let (tx, mut rx) = mpsc::channel::<Result<IndexBuildOutcome>>(num_tasks);
152
153        for file_handle in build_tasks {
154            debug!(
155                "Scheduling index build for region {}, file_id {}",
156                region_id,
157                file_handle.meta_ref().file_id
158            );
159
160            if region.should_abort_index() {
161                warn!(
162                    "Region {} is in state {:?}, abort index rebuild process for file_id {}",
163                    region_id,
164                    region.state(),
165                    file_handle.meta_ref().file_id
166                );
167                break;
168            }
169
170            let task = self.new_index_build_task(
171                &region,
172                file_handle.clone(),
173                request.build_type.clone(),
174                tx.clone(),
175            );
176            let _ = self
177                .index_build_scheduler
178                .schedule_build(&region.version_control, task)
179                .await;
180        }
181        // Wait for all index build tasks to finish and notify the caller.
182        common_runtime::spawn_global(async move {
183            for _ in 0..num_tasks {
184                if let Some(Err(e)) = rx.recv().await {
185                    warn!(e; "Index build task failed for region: {}", region_id);
186                    sender.send(Err(e));
187                    return;
188                }
189            }
190            sender.send(Ok(0));
191        });
192    }
193
194    pub(crate) async fn handle_index_build_finished(
195        &mut self,
196        region_id: RegionId,
197        request: IndexBuildFinished,
198    ) {
199        let region = match self.regions.get_region(region_id) {
200            Some(region) => region,
201            None => {
202                warn!(
203                    "Region not found for index build finished, region_id: {}",
204                    region_id
205                );
206                return;
207            }
208        };
209
210        // Clean old puffin-related cache for all rebuilt files.
211        let cache_strategy = CacheStrategy::EnableAll(self.cache_manager.clone());
212        for file_meta in &request.edit.files_to_add {
213            let region_file_id = RegionFileId::new(region_id, file_meta.file_id);
214            cache_strategy.evict_puffin_cache(region_file_id).await;
215        }
216
217        region.version_control.apply_edit(
218            Some(request.edit.clone()),
219            &[],
220            region.file_purger.clone(),
221        );
222
223        for file_meta in &request.edit.files_to_add {
224            self.listener
225                .on_index_build_finish(RegionFileId::new(region_id, file_meta.file_id))
226                .await;
227        }
228    }
229
230    pub(crate) async fn handle_index_build_failed(
231        &mut self,
232        region_id: RegionId,
233        request: IndexBuildFailed,
234    ) {
235        error!(request.err; "Index build failed for region: {}", region_id);
236        self.index_build_scheduler
237            .on_failure(region_id, request.err.clone())
238            .await;
239    }
240
241    pub(crate) async fn handle_index_build_stopped(
242        &mut self,
243        region_id: RegionId,
244        request: IndexBuildStopped,
245    ) {
246        let Some(region) = self.regions.get_region(region_id) else {
247            warn!(
248                "Region not found for index build stopped, region_id: {}",
249                region_id
250            );
251            return;
252        };
253        self.index_build_scheduler.on_task_stopped(
254            region_id,
255            request.file_id,
256            &region.version_control,
257        );
258    }
259}