1use std::num::NonZero;
16use std::sync::Arc;
17use std::time::Duration;
18
19use api::v1::region::compact_request;
20use common_meta::key::SchemaMetadataManagerRef;
21use common_telemetry::{info, warn};
22use common_time::TimeToLive;
23use either::Either;
24use itertools::Itertools;
25use object_store::manager::ObjectStoreManagerRef;
26use partition::expr::PartitionExpr;
27use serde::{Deserialize, Serialize};
28use snafu::{OptionExt, ResultExt};
29use store_api::metadata::RegionMetadataRef;
30use store_api::region_request::PathType;
31use store_api::storage::RegionId;
32
33use crate::access_layer::{
34 AccessLayer, AccessLayerRef, Metrics, OperationType, SstWriteRequest, WriteType,
35};
36use crate::cache::{CacheManager, CacheManagerRef};
37use crate::compaction::picker::{PickerOutput, new_picker};
38use crate::compaction::{CompactionOutput, CompactionSstReaderBuilder, find_ttl};
39use crate::config::MitoConfig;
40use crate::error::{
41 EmptyRegionDirSnafu, InvalidPartitionExprSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result,
42};
43use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
44use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions};
45use crate::manifest::storage::manifest_compress_type;
46use crate::metrics;
47use crate::read::{FlatSource, Source};
48use crate::region::opener::new_manifest_dir;
49use crate::region::options::RegionOptions;
50use crate::region::version::VersionRef;
51use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState};
52use crate::schedule::scheduler::LocalScheduler;
53use crate::sst::FormatType;
54use crate::sst::file::FileMeta;
55use crate::sst::file_purger::LocalFilePurger;
56use crate::sst::index::intermediate::IntermediateManager;
57use crate::sst::index::puffin_manager::PuffinManagerFactory;
58use crate::sst::location::region_dir_from_table_dir;
59use crate::sst::parquet::WriteOptions;
60use crate::sst::version::{SstVersion, SstVersionRef};
61
/// Snapshot of the parts of a region version that compaction reads.
///
/// It holds only the region metadata, options, SST files and compaction time window.
/// It is built either from a full [`VersionRef`] via `From`, or by
/// [`open_compaction_region`] from the files listed in the region manifest.
#[derive(Clone)]
pub struct CompactionVersion {
    /// Metadata of the region.
    pub(crate) metadata: RegionMetadataRef,
    /// Options of the region.
    pub(crate) options: RegionOptions,
    /// SST files of the region.
    pub(crate) ssts: SstVersionRef,
    /// Compaction time window recorded for the region, if any.
    pub(crate) compaction_time_window: Option<Duration>,
}
77
78impl From<VersionRef> for CompactionVersion {
79 fn from(value: VersionRef) -> Self {
80 Self {
81 metadata: value.metadata.clone(),
82 options: value.options.clone(),
83 ssts: value.ssts.clone(),
84 compaction_time_window: value.compaction_time_window,
85 }
86 }
87}
88
/// Everything a [`Compactor`] needs to compact one region.
///
/// [`DefaultCompactor`] clones this value into every merge task it spawns.
#[derive(Clone)]
pub struct CompactionRegion {
    /// Id of the region being compacted.
    pub region_id: RegionId,
    /// Options of the region (storage name, SST format, compaction and append-mode
    /// settings are read from here).
    pub region_options: RegionOptions,

    /// Engine configuration (SST write buffer size, index configs, default SST format).
    pub(crate) engine_config: Arc<MitoConfig>,
    /// Metadata of the region, used to read input SSTs and write output SSTs.
    pub(crate) region_metadata: RegionMetadataRef,
    /// Cache manager handed to SST readers and writers.
    /// [`open_compaction_region`] installs `CacheManager::default()`.
    pub(crate) cache_manager: CacheManagerRef,
    /// Access layer used to read input SSTs and write output SSTs.
    pub access_layer: AccessLayerRef,
    /// Manifest context used to commit compaction edits.
    pub(crate) manifest_ctx: Arc<ManifestContext>,
    /// Version (metadata, options, SST files, time window) that compaction works on.
    pub(crate) current_version: CompactionVersion,
    /// File purger of the region, if any.
    pub(crate) file_purger: Option<Arc<LocalFilePurger>>,
    /// TTL of the region, if resolved. NOTE(review): presumably consumed by the picker to
    /// find expired SSTs — confirm.
    pub(crate) ttl: Option<TimeToLive>,

