mito2/worker/
handle_rebuild_index.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Handles index build requests.
16
17use std::collections::HashMap;
18use std::sync::Arc;
19
20use common_telemetry::{error, warn};
21use store_api::storage::{FileId, RegionId};
22use tokio::sync::oneshot;
23
24use crate::region::MitoRegionRef;
25use crate::request::{IndexBuildFailed, IndexBuildFinished, RegionBuildIndexRequest};
26use crate::sst::file::FileHandle;
27use crate::sst::index::{IndexBuildOutcome, IndexBuildTask, IndexBuildType, IndexerBuilderImpl};
28use crate::sst::parquet::WriteOptions;
29use crate::worker::RegionWorkerLoop;
30
31impl<S> RegionWorkerLoop<S> {
32    pub(crate) fn new_index_build_task(
33        &self,
34        region: &MitoRegionRef,
35        file: FileHandle,
36        build_type: IndexBuildType,
37        result_sender: Option<oneshot::Sender<IndexBuildOutcome>>,
38    ) -> IndexBuildTask {
39        let version = region.version();
40        let access_layer = region.access_layer.clone();
41
42        let puffin_manager = if let Some(write_cache) = self.cache_manager.write_cache() {
43            write_cache.build_puffin_manager()
44        } else {
45            access_layer.build_puffin_manager()
46        };
47
48        let intermediate_manager = if let Some(write_cache) = self.cache_manager.write_cache() {
49            write_cache.intermediate_manager().clone()
50        } else {
51            access_layer.intermediate_manager().clone()
52        };
53
54        let indexer_builder_ref = Arc::new(IndexerBuilderImpl {
55            build_type: build_type.clone(),
56            metadata: version.metadata.clone(),
57            inverted_index_config: self.config.inverted_index.clone(),
58            fulltext_index_config: self.config.fulltext_index.clone(),
59            bloom_filter_index_config: self.config.bloom_filter_index.clone(),
60            index_options: version.options.index_options.clone(),
61            row_group_size: WriteOptions::default().row_group_size,
62            intermediate_manager,
63            puffin_manager,
64        });
65
66        IndexBuildTask {
67            file_meta: file.meta_ref().clone(),
68            reason: build_type,
69            access_layer: access_layer.clone(),
70            manifest_ctx: region.manifest_ctx.clone(),
71            write_cache: self.cache_manager.write_cache().cloned(),
72            file_purger: file.file_purger(),
73            request_sender: self.sender.clone(),
74            indexer_builder: indexer_builder_ref.clone(),
75            result_sender,
76        }
77    }
78
79    pub(crate) async fn handle_rebuild_index(&mut self, request: RegionBuildIndexRequest) {
80        let region_id = request.region_id;
81        let Some(region) = self.regions.get_region(region_id) else {
82            return;
83        };
84
85        let version_control = region.version_control.clone();
86        let version = version_control.current().version;
87
88        let all_files: HashMap<FileId, FileHandle> = version
89            .ssts
90            .levels()
91            .iter()
92            .flat_map(|level| level.files.iter())
93            .filter(|(_, handle)| !handle.is_deleted() && !handle.compacting())
94            .map(|(id, handle)| (*id, handle.clone()))
95            .collect();
96
97        let build_tasks = if request.file_metas.is_empty() {
98            // NOTE: Currently, rebuilding the index will reconstruct the index for all
99            // files in the region, which is a simplified approach and is not yet available for
100            // production use; further optimization is required.
101            all_files.values().cloned().collect::<Vec<_>>()
102        } else {
103            request
104                .file_metas
105                .iter()
106                .filter_map(|meta| all_files.get(&meta.file_id).cloned())
107                .collect::<Vec<_>>()
108        };
109
110        for file_handle in build_tasks {
111            let task =
112                self.new_index_build_task(&region, file_handle, request.build_type.clone(), None);
113            let _ = self
114                .index_build_scheduler
115                .schedule_build(&region.version_control, task);
116        }
117    }
118
119    pub(crate) async fn handle_index_build_finished(
120        &mut self,
121        region_id: RegionId,
122        request: IndexBuildFinished,
123    ) {
124        let region = match self.regions.get_region(region_id) {
125            Some(region) => region,
126            None => {
127                warn!(
128                    "Region not found for index build finished, region_id: {}",
129                    region_id
130                );
131                return;
132            }
133        };
134        region.version_control.apply_edit(
135            Some(request.edit.clone()),
136            &[],
137            region.file_purger.clone(),
138        );
139    }
140
141    pub(crate) async fn handle_index_build_failed(
142        &mut self,
143        region_id: RegionId,
144        request: IndexBuildFailed,
145    ) {
146        error!(request.err; "Index build failed for region: {}", region_id);
147        // TODO(SNC123): Implement error handling logic after IndexBuildScheduler optimization.
148    }
149}