1use std::num::NonZero;
16use std::sync::Arc;
17use std::time::Duration;
18
19use api::v1::region::compact_request;
20use common_meta::key::SchemaMetadataManagerRef;
21use common_telemetry::{info, warn};
22use common_time::TimeToLive;
23use itertools::Itertools;
24use object_store::manager::ObjectStoreManagerRef;
25use serde::{Deserialize, Serialize};
26use snafu::{OptionExt, ResultExt};
27use store_api::metadata::RegionMetadataRef;
28use store_api::storage::RegionId;
29
30use crate::access_layer::{AccessLayer, AccessLayerRef, OperationType, SstWriteRequest};
31use crate::cache::{CacheManager, CacheManagerRef};
32use crate::compaction::picker::{new_picker, PickerOutput};
33use crate::compaction::{find_ttl, CompactionSstReaderBuilder};
34use crate::config::MitoConfig;
35use crate::error::{EmptyRegionDirSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result};
36use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
37use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
38use crate::manifest::storage::manifest_compress_type;
39use crate::read::Source;
40use crate::region::opener::new_manifest_dir;
41use crate::region::options::RegionOptions;
42use crate::region::version::VersionRef;
43use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState};
44use crate::schedule::scheduler::LocalScheduler;
45use crate::sst::file::FileMeta;
46use crate::sst::file_purger::LocalFilePurger;
47use crate::sst::index::intermediate::IntermediateManager;
48use crate::sst::index::puffin_manager::PuffinManagerFactory;
49use crate::sst::parquet::WriteOptions;
50use crate::sst::version::{SstVersion, SstVersionRef};
51
52#[derive(Clone)]
54pub struct CompactionVersion {
55 pub(crate) metadata: RegionMetadataRef,
60 pub(crate) options: RegionOptions,
62 pub(crate) ssts: SstVersionRef,
64 pub(crate) compaction_time_window: Option<Duration>,
66}
67
68impl From<VersionRef> for CompactionVersion {
69 fn from(value: VersionRef) -> Self {
70 Self {
71 metadata: value.metadata.clone(),
72 options: value.options.clone(),
73 ssts: value.ssts.clone(),
74 compaction_time_window: value.compaction_time_window,
75 }
76 }
77}
78
79#[derive(Clone)]
82pub struct CompactionRegion {
83 pub region_id: RegionId,
84 pub region_options: RegionOptions,
85 pub region_dir: String,
86
87 pub(crate) engine_config: Arc<MitoConfig>,
88 pub(crate) region_metadata: RegionMetadataRef,
89 pub(crate) cache_manager: CacheManagerRef,
90 pub(crate) access_layer: AccessLayerRef,
91 pub(crate) manifest_ctx: Arc<ManifestContext>,
92 pub(crate) current_version: CompactionVersion,
93 pub(crate) file_purger: Option<Arc<LocalFilePurger>>,
94 pub(crate) ttl: Option<TimeToLive>,
95
96 pub max_parallelism: usize,
101}
102
103#[derive(Debug, Clone)]
105pub struct OpenCompactionRegionRequest {
106 pub region_id: RegionId,
107 pub region_dir: String,
108 pub region_options: RegionOptions,
109 pub max_parallelism: usize,
110}
111
112pub async fn open_compaction_region(
115 req: &OpenCompactionRegionRequest,
116 mito_config: &MitoConfig,
117 object_store_manager: ObjectStoreManagerRef,
118 schema_metadata_manager: SchemaMetadataManagerRef,
119) -> Result<CompactionRegion> {
120 let object_store = {
121 let name = &req.region_options.storage;
122 if let Some(name) = name {
123 object_store_manager
124 .find(name)
125 .context(ObjectStoreNotFoundSnafu {
126 object_store: name.to_string(),
127 })?
128 } else {
129 object_store_manager.default_object_store()
130 }
131 };
132
133 let access_layer = {
134 let puffin_manager_factory = PuffinManagerFactory::new(
135 &mito_config.index.aux_path,
136 mito_config.index.staging_size.as_bytes(),
137 Some(mito_config.index.write_buffer_size.as_bytes() as _),
138 mito_config.index.staging_ttl,
139 )
140 .await?;
141 let intermediate_manager =
142 IntermediateManager::init_fs(mito_config.index.aux_path.clone()).await?;
143
144 Arc::new(AccessLayer::new(
145 req.region_dir.as_str(),
146 object_store.clone(),
147 puffin_manager_factory,
148 intermediate_manager,
149 ))
150 };
151
152 let manifest_manager = {
153 let region_manifest_options = RegionManifestOptions {
154 manifest_dir: new_manifest_dir(req.region_dir.as_str()),
155 object_store: object_store.clone(),
156 compress_type: manifest_compress_type(mito_config.compress_manifest),
157 checkpoint_distance: mito_config.manifest_checkpoint_distance,
158 };
159
160 RegionManifestManager::open(
161 region_manifest_options,
162 Default::default(),
163 Default::default(),
164 )
165 .await?
166 .context(EmptyRegionDirSnafu {
167 region_id: req.region_id,
168 region_dir: req.region_dir.as_str(),
169 })?
170 };
171
172 let manifest = manifest_manager.manifest();
173 let region_metadata = manifest.metadata.clone();
174 let manifest_ctx = Arc::new(ManifestContext::new(
175 manifest_manager,
176 RegionRoleState::Leader(RegionLeaderState::Writable),
177 ));
178
179 let file_purger = {
180 let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_purges));
181 Arc::new(LocalFilePurger::new(
182 purge_scheduler.clone(),
183 access_layer.clone(),
184 None,
185 ))
186 };
187
188 let current_version = {
189 let mut ssts = SstVersion::new();
190 ssts.add_files(file_purger.clone(), manifest.files.values().cloned());
191 CompactionVersion {
192 metadata: region_metadata.clone(),
193 options: req.region_options.clone(),
194 ssts: Arc::new(ssts),
195 compaction_time_window: manifest.compaction_time_window,
196 }
197 };
198
199 let ttl = find_ttl(
200 req.region_id.table_id(),
201 current_version.options.ttl,
202 &schema_metadata_manager,
203 )
204 .await
205 .unwrap_or_else(|e| {
206 warn!(e; "Failed to get ttl for region: {}", region_metadata.region_id);
207 TimeToLive::default()
208 });
209 Ok(CompactionRegion {
210 region_id: req.region_id,
211 region_options: req.region_options.clone(),
212 region_dir: req.region_dir.clone(),
213 engine_config: Arc::new(mito_config.clone()),
214 region_metadata: region_metadata.clone(),
215 cache_manager: Arc::new(CacheManager::default()),
216 access_layer,
217 manifest_ctx,
218 current_version,
219 file_purger: Some(file_purger),
220 ttl: Some(ttl),
221 max_parallelism: req.max_parallelism,
222 })
223}
224
225impl CompactionRegion {
226 pub fn file_purger(&self) -> Option<Arc<LocalFilePurger>> {
227 self.file_purger.clone()
228 }
229}
230
231#[derive(Default, Clone, Debug, Serialize, Deserialize)]
233pub struct MergeOutput {
234 pub files_to_add: Vec<FileMeta>,
235 pub files_to_remove: Vec<FileMeta>,
236 pub compaction_time_window: Option<i64>,
237}
238
239impl MergeOutput {
240 pub fn is_empty(&self) -> bool {
241 self.files_to_add.is_empty() && self.files_to_remove.is_empty()
242 }
243}
244
245#[async_trait::async_trait]
247pub trait Compactor: Send + Sync + 'static {
248 async fn merge_ssts(
250 &self,
251 compaction_region: &CompactionRegion,
252 picker_output: PickerOutput,
253 ) -> Result<MergeOutput>;
254
255 async fn update_manifest(
257 &self,
258 compaction_region: &CompactionRegion,
259 merge_output: MergeOutput,
260 ) -> Result<RegionEdit>;
261
262 async fn compact(
264 &self,
265 compaction_region: &CompactionRegion,
266 compact_request_options: compact_request::Options,
267 ) -> Result<()>;
268}
269
/// The [`Compactor`] implementation provided by this module: it picks files
/// with the configured picker, merges them through the region's access layer
/// and records the result in the region manifest.
pub struct DefaultCompactor;
272
273#[async_trait::async_trait]
274impl Compactor for DefaultCompactor {
275 async fn merge_ssts(
276 &self,
277 compaction_region: &CompactionRegion,
278 mut picker_output: PickerOutput,
279 ) -> Result<MergeOutput> {
280 let mut futs = Vec::with_capacity(picker_output.outputs.len());
281 let mut compacted_inputs =
282 Vec::with_capacity(picker_output.outputs.iter().map(|o| o.inputs.len()).sum());
283 let internal_parallelism = compaction_region.max_parallelism.max(1);
284
285 for output in picker_output.outputs.drain(..) {
286 compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone()));
287 let write_opts = WriteOptions {
288 write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
289 ..Default::default()
290 };
291
292 let region_metadata = compaction_region.region_metadata.clone();
293 let sst_layer = compaction_region.access_layer.clone();
294 let region_id = compaction_region.region_id;
295 let cache_manager = compaction_region.cache_manager.clone();
296 let storage = compaction_region.region_options.storage.clone();
297 let index_options = compaction_region
298 .current_version
299 .options
300 .index_options
301 .clone();
302 let append_mode = compaction_region.current_version.options.append_mode;
303 let merge_mode = compaction_region.current_version.options.merge_mode();
304 let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
305 let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
306 let bloom_filter_index_config =
307 compaction_region.engine_config.bloom_filter_index.clone();
308 let max_sequence = output
309 .inputs
310 .iter()
311 .map(|f| f.meta_ref().sequence)
312 .max()
313 .flatten();
314 futs.push(async move {
315 let input_file_names = output
316 .inputs
317 .iter()
318 .map(|f| f.file_id().to_string())
319 .join(",");
320 let reader = CompactionSstReaderBuilder {
321 metadata: region_metadata.clone(),
322 sst_layer: sst_layer.clone(),
323 cache: cache_manager.clone(),
324 inputs: &output.inputs,
325 append_mode,
326 filter_deleted: output.filter_deleted,
327 time_range: output.output_time_range,
328 merge_mode,
329 }
330 .build_sst_reader()
331 .await?;
332 let output_files = sst_layer
333 .write_sst(
334 SstWriteRequest {
335 op_type: OperationType::Compact,
336 metadata: region_metadata,
337 source: Source::Reader(reader),
338 cache_manager,
339 storage,
340 max_sequence: max_sequence.map(NonZero::get),
341 index_options,
342 inverted_index_config,
343 fulltext_index_config,
344 bloom_filter_index_config,
345 },
346 &write_opts,
347 )
348 .await?
349 .into_iter()
350 .map(|sst_info| FileMeta {
351 region_id,
352 file_id: sst_info.file_id,
353 time_range: sst_info.time_range,
354 level: output.output_level,
355 file_size: sst_info.file_size,
356 available_indexes: sst_info.index_metadata.build_available_indexes(),
357 index_file_size: sst_info.index_metadata.file_size,
358 num_rows: sst_info.num_rows as u64,
359 num_row_groups: sst_info.num_row_groups,
360 sequence: max_sequence,
361 })
362 .collect::<Vec<_>>();
363 let output_file_names =
364 output_files.iter().map(|f| f.file_id.to_string()).join(",");
365 info!(
366 "Region {} compaction inputs: [{}], outputs: [{}]",
367 region_id, input_file_names, output_file_names
368 );
369 Ok(output_files)
370 });
371 }
372 let mut output_files = Vec::with_capacity(futs.len());
373 while !futs.is_empty() {
374 let mut task_chunk = Vec::with_capacity(internal_parallelism);
375 for _ in 0..internal_parallelism {
376 if let Some(task) = futs.pop() {
377 task_chunk.push(common_runtime::spawn_compact(task));
378 }
379 }
380 let metas = futures::future::try_join_all(task_chunk)
381 .await
382 .context(JoinSnafu)?
383 .into_iter()
384 .collect::<Result<Vec<Vec<_>>>>()?;
385 output_files.extend(metas.into_iter().flatten());
386 }
387
388 let mut inputs: Vec<_> = compacted_inputs.into_iter().collect();
389 inputs.extend(
390 picker_output
391 .expired_ssts
392 .iter()
393 .map(|f| f.meta_ref().clone()),
394 );
395
396 Ok(MergeOutput {
397 files_to_add: output_files,
398 files_to_remove: inputs,
399 compaction_time_window: Some(picker_output.time_window_size),
400 })
401 }
402
403 async fn update_manifest(
404 &self,
405 compaction_region: &CompactionRegion,
406 merge_output: MergeOutput,
407 ) -> Result<RegionEdit> {
408 let edit = RegionEdit {
410 files_to_add: merge_output.files_to_add,
411 files_to_remove: merge_output.files_to_remove,
412 compaction_time_window: merge_output
413 .compaction_time_window
414 .map(|seconds| Duration::from_secs(seconds as u64)),
415 flushed_entry_id: None,
416 flushed_sequence: None,
417 };
418
419 let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
420 compaction_region
422 .manifest_ctx
423 .update_manifest(RegionLeaderState::Writable, action_list)
424 .await?;
425
426 Ok(edit)
427 }
428
429 async fn compact(
432 &self,
433 compaction_region: &CompactionRegion,
434 compact_request_options: compact_request::Options,
435 ) -> Result<()> {
436 let picker_output = {
437 let picker_output = new_picker(
438 &compact_request_options,
439 &compaction_region.region_options.compaction,
440 compaction_region.region_options.append_mode,
441 )
442 .pick(compaction_region);
443
444 if let Some(picker_output) = picker_output {
445 picker_output
446 } else {
447 info!(
448 "No files to compact for region_id: {}",
449 compaction_region.region_id
450 );
451 return Ok(());
452 }
453 };
454
455 let merge_output = self.merge_ssts(compaction_region, picker_output).await?;
456 if merge_output.is_empty() {
457 info!(
458 "No files to compact for region_id: {}",
459 compaction_region.region_id
460 );
461 return Ok(());
462 }
463 self.update_manifest(compaction_region, merge_output)
464 .await?;
465
466 Ok(())
467 }
468}