1use std::num::NonZero;
16use std::sync::Arc;
17use std::time::Duration;
18
19use api::v1::region::compact_request;
20use common_meta::key::SchemaMetadataManagerRef;
21use common_telemetry::{info, warn};
22use common_time::TimeToLive;
23use either::Either;
24use itertools::Itertools;
25use object_store::manager::ObjectStoreManagerRef;
26use partition::expr::PartitionExpr;
27use serde::{Deserialize, Serialize};
28use snafu::{OptionExt, ResultExt};
29use store_api::metadata::RegionMetadataRef;
30use store_api::region_request::PathType;
31use store_api::storage::RegionId;
32
33use crate::access_layer::{AccessLayer, AccessLayerRef, OperationType, SstWriteRequest, WriteType};
34use crate::cache::{CacheManager, CacheManagerRef};
35use crate::compaction::picker::{PickerOutput, new_picker};
36use crate::compaction::{CompactionSstReaderBuilder, find_ttl};
37use crate::config::MitoConfig;
38use crate::error::{
39 EmptyRegionDirSnafu, InvalidPartitionExprSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result,
40};
41use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
42use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions};
43use crate::manifest::storage::manifest_compress_type;
44use crate::metrics;
45use crate::read::{FlatSource, Source};
46use crate::region::opener::new_manifest_dir;
47use crate::region::options::RegionOptions;
48use crate::region::version::VersionRef;
49use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState};
50use crate::schedule::scheduler::LocalScheduler;
51use crate::sst::file::FileMeta;
52use crate::sst::file_purger::LocalFilePurger;
53use crate::sst::index::intermediate::IntermediateManager;
54use crate::sst::index::puffin_manager::PuffinManagerFactory;
55use crate::sst::location::region_dir_from_table_dir;
56use crate::sst::parquet::WriteOptions;
57use crate::sst::version::{SstVersion, SstVersionRef};
58
59#[derive(Clone)]
61pub struct CompactionVersion {
62 pub(crate) metadata: RegionMetadataRef,
67 pub(crate) options: RegionOptions,
69 pub(crate) ssts: SstVersionRef,
71 pub(crate) compaction_time_window: Option<Duration>,
73}
74
75impl From<VersionRef> for CompactionVersion {
76 fn from(value: VersionRef) -> Self {
77 Self {
78 metadata: value.metadata.clone(),
79 options: value.options.clone(),
80 ssts: value.ssts.clone(),
81 compaction_time_window: value.compaction_time_window,
82 }
83 }
84}
85
86#[derive(Clone)]
89pub struct CompactionRegion {
90 pub region_id: RegionId,
91 pub region_options: RegionOptions,
92
93 pub(crate) engine_config: Arc<MitoConfig>,
94 pub(crate) region_metadata: RegionMetadataRef,
95 pub(crate) cache_manager: CacheManagerRef,
96 pub access_layer: AccessLayerRef,
98 pub(crate) manifest_ctx: Arc<ManifestContext>,
99 pub(crate) current_version: CompactionVersion,
100 pub(crate) file_purger: Option<Arc<LocalFilePurger>>,
101 pub(crate) ttl: Option<TimeToLive>,
102
103 pub max_parallelism: usize,
108}
109
110#[derive(Debug, Clone)]
112pub struct OpenCompactionRegionRequest {
113 pub region_id: RegionId,
114 pub table_dir: String,
115 pub path_type: PathType,
116 pub region_options: RegionOptions,
117 pub max_parallelism: usize,
118}
119
120pub async fn open_compaction_region(
123 req: &OpenCompactionRegionRequest,
124 mito_config: &MitoConfig,
125 object_store_manager: ObjectStoreManagerRef,
126 ttl_provider: Either<TimeToLive, SchemaMetadataManagerRef>,
127) -> Result<CompactionRegion> {
128 let object_store = {
129 let name = &req.region_options.storage;
130 if let Some(name) = name {
131 object_store_manager
132 .find(name)
133 .context(ObjectStoreNotFoundSnafu {
134 object_store: name.to_string(),
135 })?
136 } else {
137 object_store_manager.default_object_store()
138 }
139 };
140
141 let access_layer = {
142 let puffin_manager_factory = PuffinManagerFactory::new(
143 &mito_config.index.aux_path,
144 mito_config.index.staging_size.as_bytes(),
145 Some(mito_config.index.write_buffer_size.as_bytes() as _),
146 mito_config.index.staging_ttl,
147 )
148 .await?;
149 let intermediate_manager =
150 IntermediateManager::init_fs(mito_config.index.aux_path.clone()).await?;
151
152 Arc::new(AccessLayer::new(
153 &req.table_dir,
154 req.path_type,
155 object_store.clone(),
156 puffin_manager_factory,
157 intermediate_manager,
158 ))
159 };
160
161 let manifest_manager = {
162 let region_manifest_options = RegionManifestOptions {
163 manifest_dir: new_manifest_dir(®ion_dir_from_table_dir(
164 &req.table_dir,
165 req.region_id,
166 req.path_type,
167 )),
168 object_store: object_store.clone(),
169 compress_type: manifest_compress_type(mito_config.compress_manifest),
170 checkpoint_distance: mito_config.manifest_checkpoint_distance,
171 remove_file_options: RemoveFileOptions {
172 keep_count: mito_config.experimental_manifest_keep_removed_file_count,
173 keep_ttl: mito_config.experimental_manifest_keep_removed_file_ttl,
174 },
175 };
176
177 RegionManifestManager::open(
178 region_manifest_options,
179 Default::default(),
180 Default::default(),
181 )
182 .await?
183 .context(EmptyRegionDirSnafu {
184 region_id: req.region_id,
185 region_dir: ®ion_dir_from_table_dir(&req.table_dir, req.region_id, req.path_type),
186 })?
187 };
188
189 let manifest = manifest_manager.manifest();
190 let region_metadata = manifest.metadata.clone();
191 let manifest_ctx = Arc::new(ManifestContext::new(
192 manifest_manager,
193 RegionRoleState::Leader(RegionLeaderState::Writable),
194 ));
195
196 let file_purger = {
197 let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_purges));
198 Arc::new(LocalFilePurger::new(
199 purge_scheduler.clone(),
200 access_layer.clone(),
201 None,
202 ))
203 };
204
205 let current_version = {
206 let mut ssts = SstVersion::new();
207 ssts.add_files(file_purger.clone(), manifest.files.values().cloned());
208 CompactionVersion {
209 metadata: region_metadata.clone(),
210 options: req.region_options.clone(),
211 ssts: Arc::new(ssts),
212 compaction_time_window: manifest.compaction_time_window,
213 }
214 };
215
216 let ttl = match ttl_provider {
217 Either::Left(ttl) => ttl,
219 Either::Right(schema_metadata_manager) => find_ttl(
221 req.region_id.table_id(),
222 current_version.options.ttl,
223 &schema_metadata_manager,
224 )
225 .await
226 .unwrap_or_else(|e| {
227 warn!(e; "Failed to get ttl for region: {}", region_metadata.region_id);
228 TimeToLive::default()
229 }),
230 };
231
232 Ok(CompactionRegion {
233 region_id: req.region_id,
234 region_options: req.region_options.clone(),
235 engine_config: Arc::new(mito_config.clone()),
236 region_metadata: region_metadata.clone(),
237 cache_manager: Arc::new(CacheManager::default()),
238 access_layer,
239 manifest_ctx,
240 current_version,
241 file_purger: Some(file_purger),
242 ttl: Some(ttl),
243 max_parallelism: req.max_parallelism,
244 })
245}
246
247impl CompactionRegion {
248 pub fn file_purger(&self) -> Option<Arc<LocalFilePurger>> {
250 self.file_purger.clone()
251 }
252
253 pub async fn stop_purger_scheduler(&self) -> Result<()> {
255 if let Some(file_purger) = &self.file_purger {
256 file_purger.stop_scheduler().await
257 } else {
258 Ok(())
259 }
260 }
261}
262
263#[derive(Default, Clone, Debug, Serialize, Deserialize)]
265pub struct MergeOutput {
266 pub files_to_add: Vec<FileMeta>,
267 pub files_to_remove: Vec<FileMeta>,
268 pub compaction_time_window: Option<i64>,
269}
270
271impl MergeOutput {
272 pub fn is_empty(&self) -> bool {
273 self.files_to_add.is_empty() && self.files_to_remove.is_empty()
274 }
275
276 pub fn input_file_size(&self) -> u64 {
277 self.files_to_remove.iter().map(|f| f.file_size).sum()
278 }
279
280 pub fn output_file_size(&self) -> u64 {
281 self.files_to_add.iter().map(|f| f.file_size).sum()
282 }
283}
284
285#[async_trait::async_trait]
287pub trait Compactor: Send + Sync + 'static {
288 async fn merge_ssts(
290 &self,
291 compaction_region: &CompactionRegion,
292 picker_output: PickerOutput,
293 ) -> Result<MergeOutput>;
294
295 async fn update_manifest(
297 &self,
298 compaction_region: &CompactionRegion,
299 merge_output: MergeOutput,
300 ) -> Result<RegionEdit>;
301
302 async fn compact(
304 &self,
305 compaction_region: &CompactionRegion,
306 compact_request_options: compact_request::Options,
307 ) -> Result<()>;
308}
309
/// Stateless default implementation of [`Compactor`].
pub struct DefaultCompactor;

