1use std::num::NonZero;
16use std::sync::Arc;
17use std::time::Duration;
18
19use api::v1::region::compact_request;
20use common_meta::key::SchemaMetadataManagerRef;
21use common_telemetry::{info, warn};
22use common_time::TimeToLive;
23use either::Either;
24use itertools::Itertools;
25use object_store::manager::ObjectStoreManagerRef;
26use serde::{Deserialize, Serialize};
27use snafu::{OptionExt, ResultExt};
28use store_api::metadata::RegionMetadataRef;
29use store_api::region_request::PathType;
30use store_api::storage::RegionId;
31
32use crate::access_layer::{AccessLayer, AccessLayerRef, OperationType, SstWriteRequest, WriteType};
33use crate::cache::{CacheManager, CacheManagerRef};
34use crate::compaction::picker::{new_picker, PickerOutput};
35use crate::compaction::{find_ttl, CompactionSstReaderBuilder};
36use crate::config::MitoConfig;
37use crate::error::{EmptyRegionDirSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result};
38use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
39use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
40use crate::manifest::storage::manifest_compress_type;
41use crate::metrics;
42use crate::read::Source;
43use crate::region::opener::new_manifest_dir;
44use crate::region::options::RegionOptions;
45use crate::region::version::VersionRef;
46use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState};
47use crate::schedule::scheduler::LocalScheduler;
48use crate::sst::file::FileMeta;
49use crate::sst::file_purger::LocalFilePurger;
50use crate::sst::index::intermediate::IntermediateManager;
51use crate::sst::index::puffin_manager::PuffinManagerFactory;
52use crate::sst::location::region_dir_from_table_dir;
53use crate::sst::parquet::WriteOptions;
54use crate::sst::version::{SstVersion, SstVersionRef};
55
/// A snapshot of the region version state used by compaction.
///
/// It holds the region metadata, the region options, the current SST set and
/// the compaction time window.
#[derive(Clone)]
pub struct CompactionVersion {
    /// Metadata of the region.
    pub(crate) metadata: RegionMetadataRef,
    /// Options of the region (append mode, merge mode, index options, TTL, ...).
    pub(crate) options: RegionOptions,
    /// SST files currently belonging to the region.
    pub(crate) ssts: SstVersionRef,
    /// Compaction time window of the region, if present.
    pub(crate) compaction_time_window: Option<Duration>,
}
71
72impl From<VersionRef> for CompactionVersion {
73 fn from(value: VersionRef) -> Self {
74 Self {
75 metadata: value.metadata.clone(),
76 options: value.options.clone(),
77 ssts: value.ssts.clone(),
78 compaction_time_window: value.compaction_time_window,
79 }
80 }
81}
82
/// Everything needed to run a compaction task on a single region.
#[derive(Clone)]
pub struct CompactionRegion {
    /// ID of the region being compacted.
    pub region_id: RegionId,
    /// Options of the region; used to build the picker and choose the storage.
    pub region_options: RegionOptions,

    /// Mito engine configuration; supplies SST write-buffer and index settings.
    pub(crate) engine_config: Arc<MitoConfig>,
    /// Metadata of the region.
    pub(crate) region_metadata: RegionMetadataRef,
    /// Cache manager handed to SST readers and writers.
    pub(crate) cache_manager: CacheManagerRef,
    /// Access layer used to read input SSTs and write output SSTs.
    pub(crate) access_layer: AccessLayerRef,
    /// Manifest context through which compaction edits are persisted.
    pub(crate) manifest_ctx: Arc<ManifestContext>,
    /// Snapshot of the region version the compaction operates on.
    pub(crate) current_version: CompactionVersion,
    /// Purger attached to the SST handles of `current_version`, if any.
    pub(crate) file_purger: Option<Arc<LocalFilePurger>>,
    /// TTL of the region, if known.
    pub(crate) ttl: Option<TimeToLive>,

