mito2/worker/
handle_rebuild_index.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Handles index build requests.
16
17use std::collections::HashMap;
18use std::sync::Arc;
19
20use common_telemetry::{debug, error, warn};
21use store_api::region_request::RegionBuildIndexRequest;
22use store_api::storage::{FileId, RegionId};
23use tokio::sync::mpsc;
24
25use crate::cache::CacheStrategy;
26use crate::error::Result;
27use crate::region::MitoRegionRef;
28use crate::request::{
29    BuildIndexRequest, IndexBuildFailed, IndexBuildFinished, IndexBuildStopped, OptionOutputTx,
30};
31use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId};
32use crate::sst::index::{
33    IndexBuildOutcome, IndexBuildTask, IndexBuildType, IndexerBuilderImpl, ResultMpscSender,
34};
35use crate::sst::parquet::WriteOptions;
36use crate::worker::RegionWorkerLoop;
37
38impl<S> RegionWorkerLoop<S> {
39    pub(crate) fn new_index_build_task(
40        &self,
41        region: &MitoRegionRef,
42        file: FileHandle,
43        build_type: IndexBuildType,
44        result_sender: ResultMpscSender,
45    ) -> IndexBuildTask {
46        let version = region.version();
47        let access_layer = region.access_layer.clone();
48
49        let puffin_manager = if let Some(write_cache) = self.cache_manager.write_cache() {
50            write_cache.build_puffin_manager()
51        } else {
52            access_layer.build_puffin_manager()
53        };
54
55        let intermediate_manager = if let Some(write_cache) = self.cache_manager.write_cache() {
56            write_cache.intermediate_manager().clone()
57        } else {
58            access_layer.intermediate_manager().clone()
59        };
60
61        let indexer_builder_ref = Arc::new(IndexerBuilderImpl {
62            build_type: build_type.clone(),
63            metadata: version.metadata.clone(),
64            inverted_index_config: self.config.inverted_index.clone(),
65            fulltext_index_config: self.config.fulltext_index.clone(),
66            bloom_filter_index_config: self.config.bloom_filter_index.clone(),
67            #[cfg(feature = "vector_index")]
68            vector_index_config: self.config.vector_index.clone(),
69            index_options: version.options.index_options.clone(),
70            row_group_size: WriteOptions::default().row_group_size,
71            intermediate_manager,
72            puffin_manager,
73            write_cache_enabled: self.cache_manager.write_cache().is_some(),
74        });
75
76        IndexBuildTask {
77            file: file.clone(),
78            file_meta: file.meta_ref().clone(),
79            reason: build_type,
80            access_layer: access_layer.clone(),
81            listener: self.listener.clone(),
82            manifest_ctx: region.manifest_ctx.clone(),
83            write_cache: self.cache_manager.write_cache().cloned(),
84            file_purger: file.file_purger(),
85            request_sender: self.sender.clone(),
86            indexer_builder: indexer_builder_ref.clone(),
87            result_sender,
88        }
89    }
90
91    /// Handles manual build index requests.
92    pub(crate) async fn handle_build_index_request(
93        &mut self,
94        region_id: RegionId,
95        _req: RegionBuildIndexRequest,
96        sender: OptionOutputTx,
97    ) {
98        self.handle_rebuild_index(
99            BuildIndexRequest {
100                region_id,
101                build_type: IndexBuildType::Manual,
102                file_metas: Vec::new(),
103            },
104            sender,
105        )
106        .await;
107    }
108
109    pub(crate) async fn handle_rebuild_index(
110        &mut self,
111        request: BuildIndexRequest,
112        mut sender: OptionOutputTx,
113    ) {
114        let region_id = request.region_id;
115        let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
116            return;
117        };
118
119        let version_control = region.version_control.clone();
120        let version = version_control.current().version;
121
122        let all_files: HashMap<FileId, FileHandle> = version
123            .ssts
124            .levels()
125            .iter()
126            .flat_map(|level| level.files.iter())
127            .filter(|(_, handle)| !handle.is_deleted() && !handle.compacting())
128            .map(|(id, handle)| (*id, handle.clone()))
129            .collect();
130
131        let build_tasks = if request.file_metas.is_empty() {
132            // If no specific files are provided, find files whose index is inconsistent with the region metadata.
133            all_files
134                .values()
135                .filter(|file| {
136                    !file
137                        .meta_ref()
138                        .is_index_consistent_with_region(&version.metadata.column_metadatas)
139                })
140                .cloned()
141                .collect::<Vec<_>>()
142        } else {
143            request
144                .file_metas
145                .iter()
146                .filter_map(|meta| all_files.get(&meta.file_id).cloned())
147                .collect::<Vec<_>>()
148        };
149
150        if build_tasks.is_empty() {
151            debug!(
152                "No files need to build index for region {}, request: {:?}",
153                region_id, request
154            );
155            sender.send(Ok(0));
156            return;
157        }
158
159        let num_tasks = build_tasks.len();
160        let (tx, mut rx) = mpsc::channel::<Result<IndexBuildOutcome>>(num_tasks);
161
162        for file_handle in build_tasks {
163            debug!(
164                "Scheduling index build for region {}, file_id {}",
165                region_id,
166                file_handle.meta_ref().file_id
167            );
168
169            if region.should_abort_index() {
170                warn!(
171                    "Region {} is in state {:?}, abort index rebuild process for file_id {}",
172                    region_id,
173                    region.state(),
174                    file_handle.meta_ref().file_id
175                );
176                break;
177            }
178
179            let task = self.new_index_build_task(
180                &region,
181                file_handle.clone(),
182                request.build_type.clone(),
183                tx.clone(),
184            );
185            let _ = self
186                .index_build_scheduler
187                .schedule_build(&region.version_control, task)
188                .await;
189        }
190        // Wait for all index build tasks to finish and notify the caller.
191        common_runtime::spawn_global(async move {
192            for _ in 0..num_tasks {
193                if let Some(Err(e)) = rx.recv().await {
194                    warn!(e; "Index build task failed for region: {}", region_id);
195                    sender.send(Err(e));
196                    return;
197                }
198            }
199            sender.send(Ok(0));
200        });
201    }
202
203    pub(crate) async fn handle_index_build_finished(
204        &mut self,
205        region_id: RegionId,
206        request: IndexBuildFinished,
207    ) {
208        let region = match self.regions.get_region(region_id) {
209            Some(region) => region,
210            None => {
211                warn!(
212                    "Region not found for index build finished, region_id: {}",
213                    region_id
214                );
215                return;
216            }
217        };
218
219        // Clean old puffin-related cache for all rebuilt files.
220        let cache_strategy = CacheStrategy::EnableAll(self.cache_manager.clone());
221        for file_meta in &request.edit.files_to_add {
222            let region_file_id = RegionFileId::new(region_id, file_meta.file_id);
223            let index_id = RegionIndexId::new(region_file_id, file_meta.index_version);
224            cache_strategy.evict_puffin_cache(index_id).await;
225        }
226
227        region.version_control.apply_edit(
228            Some(request.edit.clone()),
229            &[],
230            region.file_purger.clone(),
231        );
232
233        for file_meta in &request.edit.files_to_add {
234            self.listener
235                .on_index_build_finish(RegionFileId::new(region_id, file_meta.file_id))
236                .await;
237        }
238    }
239
240    pub(crate) async fn handle_index_build_failed(
241        &mut self,
242        region_id: RegionId,
243        request: IndexBuildFailed,
244    ) {
245        error!(request.err; "Index build failed for region: {}", region_id);
246        self.index_build_scheduler
247            .on_failure(region_id, request.err.clone())
248            .await;
249    }
250
251    pub(crate) async fn handle_index_build_stopped(
252        &mut self,
253        region_id: RegionId,
254        request: IndexBuildStopped,
255    ) {
256        let Some(region) = self.regions.get_region(region_id) else {
257            warn!(
258                "Region not found for index build stopped, region_id: {}",
259                region_id
260            );
261            return;
262        };
263        self.index_build_scheduler.on_task_stopped(
264            region_id,
265            request.file_id,
266            &region.version_control,
267        );
268    }
269}