313#[async_trait::async_trait]
314impl Compactor for DefaultCompactor {
315 async fn merge_ssts(
316 &self,
317 compaction_region: &CompactionRegion,
318 mut picker_output: PickerOutput,
319 ) -> Result<MergeOutput> {
320 let mut futs = Vec::with_capacity(picker_output.outputs.len());
321 let mut compacted_inputs =
322 Vec::with_capacity(picker_output.outputs.iter().map(|o| o.inputs.len()).sum());
323 let internal_parallelism = compaction_region.max_parallelism.max(1);
324
325 for output in picker_output.outputs.drain(..) {
326 compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone()));
327 let write_opts = WriteOptions {
328 write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
329 max_file_size: picker_output.max_file_size,
330 ..Default::default()
331 };
332
333 let region_metadata = compaction_region.region_metadata.clone();
334 let sst_layer = compaction_region.access_layer.clone();
335 let region_id = compaction_region.region_id;
336 let cache_manager = compaction_region.cache_manager.clone();
337 let storage = compaction_region.region_options.storage.clone();
338 let index_options = compaction_region
339 .current_version
340 .options
341 .index_options
342 .clone();
343 let append_mode = compaction_region.current_version.options.append_mode;
344 let merge_mode = compaction_region.current_version.options.merge_mode();
345 let flat_format = compaction_region
346 .engine_config
347 .enable_experimental_flat_format;
348 let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
349 let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
350 let bloom_filter_index_config =
351 compaction_region.engine_config.bloom_filter_index.clone();
352 let max_sequence = output
353 .inputs
354 .iter()
355 .map(|f| f.meta_ref().sequence)
356 .max()
357 .flatten();
358 let region_metadata_for_filemeta = region_metadata.clone();
359 futs.push(async move {
360 let input_file_names = output
361 .inputs
362 .iter()
363 .map(|f| f.file_id().to_string())
364 .join(",");
365 let builder = CompactionSstReaderBuilder {
366 metadata: region_metadata.clone(),
367 sst_layer: sst_layer.clone(),
368 cache: cache_manager.clone(),
369 inputs: &output.inputs,
370 append_mode,
371 filter_deleted: output.filter_deleted,
372 time_range: output.output_time_range,
373 merge_mode,
374 };
375 let source = if flat_format {
376 let reader = builder.build_flat_sst_reader().await?;
377 either::Right(FlatSource::Stream(reader))
378 } else {
379 let reader = builder.build_sst_reader().await?;
380 either::Left(Source::Reader(reader))
381 };
382 let (sst_infos, metrics) = sst_layer
383 .write_sst(
384 SstWriteRequest {
385 op_type: OperationType::Compact,
386 metadata: region_metadata,
387 source,
388 cache_manager,
389 storage,
390 max_sequence: max_sequence.map(NonZero::get),
391 index_options,
392 inverted_index_config,
393 fulltext_index_config,
394 bloom_filter_index_config,
395 },
396 &write_opts,
397 WriteType::Compaction,
398 )
399 .await?;
400 let partition_expr = match ®ion_metadata_for_filemeta.partition_expr {
402 None => None,
403 Some(json_str) if json_str.is_empty() => None,
404 Some(json_str) => {
405 PartitionExpr::from_json_str(json_str).with_context(|_| {
406 InvalidPartitionExprSnafu {
407 expr: json_str.clone(),
408 }
409 })?
410 }
411 };
412
413 let output_files = sst_infos
414 .into_iter()
415 .map(|sst_info| FileMeta {
416 region_id,
417 file_id: sst_info.file_id,
418 time_range: sst_info.time_range,
419 level: output.output_level,
420 file_size: sst_info.file_size,
421 available_indexes: sst_info.index_metadata.build_available_indexes(),
422 index_file_size: sst_info.index_metadata.file_size,
423 num_rows: sst_info.num_rows as u64,
424 num_row_groups: sst_info.num_row_groups,
425 sequence: max_sequence,
426 partition_expr: partition_expr.clone(),
427 })
428 .collect::<Vec<_>>();
429 let output_file_names =
430 output_files.iter().map(|f| f.file_id.to_string()).join(",");
431 info!(
432 "Region {} compaction inputs: [{}], outputs: [{}], metrics: {:?}",
433 region_id, input_file_names, output_file_names, metrics
434 );
435 metrics.observe();
436 Ok(output_files)
437 });
438 }
439 let mut output_files = Vec::with_capacity(futs.len());
440 while !futs.is_empty() {
441 let mut task_chunk = Vec::with_capacity(internal_parallelism);
442 for _ in 0..internal_parallelism {
443 if let Some(task) = futs.pop() {
444 task_chunk.push(common_runtime::spawn_compact(task));
445 }
446 }
447 let metas = futures::future::try_join_all(task_chunk)
448 .await
449 .context(JoinSnafu)?
450 .into_iter()
451 .collect::<Result<Vec<Vec<_>>>>()?;
452 output_files.extend(metas.into_iter().flatten());
453 }
454
455 let mut inputs: Vec<_> = compacted_inputs.into_iter().collect();
456 inputs.extend(
457 picker_output
458 .expired_ssts
459 .iter()
460 .map(|f| f.meta_ref().clone()),
461 );
462
463 Ok(MergeOutput {
464 files_to_add: output_files,
465 files_to_remove: inputs,
466 compaction_time_window: Some(picker_output.time_window_size),
467 })
468 }
469
470 async fn update_manifest(
471 &self,
472 compaction_region: &CompactionRegion,
473 merge_output: MergeOutput,
474 ) -> Result<RegionEdit> {
475 let edit = RegionEdit {
477 files_to_add: merge_output.files_to_add,
478 files_to_remove: merge_output.files_to_remove,
479 timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
481 compaction_time_window: merge_output
482 .compaction_time_window
483 .map(|seconds| Duration::from_secs(seconds as u64)),
484 flushed_entry_id: None,
485 flushed_sequence: None,
486 committed_sequence: None,
487 };
488
489 let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
490 compaction_region
492 .manifest_ctx
493 .update_manifest(RegionLeaderState::Writable, action_list)
494 .await?;
495
496 Ok(edit)
497 }
498
499 async fn compact(
502 &self,
503 compaction_region: &CompactionRegion,
504 compact_request_options: compact_request::Options,
505 ) -> Result<()> {
506 let picker_output = {
507 let picker_output = new_picker(
508 &compact_request_options,
509 &compaction_region.region_options.compaction,
510 compaction_region.region_options.append_mode,
511 )
512 .pick(compaction_region);
513
514 if let Some(picker_output) = picker_output {
515 picker_output
516 } else {
517 info!(
518 "No files to compact for region_id: {}",
519 compaction_region.region_id
520 );
521 return Ok(());
522 }
523 };
524
525 let merge_output = self.merge_ssts(compaction_region, picker_output).await?;
526 if merge_output.is_empty() {
527 info!(
528 "No files to compact for region_id: {}",
529 compaction_region.region_id
530 );
531 return Ok(());
532 }
533
534 metrics::COMPACTION_INPUT_BYTES.inc_by(merge_output.input_file_size() as f64);
535 metrics::COMPACTION_OUTPUT_BYTES.inc_by(merge_output.output_file_size() as f64);
536 self.update_manifest(compaction_region, merge_output)
537 .await?;
538
539 Ok(())
540 }
541}