    /// Maximum number of merge tasks `DefaultCompactor::merge_ssts` runs concurrently;
    /// values below 1 are treated as 1.
    pub max_parallelism: usize,
}
112
/// Request describing which region [`open_compaction_region`] should open.
#[derive(Debug, Clone)]
pub struct OpenCompactionRegionRequest {
    /// Id of the region to open.
    pub region_id: RegionId,
    /// Table directory; the region directory is derived from it together with
    /// `region_id` and `path_type`.
    pub table_dir: String,
    /// Path layout used when deriving paths under `table_dir`.
    pub path_type: PathType,
    /// Options of the region. `storage` names the object store to use; `None` selects the
    /// default object store.
    pub region_options: RegionOptions,
    /// Maximum number of concurrent merge tasks; copied into
    /// [`CompactionRegion::max_parallelism`].
    pub max_parallelism: usize,
}
122
123pub async fn open_compaction_region(
126 req: &OpenCompactionRegionRequest,
127 mito_config: &MitoConfig,
128 object_store_manager: ObjectStoreManagerRef,
129 ttl_provider: Either<TimeToLive, SchemaMetadataManagerRef>,
130) -> Result<CompactionRegion> {
131 let object_store = {
132 let name = &req.region_options.storage;
133 if let Some(name) = name {
134 object_store_manager
135 .find(name)
136 .with_context(|| ObjectStoreNotFoundSnafu {
137 object_store: name.clone(),
138 })?
139 } else {
140 object_store_manager.default_object_store()
141 }
142 };
143
144 let access_layer = {
145 let puffin_manager_factory = PuffinManagerFactory::new(
146 &mito_config.index.aux_path,
147 mito_config.index.staging_size.as_bytes(),
148 Some(mito_config.index.write_buffer_size.as_bytes() as _),
149 mito_config.index.staging_ttl,
150 )
151 .await?;
152 let intermediate_manager =
153 IntermediateManager::init_fs(mito_config.index.aux_path.clone()).await?;
154
155 Arc::new(AccessLayer::new(
156 &req.table_dir,
157 req.path_type,
158 object_store.clone(),
159 puffin_manager_factory,
160 intermediate_manager,
161 ))
162 };
163
164 let manifest_manager = {
165 let region_manifest_options = RegionManifestOptions {
166 manifest_dir: new_manifest_dir(®ion_dir_from_table_dir(
167 &req.table_dir,
168 req.region_id,
169 req.path_type,
170 )),
171 object_store: object_store.clone(),
172 compress_type: manifest_compress_type(mito_config.compress_manifest),
173 checkpoint_distance: mito_config.manifest_checkpoint_distance,
174 remove_file_options: RemoveFileOptions {
175 keep_count: mito_config.experimental_manifest_keep_removed_file_count,
176 keep_ttl: mito_config.experimental_manifest_keep_removed_file_ttl,
177 },
178 };
179
180 RegionManifestManager::open(
181 region_manifest_options,
182 Default::default(),
183 Default::default(),
184 )
185 .await?
186 .context(EmptyRegionDirSnafu {
187 region_id: req.region_id,
188 region_dir: ®ion_dir_from_table_dir(&req.table_dir, req.region_id, req.path_type),
189 })?
190 };
191
192 let manifest = manifest_manager.manifest();
193 let region_metadata = manifest.metadata.clone();
194 let manifest_ctx = Arc::new(ManifestContext::new(
195 manifest_manager,
196 RegionRoleState::Leader(RegionLeaderState::Writable),
197 ));
198
199 let file_purger = {
200 let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_purges));
201 Arc::new(LocalFilePurger::new(
202 purge_scheduler.clone(),
203 access_layer.clone(),
204 None,
205 ))
206 };
207
208 let current_version = {
209 let mut ssts = SstVersion::new();
210 ssts.add_files(file_purger.clone(), manifest.files.values().cloned());
211 CompactionVersion {
212 metadata: region_metadata.clone(),
213 options: req.region_options.clone(),
214 ssts: Arc::new(ssts),
215 compaction_time_window: manifest.compaction_time_window,
216 }
217 };
218
219 let ttl = match ttl_provider {
220 Either::Left(ttl) => ttl,
222 Either::Right(schema_metadata_manager) => find_ttl(
224 req.region_id.table_id(),
225 current_version.options.ttl,
226 &schema_metadata_manager,
227 )
228 .await
229 .unwrap_or_else(|e| {
230 warn!(e; "Failed to get ttl for region: {}", region_metadata.region_id);
231 TimeToLive::default()
232 }),
233 };
234
235 Ok(CompactionRegion {
236 region_id: req.region_id,
237 region_options: req.region_options.clone(),
238 engine_config: Arc::new(mito_config.clone()),
239 region_metadata: region_metadata.clone(),
240 cache_manager: Arc::new(CacheManager::default()),
241 access_layer,
242 manifest_ctx,
243 current_version,
244 file_purger: Some(file_purger),
245 ttl: Some(ttl),
246 max_parallelism: req.max_parallelism,
247 })
248}
249
250impl CompactionRegion {
251 pub fn file_purger(&self) -> Option<Arc<LocalFilePurger>> {
253 self.file_purger.clone()
254 }
255
256 pub async fn stop_purger_scheduler(&self) -> Result<()> {
258 if let Some(file_purger) = &self.file_purger {
259 file_purger.stop_scheduler().await
260 } else {
261 Ok(())
262 }
263 }
264}
265
/// Result of merging SST files: the manifest changes a compaction wants to commit.
///
/// NOTE(review): derives `Serialize`/`Deserialize`, presumably so it can be passed between
/// processes (e.g. remote compaction) — confirm.
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
pub struct MergeOutput {
    /// Newly written SST files.
    pub files_to_add: Vec<FileMeta>,
    /// Compacted input SSTs and expired SSTs to drop from the region.
    pub files_to_remove: Vec<FileMeta>,
    /// Compaction time window in seconds, if one should be recorded.
    pub compaction_time_window: Option<i64>,
}
273
274impl MergeOutput {
275 pub fn is_empty(&self) -> bool {
276 self.files_to_add.is_empty() && self.files_to_remove.is_empty()
277 }
278
279 pub fn input_file_size(&self) -> u64 {
280 self.files_to_remove.iter().map(|f| f.file_size).sum()
281 }
282
283 pub fn output_file_size(&self) -> u64 {
284 self.files_to_add.iter().map(|f| f.file_size).sum()
285 }
286}
287
/// Compacts the SST files of a region.
///
/// The work is split into two steps that can be driven separately:
/// [`Compactor::merge_ssts`] produces merged SST files and [`Compactor::update_manifest`]
/// records the result in the region manifest. [`Compactor::compact`] runs a whole
/// compaction.
#[async_trait::async_trait]
pub trait Compactor: Send + Sync + 'static {
    /// Merges the SST files selected in `picker_output` into new SST files and returns the
    /// files to add and remove.
    ///
    /// In [`DefaultCompactor`] this only writes SST files; the manifest is changed
    /// separately by [`Compactor::update_manifest`].
    async fn merge_ssts(
        &self,
        compaction_region: &CompactionRegion,
        picker_output: PickerOutput,
    ) -> Result<MergeOutput>;

    /// Commits `merge_output` to the region manifest and returns the applied [`RegionEdit`].
    async fn update_manifest(
        &self,
        compaction_region: &CompactionRegion,
        merge_output: MergeOutput,
    ) -> Result<RegionEdit>;

    /// Runs a full compaction of `compaction_region` according to
    /// `compact_request_options`.
    async fn compact(
        &self,
        compaction_region: &CompactionRegion,
        compact_request_options: compact_request::Options,
    ) -> Result<()>;
}
312
/// Default [`Compactor`] implementation. It merges SSTs in tasks spawned on the compaction
/// runtime and commits the result to the region manifest.
pub struct DefaultCompactor;
315
316impl DefaultCompactor {
317 async fn merge_single_output(
319 compaction_region: CompactionRegion,
320 output: CompactionOutput,
321 write_opts: WriteOptions,
322 ) -> Result<Vec<FileMeta>> {
323 let region_id = compaction_region.region_id;
324 let storage = compaction_region.region_options.storage.clone();
325 let index_options = compaction_region
326 .current_version
327 .options
328 .index_options
329 .clone();
330 let append_mode = compaction_region.current_version.options.append_mode;
331 let merge_mode = compaction_region.current_version.options.merge_mode();
332 let flat_format = compaction_region
333 .region_options
334 .sst_format
335 .map(|format| format == FormatType::Flat)
336 .unwrap_or(
337 compaction_region
338 .engine_config
339 .default_experimental_flat_format,
340 );
341
342 let index_config = compaction_region.engine_config.index.clone();
343 let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
344 let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
345 let bloom_filter_index_config = compaction_region.engine_config.bloom_filter_index.clone();
346
347 let input_file_names = output
348 .inputs
349 .iter()
350 .map(|f| f.file_id().to_string())
351 .join(",");
352 let max_sequence = output
353 .inputs
354 .iter()
355 .map(|f| f.meta_ref().sequence)
356 .max()
357 .flatten();
358 let builder = CompactionSstReaderBuilder {
359 metadata: compaction_region.region_metadata.clone(),
360 sst_layer: compaction_region.access_layer.clone(),
361 cache: compaction_region.cache_manager.clone(),
362 inputs: &output.inputs,
363 append_mode,
364 filter_deleted: output.filter_deleted,
365 time_range: output.output_time_range,
366 merge_mode,
367 };
368 let source = if flat_format {
369 let reader = builder.build_flat_sst_reader().await?;
370 Either::Right(FlatSource::Stream(reader))
371 } else {
372 let reader = builder.build_sst_reader().await?;
373 Either::Left(Source::Reader(reader))
374 };
375 let mut metrics = Metrics::new(WriteType::Compaction);
376 let region_metadata = compaction_region.region_metadata.clone();
377 let sst_infos = compaction_region
378 .access_layer
379 .write_sst(
380 SstWriteRequest {
381 op_type: OperationType::Compact,
382 metadata: region_metadata.clone(),
383 source,
384 cache_manager: compaction_region.cache_manager.clone(),
385 storage,
386 max_sequence: max_sequence.map(NonZero::get),
387 index_options,
388 index_config,
389 inverted_index_config,
390 fulltext_index_config,
391 bloom_filter_index_config,
392 },
393 &write_opts,
394 &mut metrics,
395 )
396 .await?;
397 let partition_expr = match ®ion_metadata.partition_expr {
399 None => None,
400 Some(json_str) if json_str.is_empty() => None,
401 Some(json_str) => PartitionExpr::from_json_str(json_str).with_context(|_| {
402 InvalidPartitionExprSnafu {
403 expr: json_str.clone(),
404 }
405 })?,
406 };
407
408 let output_files = sst_infos
409 .into_iter()
410 .map(|sst_info| FileMeta {
411 region_id,
412 file_id: sst_info.file_id,
413 time_range: sst_info.time_range,
414 level: output.output_level,
415 file_size: sst_info.file_size,
416 available_indexes: sst_info.index_metadata.build_available_indexes(),
417 index_file_size: sst_info.index_metadata.file_size,
418 index_file_id: None,
419 num_rows: sst_info.num_rows as u64,
420 num_row_groups: sst_info.num_row_groups,
421 sequence: max_sequence,
422 partition_expr: partition_expr.clone(),
423 num_series: sst_info.num_series,
424 })
425 .collect::<Vec<_>>();
426 let output_file_names = output_files.iter().map(|f| f.file_id.to_string()).join(",");
427 info!(
428 "Region {} compaction inputs: [{}], outputs: [{}], flat_format: {}, metrics: {:?}",
429 region_id, input_file_names, output_file_names, flat_format, metrics
430 );
431 metrics.observe();
432 Ok(output_files)
433 }
434}
435
436#[async_trait::async_trait]
437impl Compactor for DefaultCompactor {
438 async fn merge_ssts(
439 &self,
440 compaction_region: &CompactionRegion,
441 mut picker_output: PickerOutput,
442 ) -> Result<MergeOutput> {
443 let mut futs = Vec::with_capacity(picker_output.outputs.len());
444 let mut compacted_inputs =
445 Vec::with_capacity(picker_output.outputs.iter().map(|o| o.inputs.len()).sum());
446 let internal_parallelism = compaction_region.max_parallelism.max(1);
447 let compaction_time_window = picker_output.time_window_size;
448
449 for output in picker_output.outputs.drain(..) {
450 let inputs_to_remove: Vec<_> =
451 output.inputs.iter().map(|f| f.meta_ref().clone()).collect();
452 compacted_inputs.extend(inputs_to_remove.iter().cloned());
453 let write_opts = WriteOptions {
454 write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
455 max_file_size: picker_output.max_file_size,
456 ..Default::default()
457 };
458 futs.push(Self::merge_single_output(
459 compaction_region.clone(),
460 output,
461 write_opts,
462 ));
463 }
464 let mut output_files = Vec::with_capacity(futs.len());
465 while !futs.is_empty() {
466 let mut task_chunk = Vec::with_capacity(internal_parallelism);
467 for _ in 0..internal_parallelism {
468 if let Some(task) = futs.pop() {
469 task_chunk.push(common_runtime::spawn_compact(task));
470 }
471 }
472 let metas = futures::future::try_join_all(task_chunk)
473 .await
474 .context(JoinSnafu)?
475 .into_iter()
476 .collect::<Result<Vec<Vec<_>>>>()?;
477 output_files.extend(metas.into_iter().flatten());
478 }
479
480 let mut inputs: Vec<_> = compacted_inputs.into_iter().collect();
483 inputs.extend(
484 picker_output
485 .expired_ssts
486 .iter()
487 .map(|f| f.meta_ref().clone()),
488 );
489
490 Ok(MergeOutput {
491 files_to_add: output_files,
492 files_to_remove: inputs,
493 compaction_time_window: Some(compaction_time_window),
494 })
495 }
496
497 async fn update_manifest(
498 &self,
499 compaction_region: &CompactionRegion,
500 merge_output: MergeOutput,
501 ) -> Result<RegionEdit> {
502 let edit = RegionEdit {
504 files_to_add: merge_output.files_to_add,
505 files_to_remove: merge_output.files_to_remove,
506 timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
508 compaction_time_window: merge_output
509 .compaction_time_window
510 .map(|seconds| Duration::from_secs(seconds as u64)),
511 flushed_entry_id: None,
512 flushed_sequence: None,
513 committed_sequence: None,
514 };
515
516 let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
517 compaction_region
519 .manifest_ctx
520 .update_manifest(RegionLeaderState::Writable, action_list)
521 .await?;
522
523 Ok(edit)
524 }
525
526 async fn compact(
529 &self,
530 compaction_region: &CompactionRegion,
531 compact_request_options: compact_request::Options,
532 ) -> Result<()> {
533 let picker_output = {
534 let picker_output = new_picker(
535 &compact_request_options,
536 &compaction_region.region_options.compaction,
537 compaction_region.region_options.append_mode,
538 None,
539 )
540 .pick(compaction_region);
541
542 if let Some(picker_output) = picker_output {
543 picker_output
544 } else {
545 info!(
546 "No files to compact for region_id: {}",
547 compaction_region.region_id
548 );
549 return Ok(());
550 }
551 };
552
553 let merge_output = self.merge_ssts(compaction_region, picker_output).await?;
554 if merge_output.is_empty() {
555 info!(
556 "No files to compact for region_id: {}",
557 compaction_region.region_id
558 );
559 return Ok(());
560 }
561
562 metrics::COMPACTION_INPUT_BYTES.inc_by(merge_output.input_file_size() as f64);
563 metrics::COMPACTION_OUTPUT_BYTES.inc_by(merge_output.output_file_size() as f64);
564 self.update_manifest(compaction_region, merge_output)
565 .await?;
566
567 Ok(())
568 }
569}