    /// Maximum number of merge tasks run concurrently.
    /// `DefaultCompactor` treats values below 1 as 1.
    pub max_parallelism: usize,
}
105
/// Parameters for opening a [`CompactionRegion`] via [`open_compaction_region`].
#[derive(Debug, Clone)]
pub struct OpenCompactionRegionRequest {
    /// ID of the region to open.
    pub region_id: RegionId,
    /// Table directory.
    ///
    /// The region directory is derived from it together with `region_id` and
    /// `path_type`.
    pub table_dir: String,
    /// Path layout used when deriving the region directory.
    pub path_type: PathType,
    /// Options of the region, including the optional object store name.
    pub region_options: RegionOptions,
    /// Maximum number of merge tasks run concurrently during compaction.
    pub max_parallelism: usize,
}
115
116pub async fn open_compaction_region(
119 req: &OpenCompactionRegionRequest,
120 mito_config: &MitoConfig,
121 object_store_manager: ObjectStoreManagerRef,
122 ttl_provider: Either<TimeToLive, SchemaMetadataManagerRef>,
123) -> Result<CompactionRegion> {
124 let object_store = {
125 let name = &req.region_options.storage;
126 if let Some(name) = name {
127 object_store_manager
128 .find(name)
129 .context(ObjectStoreNotFoundSnafu {
130 object_store: name.to_string(),
131 })?
132 } else {
133 object_store_manager.default_object_store()
134 }
135 };
136
137 let access_layer = {
138 let puffin_manager_factory = PuffinManagerFactory::new(
139 &mito_config.index.aux_path,
140 mito_config.index.staging_size.as_bytes(),
141 Some(mito_config.index.write_buffer_size.as_bytes() as _),
142 mito_config.index.staging_ttl,
143 )
144 .await?;
145 let intermediate_manager =
146 IntermediateManager::init_fs(mito_config.index.aux_path.clone()).await?;
147
148 Arc::new(AccessLayer::new(
149 &req.table_dir,
150 req.path_type,
151 object_store.clone(),
152 puffin_manager_factory,
153 intermediate_manager,
154 ))
155 };
156
157 let manifest_manager = {
158 let region_manifest_options = RegionManifestOptions {
159 manifest_dir: new_manifest_dir(®ion_dir_from_table_dir(
160 &req.table_dir,
161 req.region_id,
162 req.path_type,
163 )),
164 object_store: object_store.clone(),
165 compress_type: manifest_compress_type(mito_config.compress_manifest),
166 checkpoint_distance: mito_config.manifest_checkpoint_distance,
167 };
168
169 RegionManifestManager::open(
170 region_manifest_options,
171 Default::default(),
172 Default::default(),
173 )
174 .await?
175 .context(EmptyRegionDirSnafu {
176 region_id: req.region_id,
177 region_dir: ®ion_dir_from_table_dir(&req.table_dir, req.region_id, req.path_type),
178 })?
179 };
180
181 let manifest = manifest_manager.manifest();
182 let region_metadata = manifest.metadata.clone();
183 let manifest_ctx = Arc::new(ManifestContext::new(
184 manifest_manager,
185 RegionRoleState::Leader(RegionLeaderState::Writable),
186 ));
187
188 let file_purger = {
189 let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_purges));
190 Arc::new(LocalFilePurger::new(
191 purge_scheduler.clone(),
192 access_layer.clone(),
193 None,
194 ))
195 };
196
197 let current_version = {
198 let mut ssts = SstVersion::new();
199 ssts.add_files(file_purger.clone(), manifest.files.values().cloned());
200 CompactionVersion {
201 metadata: region_metadata.clone(),
202 options: req.region_options.clone(),
203 ssts: Arc::new(ssts),
204 compaction_time_window: manifest.compaction_time_window,
205 }
206 };
207
208 let ttl = match ttl_provider {
209 Either::Left(ttl) => ttl,
211 Either::Right(schema_metadata_manager) => find_ttl(
213 req.region_id.table_id(),
214 current_version.options.ttl,
215 &schema_metadata_manager,
216 )
217 .await
218 .unwrap_or_else(|e| {
219 warn!(e; "Failed to get ttl for region: {}", region_metadata.region_id);
220 TimeToLive::default()
221 }),
222 };
223
224 Ok(CompactionRegion {
225 region_id: req.region_id,
226 region_options: req.region_options.clone(),
227 engine_config: Arc::new(mito_config.clone()),
228 region_metadata: region_metadata.clone(),
229 cache_manager: Arc::new(CacheManager::default()),
230 access_layer,
231 manifest_ctx,
232 current_version,
233 file_purger: Some(file_purger),
234 ttl: Some(ttl),
235 max_parallelism: req.max_parallelism,
236 })
237}
238
239impl CompactionRegion {
240 pub fn file_purger(&self) -> Option<Arc<LocalFilePurger>> {
241 self.file_purger.clone()
242 }
243}
244
/// The result of merging SST files during compaction.
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
pub struct MergeOutput {
    /// Files produced by the merge, to be added to the region.
    pub files_to_add: Vec<FileMeta>,
    /// Files to remove from the region (e.g. the merged inputs).
    pub files_to_remove: Vec<FileMeta>,
    /// Compaction time window in seconds, if the picker reported one.
    pub compaction_time_window: Option<i64>,
}
252
253impl MergeOutput {
254 pub fn is_empty(&self) -> bool {
255 self.files_to_add.is_empty() && self.files_to_remove.is_empty()
256 }
257
258 pub fn input_file_size(&self) -> u64 {
259 self.files_to_remove.iter().map(|f| f.file_size).sum()
260 }
261
262 pub fn output_file_size(&self) -> u64 {
263 self.files_to_add.iter().map(|f| f.file_size).sum()
264 }
265}
266
/// Defines how a region is compacted.
#[async_trait::async_trait]
pub trait Compactor: Send + Sync + 'static {
    /// Merges the SST files selected in `picker_output` and returns the files
    /// to add and to remove.
    async fn merge_ssts(
        &self,
        compaction_region: &CompactionRegion,
        picker_output: PickerOutput,
    ) -> Result<MergeOutput>;

    /// Persists `merge_output` to the region manifest and returns the edit
    /// that was written.
    async fn update_manifest(
        &self,
        compaction_region: &CompactionRegion,
        merge_output: MergeOutput,
    ) -> Result<RegionEdit>;

    /// Runs a full compaction of the region with the given request options.
    async fn compact(
        &self,
        compaction_region: &CompactionRegion,
        compact_request_options: compact_request::Options,
    ) -> Result<()>;
}
291
/// The default [`Compactor`] implementation.
pub struct DefaultCompactor;
294
295#[async_trait::async_trait]
296impl Compactor for DefaultCompactor {
297 async fn merge_ssts(
298 &self,
299 compaction_region: &CompactionRegion,
300 mut picker_output: PickerOutput,
301 ) -> Result<MergeOutput> {
302 let mut futs = Vec::with_capacity(picker_output.outputs.len());
303 let mut compacted_inputs =
304 Vec::with_capacity(picker_output.outputs.iter().map(|o| o.inputs.len()).sum());
305 let internal_parallelism = compaction_region.max_parallelism.max(1);
306
307 for output in picker_output.outputs.drain(..) {
308 compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone()));
309 let write_opts = WriteOptions {
310 write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
311 max_file_size: picker_output.max_file_size,
312 ..Default::default()
313 };
314
315 let region_metadata = compaction_region.region_metadata.clone();
316 let sst_layer = compaction_region.access_layer.clone();
317 let region_id = compaction_region.region_id;
318 let cache_manager = compaction_region.cache_manager.clone();
319 let storage = compaction_region.region_options.storage.clone();
320 let index_options = compaction_region
321 .current_version
322 .options
323 .index_options
324 .clone();
325 let append_mode = compaction_region.current_version.options.append_mode;
326 let merge_mode = compaction_region.current_version.options.merge_mode();
327 let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
328 let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
329 let bloom_filter_index_config =
330 compaction_region.engine_config.bloom_filter_index.clone();
331 let max_sequence = output
332 .inputs
333 .iter()
334 .map(|f| f.meta_ref().sequence)
335 .max()
336 .flatten();
337 futs.push(async move {
338 let input_file_names = output
339 .inputs
340 .iter()
341 .map(|f| f.file_id().to_string())
342 .join(",");
343 let reader = CompactionSstReaderBuilder {
344 metadata: region_metadata.clone(),
345 sst_layer: sst_layer.clone(),
346 cache: cache_manager.clone(),
347 inputs: &output.inputs,
348 append_mode,
349 filter_deleted: output.filter_deleted,
350 time_range: output.output_time_range,
351 merge_mode,
352 }
353 .build_sst_reader()
354 .await?;
355 let (sst_infos, metrics) = sst_layer
356 .write_sst(
357 SstWriteRequest {
358 op_type: OperationType::Compact,
359 metadata: region_metadata,
360 source: Source::Reader(reader),
361 cache_manager,
362 storage,
363 max_sequence: max_sequence.map(NonZero::get),
364 index_options,
365 inverted_index_config,
366 fulltext_index_config,
367 bloom_filter_index_config,
368 },
369 &write_opts,
370 WriteType::Compaction,
371 )
372 .await?;
373 let output_files = sst_infos
374 .into_iter()
375 .map(|sst_info| FileMeta {
376 region_id,
377 file_id: sst_info.file_id,
378 time_range: sst_info.time_range,
379 level: output.output_level,
380 file_size: sst_info.file_size,
381 available_indexes: sst_info.index_metadata.build_available_indexes(),
382 index_file_size: sst_info.index_metadata.file_size,
383 num_rows: sst_info.num_rows as u64,
384 num_row_groups: sst_info.num_row_groups,
385 sequence: max_sequence,
386 })
387 .collect::<Vec<_>>();
388 let output_file_names =
389 output_files.iter().map(|f| f.file_id.to_string()).join(",");
390 info!(
391 "Region {} compaction inputs: [{}], outputs: [{}], metrics: {:?}",
392 region_id, input_file_names, output_file_names, metrics
393 );
394 metrics.observe();
395 Ok(output_files)
396 });
397 }
398 let mut output_files = Vec::with_capacity(futs.len());
399 while !futs.is_empty() {
400 let mut task_chunk = Vec::with_capacity(internal_parallelism);
401 for _ in 0..internal_parallelism {
402 if let Some(task) = futs.pop() {
403 task_chunk.push(common_runtime::spawn_compact(task));
404 }
405 }
406 let metas = futures::future::try_join_all(task_chunk)
407 .await
408 .context(JoinSnafu)?
409 .into_iter()
410 .collect::<Result<Vec<Vec<_>>>>()?;
411 output_files.extend(metas.into_iter().flatten());
412 }
413
414 let mut inputs: Vec<_> = compacted_inputs.into_iter().collect();
415 inputs.extend(
416 picker_output
417 .expired_ssts
418 .iter()
419 .map(|f| f.meta_ref().clone()),
420 );
421
422 Ok(MergeOutput {
423 files_to_add: output_files,
424 files_to_remove: inputs,
425 compaction_time_window: Some(picker_output.time_window_size),
426 })
427 }
428
429 async fn update_manifest(
430 &self,
431 compaction_region: &CompactionRegion,
432 merge_output: MergeOutput,
433 ) -> Result<RegionEdit> {
434 let edit = RegionEdit {
436 files_to_add: merge_output.files_to_add,
437 files_to_remove: merge_output.files_to_remove,
438 compaction_time_window: merge_output
439 .compaction_time_window
440 .map(|seconds| Duration::from_secs(seconds as u64)),
441 flushed_entry_id: None,
442 flushed_sequence: None,
443 };
444
445 let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
446 compaction_region
448 .manifest_ctx
449 .update_manifest(RegionLeaderState::Writable, action_list)
450 .await?;
451
452 Ok(edit)
453 }
454
455 async fn compact(
458 &self,
459 compaction_region: &CompactionRegion,
460 compact_request_options: compact_request::Options,
461 ) -> Result<()> {
462 let picker_output = {
463 let picker_output = new_picker(
464 &compact_request_options,
465 &compaction_region.region_options.compaction,
466 compaction_region.region_options.append_mode,
467 )
468 .pick(compaction_region);
469
470 if let Some(picker_output) = picker_output {
471 picker_output
472 } else {
473 info!(
474 "No files to compact for region_id: {}",
475 compaction_region.region_id
476 );
477 return Ok(());
478 }
479 };
480
481 let merge_output = self.merge_ssts(compaction_region, picker_output).await?;
482 if merge_output.is_empty() {
483 info!(
484 "No files to compact for region_id: {}",
485 compaction_region.region_id
486 );
487 return Ok(());
488 }
489
490 metrics::COMPACTION_INPUT_BYTES.inc_by(merge_output.input_file_size() as f64);
491 metrics::COMPACTION_OUTPUT_BYTES.inc_by(merge_output.output_file_size() as f64);
492 self.update_manifest(compaction_region, merge_output)
493 .await?;
494
495 Ok(())
496 }
497}