mito2/worker/
handle_rebuild_index.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Handles index build requests.

17use std::collections::HashMap;
18use std::sync::Arc;
19
20use common_telemetry::{debug, error, warn};
21use store_api::region_request::RegionBuildIndexRequest;
22use store_api::storage::{FileId, RegionId};
23use tokio::sync::mpsc;
24
25use crate::cache::CacheStrategy;
26use crate::error::Result;
27use crate::region::MitoRegionRef;
28use crate::request::{
29    BuildIndexRequest, IndexBuildFailed, IndexBuildFinished, IndexBuildStopped, OptionOutputTx,
30};
31use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId};
32use crate::sst::index::{
33    IndexBuildOutcome, IndexBuildTask, IndexBuildType, IndexerBuilderImpl, ResultMpscSender,
34};
35use crate::sst::parquet::WriteOptions;
36use crate::worker::RegionWorkerLoop;
37
38impl<S> RegionWorkerLoop<S> {
39    pub(crate) fn new_index_build_task(
40        &self,
41        region: &MitoRegionRef,
42        file: FileHandle,
43        build_type: IndexBuildType,
44        result_sender: ResultMpscSender,
45    ) -> IndexBuildTask {
46        let version = region.version();
47        let access_layer = region.access_layer.clone();
48
49        let puffin_manager = if let Some(write_cache) = self.cache_manager.write_cache() {
50            write_cache.build_puffin_manager()
51        } else {
52            access_layer.build_puffin_manager()
53        };
54
55        let intermediate_manager = if let Some(write_cache) = self.cache_manager.write_cache() {
56            write_cache.intermediate_manager().clone()
57        } else {
58            access_layer.intermediate_manager().clone()
59        };
60
61        let indexer_builder_ref = Arc::new(IndexerBuilderImpl {
62            build_type: build_type.clone(),
63            metadata: version.metadata.clone(),
64            inverted_index_config: self.config.inverted_index.clone(),
65            fulltext_index_config: self.config.fulltext_index.clone(),
66            bloom_filter_index_config: self.config.bloom_filter_index.clone(),
67            index_options: version.options.index_options.clone(),
68            row_group_size: WriteOptions::default().row_group_size,
69            intermediate_manager,
70            puffin_manager,
71            write_cache_enabled: self.cache_manager.write_cache().is_some(),
72        });
73
74        IndexBuildTask {
75            file: file.clone(),
76            file_meta: file.meta_ref().clone(),
77            reason: build_type,
78            access_layer: access_layer.clone(),
79            listener: self.listener.clone(),
80            manifest_ctx: region.manifest_ctx.clone(),
81            write_cache: self.cache_manager.write_cache().cloned(),
82            file_purger: file.file_purger(),
83            request_sender: self.sender.clone(),
84            indexer_builder: indexer_builder_ref.clone(),
85            result_sender,
86        }
87    }
88
89    /// Handles manual build index requests.
90    pub(crate) async fn handle_build_index_request(
91        &mut self,
92        region_id: RegionId,
93        _req: RegionBuildIndexRequest,
94        sender: OptionOutputTx,
95    ) {
96        self.handle_rebuild_index(
97            BuildIndexRequest {
98                region_id,
99                build_type: IndexBuildType::Manual,
100                file_metas: Vec::new(),
101            },
102            sender,
103        )
104        .await;
105    }
106
107    pub(crate) async fn handle_rebuild_index(
108        &mut self,
109        request: BuildIndexRequest,
110        mut sender: OptionOutputTx,
111    ) {
112        let region_id = request.region_id;
113        let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
114            return;
115        };
116
117        let version_control = region.version_control.clone();
118        let version = version_control.current().version;
119
120        let all_files: HashMap<FileId, FileHandle> = version
121            .ssts
122            .levels()
123            .iter()
124            .flat_map(|level| level.files.iter())
125            .filter(|(_, handle)| !handle.is_deleted() && !handle.compacting())
126            .map(|(id, handle)| (*id, handle.clone()))
127            .collect();
128
129        let build_tasks = if request.file_metas.is_empty() {
130            // If no specific files are provided, find files whose index is inconsistent with the region metadata.
131            all_files
132                .values()
133                .filter(|file| {
134                    !file
135                        .meta_ref()
136                        .is_index_consistent_with_region(&version.metadata.column_metadatas)
137                })
138                .cloned()
139                .collect::<Vec<_>>()
140        } else {
141            request
142                .file_metas
143                .iter()
144                .filter_map(|meta| all_files.get(&meta.file_id).cloned())
145                .collect::<Vec<_>>()
146        };
147
148        if build_tasks.is_empty() {
149            debug!(
150                "No files need to build index for region {}, request: {:?}",
151                region_id, request
152            );
153            sender.send(Ok(0));
154            return;
155        }
156
157        let num_tasks = build_tasks.len();
158        let (tx, mut rx) = mpsc::channel::<Result<IndexBuildOutcome>>(num_tasks);
159
160        for file_handle in build_tasks {
161            debug!(
162                "Scheduling index build for region {}, file_id {}",
163                region_id,
164                file_handle.meta_ref().file_id
165            );
166
167            if region.should_abort_index() {
168                warn!(
169                    "Region {} is in state {:?}, abort index rebuild process for file_id {}",
170                    region_id,
171                    region.state(),
172                    file_handle.meta_ref().file_id
173                );
174                break;
175            }
176
177            let task = self.new_index_build_task(
178                &region,
179                file_handle.clone(),
180                request.build_type.clone(),
181                tx.clone(),
182            );
183            let _ = self
184                .index_build_scheduler
185                .schedule_build(&region.version_control, task)
186                .await;
187        }
188        // Wait for all index build tasks to finish and notify the caller.
189        common_runtime::spawn_global(async move {
190            for _ in 0..num_tasks {
191                if let Some(Err(e)) = rx.recv().await {
192                    warn!(e; "Index build task failed for region: {}", region_id);
193                    sender.send(Err(e));
194                    return;
195                }
196            }
197            sender.send(Ok(0));
198        });
199    }
200
201    pub(crate) async fn handle_index_build_finished(
202        &mut self,
203        region_id: RegionId,
204        request: IndexBuildFinished,
205    ) {
206        let region = match self.regions.get_region(region_id) {
207            Some(region) => region,
208            None => {
209                warn!(
210                    "Region not found for index build finished, region_id: {}",
211                    region_id
212                );
213                return;
214            }
215        };
216
217        // Clean old puffin-related cache for all rebuilt files.
218        let cache_strategy = CacheStrategy::EnableAll(self.cache_manager.clone());
219        for file_meta in &request.edit.files_to_add {
220            let region_file_id = RegionFileId::new(region_id, file_meta.file_id);
221            let index_id = RegionIndexId::new(region_file_id, file_meta.index_version);
222            cache_strategy.evict_puffin_cache(index_id).await;
223        }
224
225        region.version_control.apply_edit(
226            Some(request.edit.clone()),
227            &[],
228            region.file_purger.clone(),
229        );
230
231        for file_meta in &request.edit.files_to_add {
232            self.listener
233                .on_index_build_finish(RegionFileId::new(region_id, file_meta.file_id))
234                .await;
235        }
236    }
237
238    pub(crate) async fn handle_index_build_failed(
239        &mut self,
240        region_id: RegionId,
241        request: IndexBuildFailed,
242    ) {
243        error!(request.err; "Index build failed for region: {}", region_id);
244        self.index_build_scheduler
245            .on_failure(region_id, request.err.clone())
246            .await;
247    }
248
249    pub(crate) async fn handle_index_build_stopped(
250        &mut self,
251        region_id: RegionId,
252        request: IndexBuildStopped,
253    ) {
254        let Some(region) = self.regions.get_region(region_id) else {
255            warn!(
256                "Region not found for index build stopped, region_id: {}",
257                region_id
258            );
259            return;
260        };
261        self.index_build_scheduler.on_task_stopped(
262            region_id,
263            request.file_id,
264            &region.version_control,
265        );
266    }
267}