mito2/worker/
handle_rebuild_index.rs1use std::collections::HashMap;
18use std::sync::Arc;
19
20use common_telemetry::{error, warn};
21use store_api::storage::{FileId, RegionId};
22use tokio::sync::oneshot;
23
24use crate::region::MitoRegionRef;
25use crate::request::{IndexBuildFailed, IndexBuildFinished, RegionBuildIndexRequest};
26use crate::sst::file::FileHandle;
27use crate::sst::index::{IndexBuildOutcome, IndexBuildTask, IndexBuildType, IndexerBuilderImpl};
28use crate::sst::parquet::WriteOptions;
29use crate::worker::RegionWorkerLoop;
30
31impl<S> RegionWorkerLoop<S> {
32 pub(crate) fn new_index_build_task(
33 &self,
34 region: &MitoRegionRef,
35 file: FileHandle,
36 build_type: IndexBuildType,
37 result_sender: Option<oneshot::Sender<IndexBuildOutcome>>,
38 ) -> IndexBuildTask {
39 let version = region.version();
40 let access_layer = region.access_layer.clone();
41
42 let puffin_manager = if let Some(write_cache) = self.cache_manager.write_cache() {
43 write_cache.build_puffin_manager()
44 } else {
45 access_layer.build_puffin_manager()
46 };
47
48 let intermediate_manager = if let Some(write_cache) = self.cache_manager.write_cache() {
49 write_cache.intermediate_manager().clone()
50 } else {
51 access_layer.intermediate_manager().clone()
52 };
53
54 let indexer_builder_ref = Arc::new(IndexerBuilderImpl {
55 build_type: build_type.clone(),
56 metadata: version.metadata.clone(),
57 inverted_index_config: self.config.inverted_index.clone(),
58 fulltext_index_config: self.config.fulltext_index.clone(),
59 bloom_filter_index_config: self.config.bloom_filter_index.clone(),
60 index_options: version.options.index_options.clone(),
61 row_group_size: WriteOptions::default().row_group_size,
62 intermediate_manager,
63 puffin_manager,
64 });
65
66 IndexBuildTask {
67 file_meta: file.meta_ref().clone(),
68 reason: build_type,
69 access_layer: access_layer.clone(),
70 manifest_ctx: region.manifest_ctx.clone(),
71 write_cache: self.cache_manager.write_cache().cloned(),
72 file_purger: file.file_purger(),
73 request_sender: self.sender.clone(),
74 indexer_builder: indexer_builder_ref.clone(),
75 result_sender,
76 }
77 }
78
79 pub(crate) async fn handle_rebuild_index(&mut self, request: RegionBuildIndexRequest) {
80 let region_id = request.region_id;
81 let Some(region) = self.regions.get_region(region_id) else {
82 return;
83 };
84
85 let version_control = region.version_control.clone();
86 let version = version_control.current().version;
87
88 let all_files: HashMap<FileId, FileHandle> = version
89 .ssts
90 .levels()
91 .iter()
92 .flat_map(|level| level.files.iter())
93 .filter(|(_, handle)| !handle.is_deleted() && !handle.compacting())
94 .map(|(id, handle)| (*id, handle.clone()))
95 .collect();
96
97 let build_tasks = if request.file_metas.is_empty() {
98 all_files.values().cloned().collect::<Vec<_>>()
102 } else {
103 request
104 .file_metas
105 .iter()
106 .filter_map(|meta| all_files.get(&meta.file_id).cloned())
107 .collect::<Vec<_>>()
108 };
109
110 for file_handle in build_tasks {
111 let task =
112 self.new_index_build_task(®ion, file_handle, request.build_type.clone(), None);
113 let _ = self
114 .index_build_scheduler
115 .schedule_build(®ion.version_control, task);
116 }
117 }
118
119 pub(crate) async fn handle_index_build_finished(
120 &mut self,
121 region_id: RegionId,
122 request: IndexBuildFinished,
123 ) {
124 let region = match self.regions.get_region(region_id) {
125 Some(region) => region,
126 None => {
127 warn!(
128 "Region not found for index build finished, region_id: {}",
129 region_id
130 );
131 return;
132 }
133 };
134 region.version_control.apply_edit(
135 Some(request.edit.clone()),
136 &[],
137 region.file_purger.clone(),
138 );
139 }
140
141 pub(crate) async fn handle_index_build_failed(
142 &mut self,
143 region_id: RegionId,
144 request: IndexBuildFailed,
145 ) {
146 error!(request.err; "Index build failed for region: {}", region_id);
147 }
149}